Attempting to broadcast an RDD or reference an RDD from an action or transformation

Posted 2024-10-02 18:26:24


I want to broadcast a hashmap in Python so that I can use it for lookups on the worker nodes.

class datatransform:

    # Constructor

    def __init__(self, lookupFileName, dataFileName):
        self.lookupFileName = lookupFileName
        self.dataFileName = dataFileName
        self.hamp = {}
        self.broadcastVar = None

    # Read lookup file from the filesystem and create a local hashmap
    # first and then create a broadcast variable.

    def create_dictionary(self):
        lookup_read = sc.textFile(self.lookupFileName)
        self.lookup_parsed = (lookup_read
            .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
        self.broadcastVar = sc.broadcast(self.lookup_parsed)

    # This function will map the given id to a new index using the broadcasted hashmap.

    def featurize(self) :
        data_projected = sqlContext.sql("SELECT uid, prod_id FROM userprods ")
        data = data_projected.map(lambda p: [p.uid, p.prod_id])
        bcastmap = self.broadcastVar
        data_featurized = (data_projected
            .map(lambda p: [p.uid, bcastmap.value[p.prod_id]]))

datatransform = datatransform ('/path/to/lookupfile', '/path/to/datafile')

datatransform.create_dictionary()

datatransform.read_data()
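The featurize step is meant to replace each prod_id with the index stored in the lookup table. Below is a plain-Python sketch of that per-record lookup, assuming the broadcast value is an ordinary dict (which is what bcastmap.value would be if the broadcast held a dict rather than an RDD). The function name featurize_record and the default for unknown ids are hypothetical, and the str() conversion assumes the SQL column might be numeric while the lookup keys, read from a text file, are strings.

```python
def featurize_record(uid, prod_id, lookup, default=None):
    """Map one (uid, prod_id) pair to [uid, new_index].

    `lookup` stands in for bcastmap.value (a plain dict).
    Keys were read with textFile, so they are strings; prod_id is
    passed through str() in case the SQL column is numeric (assumption).
    Unknown ids get `default` instead of raising KeyError.
    """
    return [uid, lookup.get(str(prod_id), default)]


lookup = {"p1": "0", "p2": "1"}
rows = [("u1", "p2"), ("u2", "p9")]
print([featurize_record(uid, pid, lookup) for uid, pid in rows])
# [['u1', '1'], ['u2', None]]
```

Using dict.get with a default avoids a task failure on workers when a prod_id is missing from the lookup file; whether a missing id should be dropped, defaulted, or treated as an error is a design decision the question does not settle.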

I get the following error message:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not …


1 answer
Anonymous user
#1 · Posted 2024-10-02 18:26:24

If you want to broadcast a dictionary, you have to collect it to the driver first. That means create_dictionary should look like this:

def create_dictionary(self):
    lookup_read = sc.textFile(self.lookupFileName)
    lookup_parsed = (lookup_read
        .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
    self.broadcastVar = sc.broadcast(lookup_parsed.collectAsMap())
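collectAsMap() brings the key/value pairs back to the driver as an ordinary Python dict, and it is that dict, not the RDD, that sc.broadcast ships to the workers. The parsing can be checked locally without Spark; parse_lookup below is a hypothetical helper that mimics what lookup_parsed.collectAsMap() returns, assuming each line of the lookup file has at least two tab-separated fields.

```python
def parse_lookup(lines):
    """Build the dict that lookup_parsed.collectAsMap() would return.

    Each line is split on tabs; the first field becomes the key and the
    second the value, as in the map() lambda. Like dict(), a repeated
    key keeps the last value seen.
    """
    pairs = []
    for line in lines:
        fields = line.split("\t")
        pairs.append((fields[0], fields[1]))
    return dict(pairs)


lines = ["p1\t0", "p2\t1", "p1\t5"]
print(parse_lookup(lines))  # {'p1': '5', 'p2': '1'}
```

In Spark, which duplicate "wins" depends on the order in which collect() returns the records, so a lookup file with repeated keys is worth deduplicating up front. The whole dict must also fit in driver memory, because collectAsMap() materializes it there before it is broadcast.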
