How do I define a schema for PySpark createDataFrame(rdd, schema)?

Posted 2024-09-30 12:29:14


I looked at spark-rdd to dataframe.

I read gzipped JSON into an RDD:

rdd1 = sc.textFile('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')

I want to convert it to a Spark DataFrame. The first approach from the linked SO question did not work. This is the first line of the file:

{"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000", "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "", "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20", "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "", "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656, "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}

How can I infer the schema?

The answer there was:

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

Why range(32)?


2 answers

To answer your question, range(32) just indicates the number of columns that the StructField class is applied to for the desired schema. In your case there are 30 columns. Based on your data, I was able to create a DataFrame using the following logic:

from pyspark.sql.functions import *
from pyspark.sql.types import *

data_json = {"code_event": "1092406", "code_event_system": "LOTTO", "company_id": "2", "date_event": "2020-05-27 12:00:00.000",
          "date_event_real": "0001-01-01 00:00:00.000", "ecode_class": "", "ecode_event": "183", "eperiod_event": "",
          "etl_date": "2020-05-27", "event_no": 1, "group_no": 0, "name_event": "Ungaria Putto - 8/20", "name_event_short": "Ungaria Putto - 8/20",
          "odd_coefficient": 1, "odd_coefficient_entry": 1, "odd_coefficient_user": 1, "odd_ekey": "11", "odd_name": "11", "odd_status": "",
          "odd_type": "11", "odd_voidfactor": 0, "odd_win_types": "", "special_bet_value": "", "ticket_id": "899M-E2X93P", "id_update": 8000001036823656,
          "topic_group": "cwg5", "kafka_key": "899M-E2X93P", "kafka_epoch": 1590580609424, "kafka_partition": 0, "kafka_topic": "tickets-calculated_2"}
column_names = [x for x in data_json.keys()]
row_data = [([x for x in data_json.values()])]

# Build one StructField per key, picking a Spark type from the Python value
fields = []
for name in column_names:
    value = data_json[name]
    if isinstance(value, str):
        fields.append(StructField(name, StringType(), True))
    elif isinstance(value, int) and len(str(value)) <= 8:
        fields.append(StructField(name, IntegerType(), True))
    else:
        fields.append(StructField(name, LongType(), True))

schema = StructType(fields)
data = spark.createDataFrame(row_data, schema)
data.show()

Output

# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |code_event|code_event_system|company_id|          date_event|     date_event_real|ecode_class|ecode_event|eperiod_event|  etl_date|event_no|group_no|          name_event|    name_event_short|odd_coefficient|odd_coefficient_entry|odd_coefficient_user|odd_ekey|odd_name|odd_status|odd_type|odd_voidfactor|odd_win_types|special_bet_value|  ticket_id|       id_update|topic_group|  kafka_key|  kafka_epoch|kafka_partition|         kafka_topic|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
# |   1092406|            LOTTO|         2|2020-05-27 12:00:...|0001-01-01 00:00:...|           |        183|             |2020-05-27|       1|       0|Ungaria Putto - 8/20|Ungaria Putto - 8/20|              1|                    1|                   1|      11|      11|          |      11|             0|             |                 |899M-E2X93P|8000001036823656|       cwg5|899M-E2X93P|1590580609424|              0|tickets-calculated_2|
# +----------+-----------------+----------+--------------------+--------------------+-----------+-----------+-------------+----------+--------+--------+--------------------+--------------------+---------------+---------------------+--------------------+--------+--------+----------+--------+--------------+-------------+-----------------+-----------+----------------+-----------+-----------+-------------+---------------+--------------------+
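
As a side note, the same schema can be applied to the actual RDD of JSON strings from the question by parsing each line and ordering its values to match the schema. This is only a minimal sketch, assuming rdd1, column_names and schema are the objects defined above:

import json

# Parse each JSON line into a dict, then order its values by column_names
# (rdd1, column_names and schema are assumed to be defined as above)
parsed = rdd1.map(json.loads).map(lambda d: [d.get(c) for c in column_names])
df = spark.createDataFrame(parsed, schema)
df.show(1)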

The range(32) in that example is just an example: it generates a schema with 32 columns, each named by its number. If you really want to define the schema, you need to define every column explicitly:

from pyspark.sql.types import *
schema = StructType([
    StructField('code_event', IntegerType(), True),
    StructField('code_event_system', StringType(), True),
    ...
    ])

But a better approach is to avoid the RDD API and read the files directly into a DataFrame with the following code (see the documentation):

>>> data = spark.read.json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
>>> data.printSchema()
root
 |-- code_event: string (nullable = true)
 |-- code_event_system: string (nullable = true)
 |-- company_id: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- date_event_real: string (nullable = true)
 |-- ecode_class: string (nullable = true)
 |-- ecode_event: string (nullable = true)
 |-- eperiod_event: string (nullable = true)
 |-- etl_date: string (nullable = true)
....
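
The reader can also be handed an explicit schema instead of letting it infer one; a sketch, assuming a StructType named schema built as in the answer above:

# Supplying the schema up front skips the extra pass over the gzipped files
# that schema inference needs ('schema' is assumed to be defined as above)
data = spark.read.schema(schema).json('s3://cw-milenko-tests/Json_gzips/ticr_calculated_2_2020-05-27T11-59-06.json.gz')
data.printSchema()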
