PySpark equivalent of a pandas statement

I have a question. I have a Spark DataFrame that I am cleaning up, and in pandas I would usually use:

df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)

Is there an equivalent statement I can use in PySpark for this aggregation?

EDIT: The original dataset comes from this table on GitHub - https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv

A quick glance at it looks like this:

date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061,1,0
2020-01-22,Snohomish,Washington,53061,1,0
2020-01-23,Snohomish,Washington,53061,1,0
2020-01-24,Cook,Illinois,17031,1,0
2020-01-24,Snohomish,Washington,53061,1,0
2020-01-25,Orange,California,06059,1,0
2020-01-25,Cook,Illinois,17031,1,0
2020-01-25,Snohomish,Washington,53061,1,0

I previously cleaned the dataset with pandas and produced output in the following form:

date,county,state,fips,cases,deaths,ISO3166_1,ISO3166_2,cases_since_prev_day,deaths_since_prev_day,Last_Update_Date,Last_Reported_Flag
2020-03-19,Abbeville,South Carolina,45001,1,0,US,SC,0,0,2020-10-21 22:34:14.644190,False

I want to accomplish the same thing with PySpark. The code and output I have so far:

#Data from The New York Times, based on reports from state and local health agencies
import pandas as pd
import datetime
import pycountry
import numpy as np
import sys
sys.path.append('../utilities')

from utility_setup import create_spark_session, read_s3_to_dataframes
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.functions import udf
from write_to_s3 import _write_dataframe_to_csv

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#Dataframe - read CSV
bucket = 'covid19datalakesafford'
key = 'us-counties.csv'
spark = create_spark_session('COVID-19 NYT - county cases')
df = read_s3_to_dataframes(spark, bucket, key)
df = df.withColumn("fips",df["fips"].cast('string')).withColumn("ISO3166_1",lit("US"))
#df.state.show(2)

#map full state names to two-letter ISO 3166-2 codes using pycountry's US subdivisions
subdivisions = {k.name: k.code.replace("US-", "") for k in pycountry.subdivisions.get(country_code="US")}
mapping_func = lambda x: subdivisions.get(x)
df = df.withColumn('ISO3166_2', udf(mapping_func)("state"))

#df_2["ISO3166_2"] = df_2["state"].apply(lambda x: subdivisions.get(x))
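
#possible UDF-free alternative (untested sketch) using the create_map import above:
#build a literal map column from the subdivisions dict and look up each state name in it
#from itertools import chain
#mapping_expr = create_map([lit(x) for x in chain(*subdivisions.items())])
#df = df.withColumn('ISO3166_2', mapping_expr[col('state')])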



#Old way using Python alone
#df = pd.read_csv("https://raw.githubusercontent.com/nytimes/covid-19-data/master/us-counties.csv", \
#     dtype={'fips': str})

#df = df.withColumnRenamed("ISO3166_1","US") \
#    .withColumnRenamed("ISO3166_2","state")
#df.show(2)

#subdivision_udf = udf(lambda x: subdivisions.get(x))

#function for applying dictionary terms for subdivisions to column

df = df.sort('county', 'date', 'ISO3166_1', 'ISO3166_2')
df.show(2)

#Equivalent in PySpark for lines below?

#df['cases_since_prev_day'] = df.groupby(['county','state'])['cases'].diff().fillna(0).astype(int)
#df["Last_Update_Date"] = datetime.datetime.utcnow()
#df['Last_Reported_Flag'] = df['date'] == df['date'].max()

Current table (first two rows):

+----------+---------+--------------+-----+-----+------+---------+---------+
|      date|   county|         state| fips|cases|deaths|ISO3166_1|ISO3166_2|
+----------+---------+--------------+-----+-----+------+---------+---------+
|2020-03-19|Abbeville|South Carolina|45001|    1|     0|       US|       SC|
|2020-03-20|Abbeville|South Carolina|45001|    1|     0|       US|       SC|
+----------+---------+--------------+-----+-----+------+---------+---------+

EDIT 2: Please note that this is a time series; I expect the list of COVID-19 cases to grow every day as each county/region in every state reports new cases. My table is currently close to 950,000 rows, and it is slow with pandas (it takes 9 minutes to complete).


1 Answer

This should get you (almost) all the way there (I don't have your mapping function, so I can't get the two-letter state abbreviations):

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# county-level window ordered by date - like a SQL OVER (PARTITION BY ... ORDER BY ...) clause
win = Window.partitionBy('county', 'state').orderBy('date')

# day-over-day change: current value minus the previous day's value; the first row of each
# county has no previous day, so fill it with 0 and cast to int (pandas' .fillna(0).astype(int))
df = df.withColumn('cases_since_prev_day',
                   F.coalesce(F.col('cases') - F.lag('cases').over(win), F.lit(0)).cast('int'))
df = df.withColumn('deaths_since_prev_day',
                   F.coalesce(F.col('deaths') - F.lag('deaths').over(win), F.lit(0)).cast('int'))

# Last_Update_Date is the time of this run, like datetime.datetime.utcnow() in the pandas version
df = df.withColumn('Last_Update_Date', F.current_timestamp())
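
For the last pandas line, df['Last_Reported_Flag'] = df['date'] == df['date'].max(), one option (a minimal sketch, assuming the date column compares correctly as stored) is to compute the overall maximum date once and compare each row against it:

# overall latest date in the dataset, pulled back to the driver as a single value
max_date = df.agg(F.max('date').alias('max_date')).collect()[0]['max_date']

# True only for rows reported on that latest date, mirroring df['date'] == df['date'].max()
df = df.withColumn('Last_Reported_Flag', F.col('date') == F.lit(max_date))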
