为什么pyspark中两种不同的数据处理方式会产生不同的结果?

2024-09-30 12:15:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试从当前数据集创建一个示例数据集。我尝试了两种不同的方法,它们产生了两种不同的结果。以某种方式分开每个采样行应该是整数和字符串([5,unprivate],[1,hiprivate])。第一种方法是为每一行提供string和string([private,private],[unprivate,hiprivate])。第二种方法是给我正确的输出。你知道吗

为什么他们要产生两个完全不同的数据集?你知道吗

数据集

5,unprivate
1,private
2,hiprivate

接收数据

from pyspark import SparkContext

sc = SparkContext()
INPUT = "./dataset"

def parse_line(line):
    bits = line.split(",")
    return bits

df = sc.textFile(INPUT).map(parse_line)

第一路-输出类似 [[u'unprivate', u'unprivate'], [u'unprivate', u'unprivate']]

#1st way
columns = df.first()
new_df = None
for i in range(0, len(columns)):
    column = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[i]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
    if new_df is None:
        new_df = column
    else:
        new_df = new_df.join(column)
        new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
new_df = new_df.map(lambda e: e[1])
print new_df.collect()

第二种方式-输出类似 [(0, [u'5', u'unprivate']), (1, [u'1', u'unprivate']), (2, [u'2', u'private'])]

#2nd way
new_df = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[0]).zipWithIndex().map(lambda e: (e[1], [e[0]]))
new_df2 = df.sample(withReplacement=True, fraction=1.0).map(lambda row: row[1]).zipWithIndex().map(lambda e: (e[1], [e[0]]))

new_df = new_df.join(new_df2)
new_df = new_df.map(lambda e: (e[0], e[1][0] + e[1][1]))
print new_df.collect()

我试图找出第62页中的unisample函数 http://info.mapr.com/rs/mapr/images/Getting_Started_With_Apache_Spark.pdf


Tags: 数据sample方法lambdatruemapdfnew
1条回答
网友
1楼 · 发布于 2024-09-30 12:15:45

这与Spark如何执行代码有关。在第一个示例中,尝试将此print语句放入代码中:

for i in range(0, len(columns)):
    if new_df:
        print(new_df.take(1))

由于代码是延迟执行的for这样的循环将不起作用,因为Spark实际上只执行最后一个循环。因此,当您为第二列启动for循环时,您已经得到了new_df的值,该值等于第二个for循环的输出。你知道吗

你必须使用你在第二个例子中使用的方法。你知道吗

相关问题 更多 >

    热门问题