如何阻止回傳/產出中的多餘重複,同時仍保留給定鍵值對的累加總?

2024-09-30 06:13:59 发布

您现在位置:Python中文网/ 问答频道 /正文

在将Pcollection传递给下一个变换之后,变换的返回/产量将成倍增加,而对于给定的街道和事故计数,我只需要一个KV对。你知道吗

我的理解是,生成器可以通过保存值来帮助实现这一点,但这只能解决我的部分问题。我尝试在发送到下一个转换之前确定大小,但还没有找到任何方法可以提供所传递的Pcollection元素的真实大小。你知道吗

class CountAccidents(beam.DoFn):
    acci_dict = {}

    def process(self, element):
        if self.acci_dict.__contains__(element[0]['STREET_NAME']):
            self.acci_dict[element[0]['STREET_NAME']] += 1
        else:
            self.acci_dict.update({element[0]['STREET_NAME']: 1})
        if self.acci_dict != {}:
            yield self.acci_dict


def run():
    with beam.Pipeline() as pl:
        test = (pl | 'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
                   | 'Map Accident' >> beam.ParDo(AccidentstoDict())
                   | 'Count Accidents' >> beam.ParDo(CountAccidents())
                   | 'Print to Text' >> beam.io.WriteToText('/letstestthis', file_name_suffix='.txt'))```                                                      

Input Pcollection:
[{'CRASH_DATE': '3/25/19 0:25', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'KOSTNER AVE', 'CRASH_HOUR': '0'}]
[{'CRASH_DATE': '3/24/19 23:40', 'WEATHER_CONDITION': 'CLEAR', 'STREET_NAME': 'ARCHER AVE', 'CRASH_HOUR': '23'}]
[{'CRASH_DATE': '3/24/19 23:30', 'WEATHER_CONDITION': 'UNKNOWN', 'STREET_NAME': 'VAN BUREN ST', 'CRASH_HOUR': '23'}]

I expect to get this: 
{'KILPATRICK AVE': 1, 'MILWAUKEE AVE': 1, 'CENTRAL AVE': 2, 'WESTERN AVE': 6, 'DANTE AVE': 1}

What I get is this(a slow build-up till complete): 
{'KOSTNER AVE': 1}
{'KOSTNER AVE': 1, 'ARCHER AVE': 1}
{'KOSTNER AVE': 2, 'ARCHER AVE': 2, 'VAN BUREN ST': 1}

Tags: nameselfstreetdateelementcrashconditiondict
1条回答
网友
1楼 · 发布于 2024-09-30 06:13:59

您需要对每个键进行联合收割机,对于计数,您可以使用此处的一个:

https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.transforms.combiners.html

在读操作之后,输出一个KeyValue,它是{STREET,1},后跟一个Count per key transform,它将给出STREET的全局计数。你知道吗

例如,如果您希望每周的输出,也可以很容易地从那里添加窗口功能。您只需要将时间戳和窗口添加到调用中。如何做到这一点的示例如下:

In a batch pipeline how do I assign timestamps to data from the batch sources for example csv files in a Beam pipeline

相关问题 更多 >

    热门问题