我的spark数据帧的输出有问题。该文件的范围从几GB到50 GB以上
SparkDF = spark.read.format("csv").options(header="true", delimiter="|", maxColumns="100000").load(my_file.csv)
这给了我正确的DF,我想要的。但根据要求,我需要将列名和集合中与该键相关的所有值作为键
例如:
df = {'col1': ['1', '2', '3', '4'], 'col2': ['Jean', 'Cecil', 'Annie', 'Maurice'], 'col3': ['test', 'aaa', 'bbb', 'ccc','ddd']}
df = pd.DataFrame(data=d)
最后应该给我:
{'col1': {'1', '2', '3', '4'},'col2': {'Jean', 'Cecil', 'Annie', 'Maurice'},'col3': {'test', 'aaa', 'bbb', 'ccc','ddd'}
我已经实现了以下功能:
def columnDict(dataFrame):
colDict = dict(zip(dataFrame.schema.names, zip(*dataFrame.collect())))
return colDict if colDict else dict.fromkeys(dataFrame.schema.names, ())
然而,它返回了一个dict,其值是元组,而不是我所需要的集合
我想将字典中的元组转换成一个集合,或者直接将字典作为集合作为函数的输出
编辑:
有关完整要求:
除了上面提到的字典之外,还有一个包含类似数据的字典,用于检查
意味着我加载到spark DF并转换为字典的文件包含必须对照另一个字典检查的数据
目标是对照检查字典检查我的dict(加载的文件)中的每个键,首先查看它们是否存在,然后检查它们是否存在以检查键的值是否是检查值的子集
如果我在数据帧中加载检查数据,它将如下所示:(注意,我可能无法更改它是dict的事实,我将查看是否可以从dict修改为spark df)
df = {'KeyName': ['col1', 'col2', 'col3'], 'ValueName': ['1, 2, 3, 4', 'Jean, Cecil, Annie, Maurice, Annie, Maurice', 'test, aaa, bbb, ccc,ddd,eee']}
df = pd.DataFrame(data=df)
print(df)
KeyName ValueName
0 col1 1, 2, 3, 4
1 col2 Jean, Cecil, Annie, Maurice, Annie, Maurice
2 col3 test, aaa, bbb, ccc,ddd,eee
因此,最后,我的文件中的数据应该是一行的子集,该行的KeyName与我的dict相同
我有点被遗留代码卡住了,我有点难以将其迁移到spark databricks
编辑2: 希望这能奏效。我上传了2个带有修改数据的文件: https://filebin.net/1rnnvqn2b0ww7qc8
FakeData.csv包含我用上述代码加载的数据,必须是第二个代码的子集。 FakeDataChecker.csv包含实际完整可用集的数据
编辑3: 忘记添加FakeData中的所有空字符串以及FakeDataChecker中的空字符串都不应考虑在内
所以我不确定我是否完全理解了您的用例。但是让我们试一下初稿
据我所知,您有一个包含所有数据的第一个文件。以及一个文件检查器,其中包含需要在data foreach列中包含的密钥。并且应该过滤掉数据中存在的其他键
这可以通过初始数据和数据检查器之间的内部联接来完成。如果数据检查器中没有太多的键,Spark应自动广播数据检查器数据帧以进行优化连接
这是代码的初稿,它还没有完全自动化,等待您的第一个问题和评论
首先,让我们导入所需的函数和数据:
我们根据需要删除空值,您可以使用
subset
关键字指定所需的列然后,让我们准备连接数据帧
我们需要在连接期间为列添加别名,以避免重复的列名。我们在删除列时指定区分大小写的选项,这样就不会删除大写的初始列
最后,我们通过内部连接过滤掉数据检查器中不存在的所有键:
在接下来的步骤中,我们可以通过检索列的不同键名(例如:“日期”、“位置”等)来实现自动化,这样我们就不必在将来复制粘贴代码4次或X次
类似于:
相关问题 更多 >
编程相关推荐