如何在pyspark/hadoop中从python复制文件

2024-09-27 07:29:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用pyspark将数据帧保存为拼花文件或csv文件:

def write_df_as_parquet_file(df, path, mode="overwrite"):
    df = df.repartition(1)  # join partitions to produce 1 parquet file
    dfw = df.write.format("parquet").mode(mode)
    dfw.save(path)

def write_df_as_csv_file(df, path, mode="overwrite", header=True):
    df = df.repartition(1)  # join partitions to produce 1 csv file
    header = "true" if header else "false"
    dfw = df.write.format("csv").option("header", header).mode(mode)
    dfw.save(path)

但这会将parquet/csv文件保存在一个名为path的文件夹中,在那里它保存了一些我们不需要的其他文件,方法是:

4 files are created in path, but we only care about the PARQUET file

图片:https://ibb.co/9c1D8RL

基本上,我想创建一些函数,使用上述方法将文件保存到一个位置,然后将CSV或PARQUET文件移动到一个新位置。比如:

^{pr2}$

我怎么能做到呢?如何实现copy_file或{}?我在scala中看到了一些使用hadoopapi的解决方案,但是我无法在python中实现这一点。我想我需要使用sparkContext,但我仍在学习Hadoop,还没有找到实现它的方法。在


Tags: 文件csvpath方法dfmodedefas
1条回答
网友
1楼 · 发布于 2024-09-27 07:29:01

您可以使用Python的HDFS库之一连接到HDFS实例,然后执行所需的任何操作。在

来自hdfs3文档(https://hdfs3.readthedocs.io/en/latest/quickstart.html):

from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=<host>, port=<port>)
hdfs.mv(tmp_folder + "*.parquet", path)

把上面的内容打包成一个函数,就可以开始了。在

注意:我刚才以hdfs3为例。您也可以使用hdfsCLI。在

相关问题 更多 >

    热门问题