How to append an empty row (for per-loop-iteration output) to a dataframe in PySpark

Posted 2024-10-04 01:26:13


[Image: sample data for currdf and hist_df]

As the images show, we have two dataframes, currdf and hist_df. The logic is defined in the code below, and the expected output is:

[Image: expected output]

So in the expected output, each iteration produces a dataframe (highlighted in yellow in the image) containing either one row or no rows, and we need to keep appending it to the final dataframe. If that dataframe (tmp) is empty, only pcode should be stored with its actual value and the remaining columns should be left blank.
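The original post showed the expected output only as an image; based on the sample data defined in the code below, it should look roughly like this (pcodes x and y have no rows in hist_df, so only pcode is filled in):

+-----+---+-------+------+
|pcode|amt|status1|status|
+-----+---+-------+------+
|    d| 11|      1|   Yes|
|    a|  9|      0|    No|
|    b|  7|      1|   Yes|
|    d| 11|      1|   Yes|
|    x|   |       |      |
|    y|   |       |      |
+-----+---+-------+------+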


import functools
import warnings

import pyspark.sql.functions as sf
from pyspark.sql import DataFrame, SparkSession

warnings.filterwarnings('ignore')

spark = SparkSession.builder.appName('Pycode2Pyspark_RProducer').getOrCreate()

# Current records; note that pcodes 'x' and 'y' have no match in hist_df
currdf = spark.createDataFrame([(66, "d", "4"),
                                (67, "a", "0"),
                                (70, "b", "4"),
                                (71, "d", "4"),
                                (45, "x", "3"),
                                (48, "y", "3")],
                               ("id", "pcode", "amt"))
currdf.show()

# Historical records to aggregate per pcode
hist_df = spark.createDataFrame([(66, "d", 4, 1),
                                 (67, "a", 0, 0),
                                 (70, "b", 4, 1),
                                 (71, "a", 9, 0),
                                 (45, "c", 2, 1),
                                 (12, "d", 7, 0),
                                 (37, "b", 3, 0),
                                 (89, "c", 1, 0),
                                 (11, "e", 9, 1),
                                 (79, "f", 6, 1)],
                                ("id", "pcode", "amt", "status1"))
hist_df.show()

dataCollect = currdf.collect()
output_dfs = []
for row in dataCollect:
    temp_var = row['pcode']
    print(temp_var)
    temp_filter = hist_df.where(hist_df['pcode'] == temp_var)  # history rows for this pcode
    temp_filter.show()
    tmp = temp_filter.groupby('pcode').agg(sf.sum('amt').alias('amt'),
                                           sf.sum('status1').alias('status1'))
    tmp = tmp.withColumn('status', sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes').otherwise('No'))
    tmp.show()
    output_dfs.append(tmp)
df_output = functools.reduce(DataFrame.union, output_dfs)
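This is where rows go missing: for pcodes such as x and y, the filter returns an empty dataframe, and grouping an empty dataframe yields zero groups, so tmp contributes no row to the union. A quick demonstration with the data above:

empty_filter = hist_df.where(hist_df['pcode'] == 'x')  # 'x' never occurs in hist_df
empty_filter.groupby('pcode').agg(sf.sum('amt').alias('amt')).show()
# +-----+---+
# |pcode|amt|
# +-----+---+
# +-----+---+   <- zero rows, so nothing is appended for 'x'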

  

Tags: import, df, output, sql, show, sf, temp, hist
1 Answer
User
#1 · Posted 2024-10-04 01:26:13
import functools

import pyspark.sql.functions as sf
from pyspark.sql import DataFrame

dataCollect = currdf.collect()
output_dfs = []
for row in dataCollect:
    temp_var = row['pcode']
    temp_filter = hist_df.where(hist_df['pcode'] == temp_var)  # history rows for this pcode
    tmp = temp_filter.groupby('pcode').agg(sf.sum('amt').alias('amt'),
                                           sf.sum('status1').alias('status1'))
    tmp = tmp.withColumn('status', sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes').otherwise('No'))
    if tmp.rdd.isEmpty():
        # No history for this pcode: build a one-row placeholder that keeps
        # the pcode and leaves every other column blank
        columns = tmp.columns
        mylist = ['' for _ in columns]
        mylist[columns.index('pcode')] = temp_var
        newRow = spark.createDataFrame([tuple(mylist)], columns)
        output_dfs.append(newRow)
    else:
        output_dfs.append(tmp)
df_output = functools.reduce(DataFrame.union, output_dfs)
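As a side note, the collect()-and-loop approach launches one Spark job per row of currdf. A minimal join-based sketch of the same logic (assuming the currdf and hist_df definitions from the question): aggregate hist_df once per pcode, then left-join onto the pcodes of currdf, so pcodes with no history automatically come through with null columns, and a pcode that appears twice in currdf (like d) still yields two output rows.

import pyspark.sql.functions as sf

# Aggregate the history once per pcode
hist_agg = (hist_df.groupby('pcode')
            .agg(sf.sum('amt').alias('amt'),
                 sf.sum('status1').alias('status1'))
            .withColumn('status',
                        sf.when((sf.col('amt') > 3) & (sf.col('status1') > 0), 'Yes')
                          .otherwise('No')))

# Left join keeps every pcode from currdf; unmatched pcodes get nulls
df_output = currdf.select('pcode').join(hist_agg, on='pcode', how='left')
df_output.show()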
