我将gzipedJSON读入rdd
rdd1 =sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
我想把它转换成spark数据帧。链接SO问题的第一种方法无效。这是文件的第一行
{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
如何推断模式
答案是这样的
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
为什么是范围(32)
为了回答您的问题,范围(32)仅指示StrucField类可应用于所需架构的列数。在您的例子中,有30列。 基于您的数据,我能够使用以下逻辑创建数据帧:
输出
range(32)
在该示例中,这只是一个示例-他们正在生成包含32列的模式,每个列都以数字作为名称。如果确实要定义架构,则需要显式定义每一列:但更好的方法是避免使用RDDAPI,并使用以下代码直接将文件读入数据帧(请参见documentation):
相关问题 更多 >
编程相关推荐