方法动态构建用于导出到csv的大型数据帧(spark或pandas)

2024-09-30 06:12:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个csv,我使用spark.read导入到databricks中。这个大文件包含每日级别的记录/事务。我将dataframe裁剪为5列,保留500000行。我正在尝试构建此源文件的摘要表,该表在月份级别(聚合)表示这些记录/事务

该脚本有一个filter/groupby/sum命令,该命令返回一行,将数据汇总为一个月的计数。查询返回的行如下所示:

+---------+---------+-------+-------------+
|  Country|StockCode|YYYY-MM|sum(Quantity)|
+---------+---------+-------+-------------+
|Singapore|        M| 2011-4|           10|
+---------+---------+-------+-------------+

脚本在源数据帧上迭代并每次返回。我无法使用此脚本的输出(显示或csv导出)。在Pypark和pandas,我都有问题。我不确定如何堆叠查询结果以及它应该是什么形式

#熊猫 如果我在pandas中执行此操作,脚本将花费很长时间生成文件(我相信pandas+me执行此操作的效率不高会导致持续时间延长)~2.5小时。display和write.csv命令工作得相当快,大约在几秒钟内完成

#皮斯帕克 如果我在pyspark中执行此操作,脚本需要大约10分钟才能完成,但是显示和导出会崩溃。笔记本电脑返回超时错误、重新启动或抛出崩溃错误

这种方法应该是动态创建列表列表,并在完全构建后将其转换为数据帧以供使用吗?我一直在尝试我遇到的所有方法,但我似乎没有取得任何进展

下面是生成结果的代码

#officeSummaryDFBefore
column_names = "Country|StockCode|YYYY-MM|Quantity"
monthlyCountsBeforeImpactDate = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

monthlyCountsBeforeImpacteDateRow = spark.createDataFrame(
  [
    tuple('' for i in column_names.split("|"))
  ],
  column_names.split("|")
).where("1=0")

try :
  for country in country_lookup :
    country = country[0]
    print(country_count, " country(s) left")
    country_count = country_count - 1
    for stockCode in stockCode_lookup :
      stockCode = stockCode[0]
      monthlyCountsBeforeImpacteDateRow = dataBeforeImpactDate.filter((col("Country").rlike(country)) & (col("StockCode").rlike(stockCode))).groupby("Country", "StockCode", "YYYY-MM").sum()
      monthlyCountsBeforeImpacteDateRow.show()
      dfsCountsBefore = [monthlyCountsBeforeImpacteDateRow, monthlyCountsBeforeImpactDate]
      monthlyCountsBeforeImpactDate = reduce(DataFrame.union, dfsCountsBefore)
      
except Exception as e:
  print(e) 

我在循环内声明dfsCountsBeforeImpactDate,这似乎不正确,但当它在循环外时,返回为NULL


Tags: csvin命令脚本fornamescolumncountry
1条回答
网友
1楼 · 发布于 2024-09-30 06:12:28

IIUC 您正在查找country和stock以限制行,然后对它们进行分组以生成聚合

为什么不过滤df,然后分组

df = dataBeforeImpactDate

df = df.filter(col('country').isin(country_lookup) & col('stock').isin(stock_lookup))

df = df.groupby("Country", "StockCode", "YYYY-MM").sum()

df.show()

这将是更快的方式,因为你不循环周围的过滤器,也不需要工会

相关问题 更多 >

    热门问题