pythonspark实现mapreduce算法来创建(列,值)元组

2024-10-01 15:39:18 发布

您现在位置:Python中文网/ 问答频道 /正文

更新(2017年4月20日): 我使用的是apachespark2.1.0,我将使用Python。在

我已经缩小了问题的范围,希望有更了解Spark的人能回答。我需要从值.csv文件:

值.csv(主要收集数据,非常大):

+--------+---+---+---+---+---+----+
|   ID   | 1 | 2 | 3 | 4 | 9 | 11 |
+--------+---+---+---+---+---+----+
|        |   |   |   |   |   |    |
| abc123 | 1 | 2 | 3 | 1 | 0 | 1  |
|        |   |   |   |   |   |    |
| aewe23 | 4 | 5 | 6 | 1 | 0 | 2  |
|        |   |   |   |   |   |    |
| ad2123 | 7 | 8 | 9 | 1 | 0 | 3  |
+--------+---+---+---+---+---+----+

输出(RDD)

^{pr2}$

我将每个值与该值的列名配对,格式如下:

(column_number, value)

如果您对原始格式感兴趣:

id,1,2,3,4,9,11
abc123,1,2,3,1,0,1
aewe23,4,5,6,1,0,2
ad2123,7,8,9,1,0,3

问题:

这个例子值.csv文件只包含少数列,但实际文件中有数千列。我可以提取标头并将其广播到分布式环境中的每个节点,但我不确定这是否是解决问题的最有效方法。有没有可能用并行化的报头来实现输出?在


Tags: 文件csv数据idnumbervalue格式column
1条回答
网友
1楼 · 发布于 2024-10-01 15:39:18

我认为您也可以使用PySpark Dataframe来实现解决方案。然而,我的解决方案还不是最优的。我使用split来获得新的列名和要执行的相应列sum。这取决于你的key_list有多大。如果太大,这可能行不通,因为您必须在内存中加载key_list(使用collect)。在

import pandas as pd
import pyspark.sql.functions as func

# example data
values = spark.createDataFrame(pd.DataFrame([['abc123', 1, 2, 3, 1, 0, 1],
                                             ['aewe23', 4, 5, 6, 1, 0, 2],
                                             ['ad2123', 7, 8, 9, 1, 0, 3]], 
                                             columns=['id', '1', '2', '3','4','9','11']))
key_list = spark.createDataFrame(pd.DataFrame([['a', '1'],
                                               ['b','2;4'],
                                               ['c','3;9;11']], 
                                              columns=['key','cols']))
# use values = spark.read.csv(path_to_csv, header=True) for your data

key_list_df = key_list.select('key', func.split('cols', ';').alias('col'))
key_list_rdd = key_list_df.rdd.collect()
for row in key_list_rdd:
    values = values.withColumn(row.key, sum(values[c] for c in row.col if c in values.columns))
keys = [row.key for row in key_list_rdd]
output_df = values.select(keys)

输出

^{pr2}$

相关问题 更多 >

    热门问题