如何将任务分发到gcp中的所有工作节点?我正在使用Pypark

2024-09-30 01:35:33 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经在gcpdataproc中创建了一个集群,其中包含1个主节点(clus-m)和两个工作节点(clus-w-0,clus-w-1)。 现在使用pyspark rdd,我想分发一个任务,以便所有节点都参与其中。 下面是我的代码片段

def pair_dist(row):
    dissimlarity = 0
    Z = row[0].split(',')
    X = row[1].split(',')

    for j in range(len(Z)):
        if Z[j] != X[j]:
            dissimlarity += 1

    return str(dissimlarity) + **os.uname()[1]**

sc = SparkContext.getOrCreate()
rdd = sc.textFile( "input.csv" )

rdd = sc.parallelize(rdd.take(5))
rdd = rdd.cartesian(rdd)
dist = rdd.map(lambda x: pair_dist(x)).collect()
dist = np.array(dist).reshape((5,5))
print(dist)

sc.stop()

为了检查它是否正确发生,我将主机名与结果放在一起。 但是我总是在结果中得到主机名clus-m,而不是工作节点的主机名

输出: [0clus-m 2clus-m。。。。。。 1clus-m 0clus-m……]5x5

请建议我到底需要做什么


Tags: 代码节点distdef集群pysparkrow主机名
1条回答
网友
1楼 · 发布于 2024-09-30 01:35:33

要分配工作,必须对输入数据集进行切分。由于您使用的是sc.textFile( "input.csv" ),因此将有一个映射器读取该文件

例如,如果输入数据集通过转换大量增加,则可以RDD.repartition使后续操作更好地并行化

您最好将输入拆分为多个文件

Spark programming guide有以下几点与您的问题相关:

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

相关问题 更多 >

    热门问题