How to transform data into the required format and write it to a file with Python + Apache Beam

Posted 2024-10-02 12:23:29


I have an .ndjson file that looks like this:

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
...

I use Apache Beam to read the data and group it by property_id, then write the output to a JSON file, but the data comes out like this:

('107', [PPD(property_id='107', transaction_unique_id='{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}', price=80000, date_of_transfer='2021-05-07 00:00', postcode='BL2 2GY', property_type='F', old_new='N', duration='L', PAON='14', SAON='', street='RIVER VIEW COURT', locality='', town_city='BOLTON', district='BOLTON', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('108', [PPD(property_id='108', transaction_unique_id='{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}', price=330000, date_of_transfer='2021-02-26 00:00', postcode='SK6 4AN', property_type='S', old_new='N', duration='F', PAON='18', SAON='', street='GUYWOOD LANE', locality='ROMILEY', town_city='STOCKPORT', district='STOCKPORT', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
('109', [PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}', price=215000, date_of_transfer='2021-02-19 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 022', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}', price=226500, date_of_transfer='2021-02-08 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 727', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A'), PPD(property_id='109', transaction_unique_id='{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}', price=262000, date_of_transfer='2021-05-14 00:00', postcode='M1 2BL', property_type='F', old_new='N', duration='L', PAON='40', SAON='APARTMENT 025', street='HILTON STREET', locality='', town_city='MANCHESTER', district='MANCHESTER', country='GREATER MANCHESTER', PPD_category_type='A', record_status='A')])
...

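We can see that for property_id = '109' the three records are grouped together, but the output format above is really strange. Does anyone know why this happens? How can I convert it to newline-delimited JSON and then write it to a JSON file?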

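The expected format is something like this (I'm not sure whether this is valid newline-delimited JSON, but the idea is to put the transactions that share a property_id, e.g. 109, into an array):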

{"property_id": "107", "transaction_unique_id": "{C3C3F9B5-FB9E-362B-E053-6B04A8C03ACC}", "price": 80000, "date_of_transfer": "2021-05-07 00:00", "postcode": "BL2 2GY", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "14", "SAON": "", "street": "RIVER VIEW COURT", "locality": "", "town_city": "BOLTON", "district": "BOLTON", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "108", "transaction_unique_id": "{C3C3F9B5-FB9F-362B-E053-6B04A8C03ACC}", "price": 330000, "date_of_transfer": "2021-02-26 00:00", "postcode": "SK6 4AN", "property_type": "S", "old_new": "N", "duration": "F", "PAON": "18", "SAON": "", "street": "GUYWOOD LANE", "locality": "ROMILEY", "town_city": "STOCKPORT", "district": "STOCKPORT", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}
{"property_id": "109", "transactions": [{"transaction_unique_id": "{C3C3F9B5-FBA0-362B-E053-6B04A8C03ACC}", "price": 215000, "date_of_transfer": "2021-02-19 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 022", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBD3-362B-E053-6B04A8C03ACC}", "price": 226500, "date_of_transfer": "2021-02-08 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 727", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"},{"transaction_unique_id": "{C3C3F9B5-FBF8-362B-E053-6B04A8C03ACC}", "price": 262000, "date_of_transfer": "2021-05-14 00:00", "postcode": "M1 2BL", "property_type": "F", "old_new": "N", "duration": "L", "PAON": "40", "SAON": "APARTMENT 025", "street": "HILTON STREET", "locality": "", "town_city": "MANCHESTER", "district": "MANCHESTER", "country": "GREATER MANCHESTER", "PPD_category_type": "A", "record_status": "A"}]}
...

Can anyone help? I'm very new to Beam, and any help would be much appreciated. Thanks.


1 answer
User
#1 · Posted 2024-10-02 12:23:29

I'm assuming PPD is a named tuple, and that you take a PCollection of PPD objects and group them like this:

import apache_beam as beam

grouped = (
    ppd_pcoll
    # Key each record by its property_id, keeping the whole PPD as the value.
    | beam.Map(lambda ppd: (ppd.property_id, ppd))
    | beam.GroupByKey())

Now grouped is a PCollection of 2-tuples, where the first item is the property id string and the second is an iterable of the PPDs that have that property id.
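This also seems to explain the odd output in the question: as far as I can tell, when such a tuple is written out as text it is simply stringified, so the file ends up holding Python repr syntax rather than JSON. A minimal sketch in plain Python (no Beam needed), using a simplified, hypothetical two-field PPD rather than the real one:

```python
import json
from collections import namedtuple

# Simplified, hypothetical PPD with only two of the original fields.
PPD = namedtuple("PPD", ["property_id", "price"])

# One element shaped like the output of GroupByKey: (key, values for that key).
element = ("107", [PPD(property_id="107", price=80000)])

# Stringifying the tuple gives Python repr syntax, not JSON.
as_text = str(element)

# Converting to plain dicts first lets json.dumps produce one JSON line.
property_id, ppds = element
as_json = json.dumps({
    "property_id": property_id,
    "transactions": [{"price": ppd.price} for ppd in ppds],
})

print(as_text)
print(as_json)
```

The first printed line has the same shape as the lines in the question's output file; the second is what a newline-delimited JSON file would need.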

To get what you want, you need to map each group to the desired dictionary and then output it as JSON, for example:

to_write_to_json = (grouped
 # MapTuple unpacks each (property_id, ppds) pair into the lambda's arguments.
 | beam.MapTuple(lambda property_id, ppds: {
      'property_id': property_id,
      'transactions': [ppd_to_transaction(ppd) for ppd in ppds],
     }))

Here ppd_to_transaction is a function that takes a PPD object and returns a dict with the desired transaction attributes.
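One possible way to fill in the missing pieces is sketched below. It assumes PPD is a namedtuple with exactly the fields shown in the question; the helper group_to_json_line and the function build_pipeline are names made up for this illustration, and the read/parse steps are a guess at how the PCollection was built. Unlike the expected output in the question, every property gets a "transactions" array, even when it has only one transaction.

```python
import json
from collections import namedtuple

# Assumed shape of PPD: a namedtuple with the fields seen in the question's data.
PPD = namedtuple("PPD", [
    "property_id", "transaction_unique_id", "price", "date_of_transfer",
    "postcode", "property_type", "old_new", "duration", "PAON", "SAON",
    "street", "locality", "town_city", "district", "country",
    "PPD_category_type", "record_status",
])


def ppd_to_transaction(ppd):
    """Return the PPD fields as a dict, minus the grouping key."""
    transaction = ppd._asdict()
    del transaction["property_id"]
    return transaction


def group_to_json_line(property_id, ppds):
    """Serialize one (property_id, PPDs) group as a single JSON line."""
    return json.dumps({
        "property_id": property_id,
        "transactions": [ppd_to_transaction(ppd) for ppd in ppds],
    })


def build_pipeline(pipeline, input_path, output_prefix):
    """Wire the steps together (hypothetical helper; needs apache_beam)."""
    import apache_beam as beam  # imported here so the helpers work without Beam

    return (
        pipeline
        | "Read" >> beam.io.ReadFromText(input_path)
        | "Parse" >> beam.Map(lambda line: PPD(**json.loads(line)))
        | "KeyById" >> beam.Map(lambda ppd: (ppd.property_id, ppd))
        | "Group" >> beam.GroupByKey()
        | "ToJson" >> beam.MapTuple(group_to_json_line)
        | "Write" >> beam.io.WriteToText(output_prefix, file_name_suffix=".ndjson")
    )
```

To run it, something like `with beam.Pipeline() as p: build_pipeline(p, "input.ndjson", "output")` should work, where both file names are placeholders. Because each element is already a JSON string by the time it reaches the write step, the output file contains one JSON object per line.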
