Best practice for forward-filling PySpark DF values without hitting a StackOverflowError

Posted 2024-09-28 23:08:38


I have several data sources that provide data about the same items (keys). My task is to load the data sources, join them together (they all share the same key column) and forward-fill the values to get the final DF.
My question is: how do I perform this process without getting a StackOverflowError?
My code runs on:

AWS Glue 1.0 job
10 workers, worker type G.1X
Spark 2.4
python 2

I load data from 4 data sources; per run they total roughly 250 columns and about 1 million records.
Let me use an example to explain the exact situation. Suppose we only have two DFs, each with a few columns. DF1 and DF2, both with a key column, are created like this:

df1 = spark.createDataFrame([   (1, '001', '2019-09-01 10:01:00.000', None, None, '-1', 'TEN'),
                                (2, '002', '2019-09-01 10:05:00.000', '4', '-1', 'TEN', None), 
                                (2, '003', '2019-09-01 10:10:00.000', None, '-1', 'TEN', '0'), 
                                (1, '004', '2019-09-01 10:11:00.000', '4', '-1', 'TEN', '0'), 
                                (2, '005', '2019-09-01 10:18:00.000', None, None, None, None), 
                                (1, '006', '2019-09-01 10:22:00.000', None, None, None, None), 
                                (1, '007', '2019-09-01 10:23:00.000', '6', '-1', 'NINE', '0')])
df1 = df1.toDF(*['df1_key_col', 'df1_lsn_col', 'df1_date_col', 'df1_col1', 'df1_col2', 'df1_col3', 'df1_col4'])
df1.show(truncate = False)

The output of DF1 is:

+-----------+-----------+-----------------------+--------+--------+--------+--------+
|df1_key_col|df1_lsn_col|df1_date_col           |df1_col1|df1_col2|df1_col3|df1_col4|
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|1          |001        |2019-09-01 10:01:00.000|null    |null    |-1      |TEN     |
|2          |002        |2019-09-01 10:05:00.000|4       |-1      |TEN     |null    |
|2          |003        |2019-09-01 10:10:00.000|null    |-1      |TEN     |0       |
|1          |004        |2019-09-01 10:11:00.000|4       |-1      |TEN     |0       |
|2          |005        |2019-09-01 10:18:00.000|null    |null    |null    |null    |
|1          |006        |2019-09-01 10:22:00.000|null    |null    |null    |null    |
|1          |007        |2019-09-01 10:23:00.000|6       |-1      |NINE    |0       |
+-----------+-----------+-----------------------+--------+--------+--------+--------+

And DF2:

df2 = spark.createDataFrame([   (1, '33001', '2019-09-01 10:02:00.000', None, None, '-1', 'TEN'),
                                (1, '33002', '2019-09-01 10:03:00.000', '4', '-1', 'TEN', None), 
                                (2, '33003', '2019-09-01 10:04:00.000', None, '-1', 'TEN', '0'), 
                                (2, '33004', '2019-09-01 10:15:00.000', '4', '-1', 'TEN', '0'), 
                                (1, '33005', '2019-09-01 10:19:00.000', None, None, None, None), 
                                (1, '33006', '2019-09-01 10:23:00.000', None, None, None, None), 
                                (2, '33007', '2019-09-01 10:29:00.000', '6', '-1', 'NINE', '0')])
df2 = df2.toDF(*['df2_key_col', 'df2_lsn_col', 'df2_date_col', 'df2_col5', 'df2_col6', 'df2_col7', 'df2_col8'])
df2.show(truncate = False)

Its output is:

+-----------+-----------+-----------------------+--------+--------+--------+--------+
|df2_key_col|df2_lsn_col|df2_date_col           |df2_col5|df2_col6|df2_col7|df2_col8|
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|1          |33001      |2019-09-01 10:02:00.000|null    |null    |-1      |TEN     |
|1          |33002      |2019-09-01 10:03:00.000|4       |-1      |TEN     |null    |
|2          |33003      |2019-09-01 10:04:00.000|null    |-1      |TEN     |0       |
|2          |33004      |2019-09-01 10:15:00.000|4       |-1      |TEN     |0       |
|1          |33005      |2019-09-01 10:19:00.000|null    |null    |null    |null    |
|1          |33006      |2019-09-01 10:23:00.000|null    |null    |null    |null    |
|2          |33007      |2019-09-01 10:29:00.000|6       |-1      |NINE    |0       |
+-----------+-----------+-----------------------+--------+--------+--------+--------+

Yes, the prefix in the column names indicates which data source the column comes from. I use this prefix when forward-filling the values.
I join them on the key column, drop df1_key_col and df2_key_col, and generate a general date column on the resulting DF:

from pyspark.sql.functions import when

df = df1.join(df2, (df1.df1_key_col == df2.df2_key_col) & (df1.df1_date_col == df2.df2_date_col), "full")
df = df.withColumn("key_col", when(df.df1_key_col.isNull() & df.df2_key_col.isNotNull(), df.df2_key_col).otherwise(df.df1_key_col))
df = df.drop("df1_key_col").drop("df2_key_col")
df = df.withColumn("general_date", when(df.df1_date_col.isNotNull(), df.df1_date_col).otherwise(df.df2_date_col))

Sorted by the general_date column, the output is:

+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+
|df1_lsn_col|df1_date_col           |df1_col1|df1_col2|df1_col3|df1_col4|df2_lsn_col|df2_date_col           |df2_col5|df2_col6|df2_col7|df2_col8|key_col|general_date           |
+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+
|001        |2019-09-01 10:01:00.000|null    |null    |-1      |TEN     |null       |null                   |null    |null    |null    |null    |1      |2019-09-01 10:01:00.000|
|null       |null                   |null    |null    |null    |null    |33001      |2019-09-01 10:02:00.000|null    |null    |-1      |TEN     |1      |2019-09-01 10:02:00.000|
|null       |null                   |null    |null    |null    |null    |33002      |2019-09-01 10:03:00.000|4       |-1      |TEN     |null    |1      |2019-09-01 10:03:00.000|
|004        |2019-09-01 10:11:00.000|4       |-1      |TEN     |0       |null       |null                   |null    |null    |null    |null    |1      |2019-09-01 10:11:00.000|
|null       |null                   |null    |null    |null    |null    |33005      |2019-09-01 10:19:00.000|null    |null    |null    |null    |1      |2019-09-01 10:19:00.000|
|006        |2019-09-01 10:22:00.000|null    |null    |null    |null    |null       |null                   |null    |null    |null    |null    |1      |2019-09-01 10:22:00.000|
|007        |2019-09-01 10:23:00.000|6       |-1      |NINE    |0       |33006      |2019-09-01 10:23:00.000|null    |null    |null    |null    |1      |2019-09-01 10:23:00.000|
|null       |null                   |null    |null    |null    |null    |33003      |2019-09-01 10:04:00.000|null    |-1      |TEN     |0       |2      |2019-09-01 10:04:00.000|
|002        |2019-09-01 10:05:00.000|4       |-1      |TEN     |null    |null       |null                   |null    |null    |null    |null    |2      |2019-09-01 10:05:00.000|
|003        |2019-09-01 10:10:00.000|null    |-1      |TEN     |0       |null       |null                   |null    |null    |null    |null    |2      |2019-09-01 10:10:00.000|
|null       |null                   |null    |null    |null    |null    |33004      |2019-09-01 10:15:00.000|4       |-1      |TEN     |0       |2      |2019-09-01 10:15:00.000|
|005        |2019-09-01 10:18:00.000|null    |null    |null    |null    |null       |null                   |null    |null    |null    |null    |2      |2019-09-01 10:18:00.000|
|null       |null                   |null    |null    |null    |null    |33007      |2019-09-01 10:29:00.000|6       |-1      |NINE    |0       |2      |2019-09-01 10:29:00.000|
+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+

Now this DF is ready to be forward-filled, so that the values of the same key are carried forward over time. One important point is that every DF I join has an lsn column (it comes from a SQL Server CDC table). For example, if df1_lsn_col is null in a record, it means that for this record I need to forward-fill the df1-related columns.
What I do for the "forward fill" consists of the following steps:

1. Replace the real nulls in the records with a dummy value
2. Forward-fill so that the remaining nulls are replaced with the latest valid value
3. Replace the dummy values back with null

Note: by substituting the dummy value I preserve the real nulls, so only the cells that genuinely had no value get forward-filled.
The columns are identified per data source by their prefix in the DF. Say I want to forward-fill the df1 columns; I run the following:

lsn_col = "df1_lsn_col"
columns = [col for col in df.columns if col.startswith("df1_")]
for col in columns:

    stringRealNullReplace = "__NA__"

    df= df.withColumn(col, when( (df[lsn_col].isNotNull() & df[col].isNull() ), stringRealNullReplace).otherwise(df[col]))

    window = Window.partitionBy("key_col").orderBy("general_date").rowsBetween(-sys.maxsize, 0)
    filled_column = func.last(df[col], ignorenulls=True).over(window)
    df= df.withColumn(col, when(df[col].isNull(), filled_column).otherwise(df[col]))

    df= df.withColumn(col, when( (df[col].isNotNull()&(df[col] == stringRealNullReplace) ), None).otherwise(df[col]))

This works fine, but when I run it for all the columns it throws a StackOverflowError.
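
My suspicion is that chaining withColumn three times per column produces a logical plan too deep for the driver to analyse. One pattern I am considering, but have not validated at this scale, is to cut the lineage every few columns with DataFrame.checkpoint. A minimal sketch, assuming a checkpoint directory the Glue job can write to (the S3 path below is made up):

import sys
from pyspark.sql import functions as func
from pyspark.sql.functions import when
from pyspark.sql.window import Window

# hypothetical checkpoint location; any path the job can write to would do
spark.sparkContext.setCheckpointDir("s3://my-bucket/tmp/checkpoints/")

lsn_col = "df1_lsn_col"
columns = [c for c in df.columns if c.startswith("df1_")]
stringRealNullReplace = "__NA__"
window = Window.partitionBy("key_col").orderBy("general_date").rowsBetween(-sys.maxsize, 0)

for i, col in enumerate(columns):
    # same three steps as above: mark real nulls, forward-fill, restore nulls
    df = df.withColumn(col, when(df[lsn_col].isNotNull() & df[col].isNull(), stringRealNullReplace).otherwise(df[col]))
    df = df.withColumn(col, when(df[col].isNull(), func.last(df[col], ignorenulls=True).over(window)).otherwise(df[col]))
    df = df.withColumn(col, when(df[col] == stringRealNullReplace, None).otherwise(df[col]))

    # every 20 columns, materialise the DF and truncate the lineage so the plan stays shallow
    if (i + 1) % 20 == 0:
        df = df.checkpoint(eager=True)

Whether 20 columns is the right interval is just a guess; the idea is only to keep the accumulated plan from growing without bound.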


Tags: key, none, df, date, col, null, df1, when