Removing selected JSON objects from a JSON array with PySpark

Posted 2024-09-30 18:26:03


I want to remove several JSON objects from a JSON array. My source JSON has the format shown below. I have a list of device IDs that should be kept in the array; the rest need to be removed. For example, the source JSON below contains three dev_id values: 100010100, 200020200, and 300030300.

Given the Python list device_id_list = [200020200, 300030300], the final array should contain only those two objects; the object with dev_id = 100010100 should be dropped, as shown in the output JSON.
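The intended transformation can be sketched in plain Python first (a minimal sketch on a trimmed-down payload; the other fields of each object are omitted):

```python
import json

device_id_list = [200020200, 300030300]

# Trimmed-down version of the source document (placeholder fields omitted).
source = {
    "iot_station": [
        {"dev_id": 100010100},
        {"dev_id": 200020200},
        {"dev_id": 300030300},
    ]
}

# Keep only the entries whose dev_id appears in device_id_list.
source["iot_station"] = [
    station for station in source["iot_station"]
    if station["dev_id"] in device_id_list
]

print(json.dumps(source))
# → {"iot_station": [{"dev_id": 200020200}, {"dev_id": 300030300}]}
```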

I tried an approach that is probably not optimal: I read the JSON as plain text rather than as JSON, like this:

df = spark.read.text("path\\iot-sensor.json")
# df:pyspark.sql.dataframe.DataFrame
# value:string

I wrote a UDF that drops the JSON objects whose dev_id is not in device_id_list and returns the remaining JSON as a string.

I would like to convert this string column, i.e. DataFrame df2, back to JSON using the same schema as the source JSON (df2: pyspark.sql.dataframe.DataFrame = [iot_station: array], the source schema), since the source and output JSON should share the same schema. If there is a better overall solution, please share it.

UDF:

import json

def drop_dev_id(jsonResponse, dict_keys):
    try:
        data = json.loads(jsonResponse)
        # keep only the entries whose dev_id appears in dict_keys
        data['iot_station'] = [
            station for station in data['iot_station']
            if station['dev_id'] in dict_keys
        ]
        # serialize back to a string so the result matches the UDF's StringType
        return json.dumps(data)

    except Exception as e:
        print('Exception --> ' + str(e))


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def drop_dev_id_udf(dict_keys):
    return udf(lambda row: drop_dev_id(row, dict_keys), StringType())

# alias the UDF output so the column is not named '<lambda>(value)'
df2 = df.select(drop_dev_id_udf(device_id_list)('value').alias('value'))
# df2:pyspark.sql.dataframe.DataFrame
# value:string

Source JSON:

{
  "iot_station": [
    {
      "dev_id": 100010100,
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 200020200,      
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 300030300,      
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
      }
    }
    ]
}

Output JSON:

{
  "iot_station": [
    {
      "dev_id": 200020200,      
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
      }
    },
    {
      "dev_id": 300030300,      
      "device1": dev_val1,
      "device2": "dev_val2",
      "device3": dev_val3,
      "device4": "dev_val4",
      "stationid": [
        {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
        }
      ],
      "geospat": {
          "id": id_val,
          "idrs": idrs_val,
          "idrq": "idrq_val",
          "idrx": "idrx_val"
      }
    }
    ]
}

Tags: devid, json, data, value, val, keys, iot
1 Answer

You don't need a UDF for what you're trying to achieve here. Just load the file as regular JSON instead of text, and filter the array column iot_station with the `filter` higher-order function:

from pyspark.sql import functions as F


df = spark.read.json("path/iot-sensor.json", multiLine=True)

device_id_list = [str(i) for i in [200020200, 300030300]]

df1 = df.withColumn(
    "iot_station",
    F.expr(f"""
        filter(
            iot_station, 
            x -> x.dev_id in ({','.join(device_id_list)})
        )
    """)
)

# check filtered json
df1.select(F.col("iot_station").getItem("dev_id").alias("dev_id")).show(truncate=False)

#+----------------------+
#|dev_id                |
#+----------------------+
#|[200020200, 300030300]|
#+----------------------+
