I have several data sources that provide data about the same items (keys). My task is to load the data sources, join them together (they all share the same key column), and forward-fill values to get the resulting DF.
My question is: how can I perform this process without getting a StackOverflowError?
My code runs on:
AWS Glue 1.0 job
10 workers, worker type G.1X
Spark 2.4
Python 2
I load data from 4 data sources; on each run they have about 250 columns and 1 million records in total.
Now let's use an example to explain the exact situation. Assume we have only two DFs, each with a few columns. DF1 and DF2 with their key columns look like this:
df1 = spark.createDataFrame([ (1, '001', '2019-09-01 10:01:00.000', None, None, '-1', 'TEN'),
(2, '002', '2019-09-01 10:05:00.000', '4', '-1', 'TEN', None),
(2, '003', '2019-09-01 10:10:00.000', None, '-1', 'TEN', '0'),
(1, '004', '2019-09-01 10:11:00.000', '4', '-1', 'TEN', '0'),
(2, '005', '2019-09-01 10:18:00.000', None, None, None, None),
(1, '006', '2019-09-01 10:22:00.000', None, None, None, None),
(1, '007', '2019-09-01 10:23:00.000', '6', '-1', 'NINE', '0')])
df1 = df1.toDF(*['df1_key_col', 'df1_lsn_col', 'df1_date_col', 'df1_col1', 'df1_col2', 'df1_col3', 'df1_col4'])
df1.show(truncate = False)
The DF1 output is:
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|df1_key_col|df1_lsn_col|df1_date_col |df1_col1|df1_col2|df1_col3|df1_col4|
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|1 |001 |2019-09-01 10:01:00.000|null |null |-1 |TEN |
|2 |002 |2019-09-01 10:05:00.000|4 |-1 |TEN |null |
|2 |003 |2019-09-01 10:10:00.000|null |-1 |TEN |0 |
|1 |004 |2019-09-01 10:11:00.000|4 |-1 |TEN |0 |
|2 |005 |2019-09-01 10:18:00.000|null |null |null |null |
|1 |006 |2019-09-01 10:22:00.000|null |null |null |null |
|1 |007 |2019-09-01 10:23:00.000|6 |-1 |NINE |0 |
+-----------+-----------+-----------------------+--------+--------+--------+--------+
And DF2:
df2 = spark.createDataFrame([ (1, '33001', '2019-09-01 10:02:00.000', None, None, '-1', 'TEN'),
(1, '33002', '2019-09-01 10:03:00.000', '4', '-1', 'TEN', None),
(2, '33003', '2019-09-01 10:04:00.000', None, '-1', 'TEN', '0'),
(2, '33004', '2019-09-01 10:15:00.000', '4', '-1', 'TEN', '0'),
(1, '33005', '2019-09-01 10:19:00.000', None, None, None, None),
(1, '33006', '2019-09-01 10:23:00.000', None, None, None, None),
(2, '33007', '2019-09-01 10:29:00.000', '6', '-1', 'NINE', '0')])
df2 = df2.toDF(*['df2_key_col', 'df2_lsn_col', 'df2_date_col', 'df2_col5', 'df2_col6', 'df2_col7', 'df2_col8'])
df2.show(truncate = False)
The output is:
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|df2_key_col|df2_lsn_col|df2_date_col |df2_col5|df2_col6|df2_col7|df2_col8|
+-----------+-----------+-----------------------+--------+--------+--------+--------+
|1 |33001 |2019-09-01 10:02:00.000|null |null |-1 |TEN |
|1 |33002 |2019-09-01 10:03:00.000|4 |-1 |TEN |null |
|2 |33003 |2019-09-01 10:04:00.000|null |-1 |TEN |0 |
|2 |33004 |2019-09-01 10:15:00.000|4 |-1 |TEN |0 |
|1 |33005 |2019-09-01 10:19:00.000|null |null |null |null |
|1 |33006 |2019-09-01 10:23:00.000|null |null |null |null |
|2 |33007 |2019-09-01 10:29:00.000|6 |-1 |NINE |0 |
+-----------+-----------+-----------------------+--------+--------+--------+--------+
Yes, the prefix of a column name indicates which data source the column belongs to. I use this prefix for the forward fill.
I join them on key_col, drop df1_key_col and df2_key_col, and generate a general date column on the resulting DF:
from pyspark.sql.functions import when

df = df1.join(df2, ((df1.df1_key_col == df2.df2_key_col)&(df1.df1_date_col == df2.df2_date_col)), "full")
df = df.withColumn("key_col", when((df.df1_key_col.isNull())&(df.df2_key_col.isNotNull()), df.df2_key_col).otherwise(df.df1_key_col))
df = df.drop("df1_key_col").drop("df2_key_col")
df = df.withColumn("general_date", when(df.df1_date_col.isNotNull(), df.df1_date_col).otherwise(df.df2_date_col))
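As an aside, both when(...).otherwise(...) expressions above simply pick the first non-null of two columns, i.e. SQL COALESCE. A plain-Python sketch of that pattern:

```python
# Plain-Python analogue of the when/otherwise pattern above:
# pick the first value that is not None (SQL COALESCE).
def coalesce(*values):
    return next((v for v in values if v is not None), None)

print(coalesce(None, 2))     # -> 2
print(coalesce(1, 2))        # -> 1
print(coalesce(None, None))  # -> None
```

In Spark itself the same thing can be written with pyspark.sql.functions.coalesce, e.g. `func.coalesce(df.df1_key_col, df.df2_key_col)`, which replaces both when(...).otherwise(...) chains.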
The output, sorted by the general_date column, is:
+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+
|df1_lsn_col|df1_date_col |df1_col1|df1_col2|df1_col3|df1_col4|df2_lsn_col|df2_date_col |df2_col5|df2_col6|df2_col7|df2_col8|key_col|general_date |
+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+
|001 |2019-09-01 10:01:00.000|null |null |-1 |TEN |null |null |null |null |null |null |1 |2019-09-01 10:01:00.000|
|null |null |null |null |null |null |33001 |2019-09-01 10:02:00.000|null |null |-1 |TEN |1 |2019-09-01 10:02:00.000|
|null |null |null |null |null |null |33002 |2019-09-01 10:03:00.000|4 |-1 |TEN |null |1 |2019-09-01 10:03:00.000|
|004 |2019-09-01 10:11:00.000|4 |-1 |TEN |0 |null |null |null |null |null |null |1 |2019-09-01 10:11:00.000|
|null |null |null |null |null |null |33005 |2019-09-01 10:19:00.000|null |null |null |null |1 |2019-09-01 10:19:00.000|
|006 |2019-09-01 10:22:00.000|null |null |null |null |null |null |null |null |null |null |1 |2019-09-01 10:22:00.000|
|007 |2019-09-01 10:23:00.000|6 |-1 |NINE |0 |33006 |2019-09-01 10:23:00.000|null |null |null |null |1 |2019-09-01 10:23:00.000|
|null |null |null |null |null |null |33003 |2019-09-01 10:04:00.000|null |-1 |TEN |0 |2 |2019-09-01 10:04:00.000|
|002 |2019-09-01 10:05:00.000|4 |-1 |TEN |null |null |null |null |null |null |null |2 |2019-09-01 10:05:00.000|
|003 |2019-09-01 10:10:00.000|null |-1 |TEN |0 |null |null |null |null |null |null |2 |2019-09-01 10:10:00.000|
|null |null |null |null |null |null |33004 |2019-09-01 10:15:00.000|4 |-1 |TEN |0 |2 |2019-09-01 10:15:00.000|
|005 |2019-09-01 10:18:00.000|null |null |null |null |null |null |null |null |null |null |2 |2019-09-01 10:18:00.000|
|null |null |null |null |null |null |33007 |2019-09-01 10:29:00.000|6 |-1 |NINE |0 |2 |2019-09-01 10:29:00.000|
+-----------+-----------------------+--------+--------+--------+--------+-----------+-----------------------+--------+--------+--------+--------+-------+-----------------------+
Now this DF is ready to be filled, forward-filling values of the same key over time.
One important point: each DF I join has an lsn column (coming from SQL Server CDC tables). For example, if df1_lsn_col is null, it means that in that record I need to forward-fill the columns related to df1.
What I do for the forward fill consists of the following steps:
1. Replace the real nulls in the records with a placeholder value
2. Forward-fill values to replace the remaining nulls with the latest valid value
3. Replace the placeholder values back with null
Note: by using the placeholder I preserve the true nulls and fill only the cells that genuinely have no value
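The three steps above can be sketched in plain Python for a single column (an illustration only, not the Spark code; rows are (lsn, value) pairs already sorted by date within one key):

```python
# Pure-Python sketch of the three-step placeholder trick: a null in a row
# where the source record exists (lsn is not None) is a "real" null and must
# survive; a null in a row from another source must be forward-filled.
PLACEHOLDER = "__NA__"  # assumed sentinel; any value that cannot occur in the data

def forward_fill(rows):
    # Step 1: mark real nulls with the placeholder so the fill keeps them.
    marked = [(lsn, PLACEHOLDER if lsn is not None and v is None else v)
              for lsn, v in rows]
    # Step 2: forward-fill remaining Nones with the last seen value.
    filled, last = [], None
    for lsn, v in marked:
        if v is None:
            v = last
        else:
            last = v
        filled.append((lsn, v))
    # Step 3: turn the placeholder back into a real null.
    return [(lsn, None if v == PLACEHOLDER else v) for lsn, v in filled]

rows = [("001", "TEN"), ("002", None), (None, None), ("004", "NINE"), (None, None)]
print(forward_fill(rows))
# -> [('001', 'TEN'), ('002', None), (None, None), ('004', 'NINE'), (None, 'NINE')]
```

Note how the real null from lsn 002 propagates into the next row as a null (via the placeholder) instead of being overwritten by "TEN".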
The columns of each DS are identified by their prefix in the DF. Say I want to forward-fill the df1 columns; I would do the following:
import sys
from pyspark.sql import Window
from pyspark.sql import functions as func
from pyspark.sql.functions import when

lsn_col = "df1_lsn_col"
stringRealNullReplace = "__NA__"
columns = [col for col in df.columns if col.startswith("df1_")]
for col in columns:
    # 1. mark real nulls (the record exists in df1 but the value is null)
    df = df.withColumn(col, when(df[lsn_col].isNotNull() & df[col].isNull(), stringRealNullReplace).otherwise(df[col]))
    # 2. forward-fill the remaining nulls with the last value per key
    window = Window.partitionBy("key_col").orderBy("general_date").rowsBetween(-sys.maxsize, 0)
    filled_column = func.last(df[col], ignorenulls=True).over(window)
    df = df.withColumn(col, when(df[col].isNull(), filled_column).otherwise(df[col]))
    # 3. turn the placeholder back into a real null
    df = df.withColumn(col, when(df[col] == stringRealNullReplace, None).otherwise(df[col]))
This works fine, but when I run it for all the columns it throws a StackOverflowError.