如果有多个工作人员被分配到数据流管道,他们能够访问全局变量吗?

2024-05-19 03:37:44 发布

您现在位置:Python中文网/ 问答频道 /正文

基本上,我使用apachebeampythonsdk在Google云数据流上运行一个管道。 首先,我从cloud pubsub读取一个JSON字符串,并对照全局dictionary变量检查是否已经接收到带有ID的数据。如果这是第一条带有ID的消息,那么我将ID作为键添加到字典中,并将接收到的消息添加为值,否则我不会更改字典。基本上,每次收到新的字典时,我都会给字典加一个键。 接下来,我通过比较新接收到的数据和上次读取的数据来监视数据的变化。在

一旦处理数据流作业的工人数量超过1,使用公共变量是否会导致任何问题?在

我目前编写管道的方式是可行的,但是现在只有一个工人在处理gcp数据流作业。我不确定如果再派一个工人,是否会出现任何问题。在

这里我添加了一个简化版本的代码,但是实际的代码有多个分支检查不同类型的事件。在

dictionary={}
class AddId2Dict(beam.DoFn):
    def process(self,e):
        if(e[0] not in dictionary.keys()):
            dictionary[e[0]]=e[1]
        return((e,))
class ChangeChecker(beam.DoFn):
    def process(self,e):
       if(e[0] in dictionary.keys()):
            if dictionary[e[0]]<e[1]:
                print 'Increase occurred for id:'+str(e[0])|
                dictionary[e[0]]=e[1]
            elif dictionary[e[0]]>e[1]:
                print 'Decrease occurred for id:'+str(e[0])
            else:
                print 'Stayed constant for id:'+str(e[0])

def run():
    p = beam.Pipeline(options=options)
    (
     p
     | 'read from pubsub'<<beam.io.ReadFromPubSub(topic=topic_name).with_output_type(bytes)
     | 'parse json & create tuple' >> beam.Map(lambda e: ((json.loads(x)['id'],int(json.loads(x)['data'])))
     | 'add key to dict if it does not exist' >> beam.ParDo(AddId2Dict())
     | 'check for event' >> beam.ParDo(ChangeChecker())
    )
    result = p.run()
    result.wait_until_finish()

if __name__  ==  '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


Tags: 数据runidjsonfordictionaryif字典

热门问题