As shown below, we have two data frames, currdf and hist_df; the logic is defined in the code, together with the expected output.
In the expected output, each iteration produces a small data frame (highlighted in yellow in the original screenshots) that always contains either one row or no rows, and it needs to be appended to the final data frame. If that data frame (tmp) is empty, only the pcode should be stored and the remaining columns should be null.
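The screenshot of the expected output has not survived; reconstructing it from the data and the rule above (amt and status1 summed per pcode, status = 'Yes' when amt > 3 and status1 > 0, and nulls for pcodes absent from hist_df), the final frame should come out roughly as:

+-----+----+-------+------+
|pcode| amt|status1|status|
+-----+----+-------+------+
|    d|  11|      1|   Yes|
|    a|   9|      0|    No|
|    b|   7|      1|   Yes|
|    d|  11|      1|   Yes|
|    x|null|   null|  null|
|    y|null|   null|  null|
+-----+----+-------+------+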
import warnings
warnings.filterwarnings('ignore')

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Pycode2Pyspark_RProducer').getOrCreate()
# Current batch: one row per id
currdf = spark.createDataFrame(
    [(66, "d", "4"),
     (67, "a", "0"),
     (70, "b", "4"),
     (71, "d", "4"),
     (45, "x", "3"),
     (48, "y", "3")],
    ("id", "pcode", "amt"))
currdf.show()

# History: amt and status1 per id
hist_df = spark.createDataFrame(
    [(66, "d", 4, 1),
     (67, "a", 0, 0),
     (70, "b", 4, 1),
     (71, "a", 9, 0),
     (45, "c", 2, 1),
     (12, "d", 7, 0),
     (37, "b", 3, 0),
     (89, "c", 1, 0),
     (11, "e", 9, 1),
     (79, "f", 6, 1)],
    ("id", "pcode", "amt", "status1"))
hist_df.show()
import functools
from pyspark.sql import DataFrame

output_dfs = []
for row in currdf.collect():
    temp_var = row['pcode']
    # Pull the history rows for this pcode
    temp_filter = hist_df.where(hist_df['pcode'] == temp_var)
    # Aggregate: one row if the pcode exists in the history, zero rows otherwise
    tmp = (temp_filter.groupby('pcode')
                      .agg(sf.sum('amt').alias('amt'),
                           sf.sum('status1').alias('status1')))
    tmp = tmp.withColumn('status',
                         sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes')
                           .otherwise('No'))
    if tmp.rdd.isEmpty():
        # No history for this pcode: keep the pcode, leave the other columns null
        tmp = spark.createDataFrame([(temp_var, None, None, None)],
                                    'pcode string, amt long, status1 long, status string')
    tmp.show()
    output_dfs.append(tmp)

df_output = functools.reduce(DataFrame.union, output_dfs)
df_output.show()
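For what it's worth, the same result can usually be built without collecting currdf and looping: aggregate hist_df once per pcode and left-join it onto currdf. A minimal sketch under that assumption (hist_agg and df_output2 are names of my own):

# Aggregate the history once per pcode
hist_agg = (hist_df.groupby('pcode')
                   .agg(sf.sum('amt').alias('amt'),
                        sf.sum('status1').alias('status1')))

# A left join keeps every pcode from currdf (duplicates included);
# pcodes with no history come back with nulls, matching the tmp-is-empty case
df_output2 = (currdf.select('pcode')
                    .join(hist_agg, on='pcode', how='left')
                    .withColumn('status',
                                sf.when(sf.col('amt').isNull(), sf.lit(None))
                                  .when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes')
                                  .otherwise('No')))
df_output2.show()

This runs as one distributed job instead of one Spark job per row of currdf, though the row order of the result may differ from the loop version.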