通过 Spark Python 到 Pandas 的时间戳往返

2024-09-27 20:17:46 发布

您现在位置:Python中文网/ 问答频道 /正文

如何将时间戳数据从Spark Python来回转换为Pandas?我从Spark的Hive表中读取数据,想在Pandas中进行一些计算,然后将结果写回Hive。只有最后一部分失败了,将Pandas时间戳转换回Spark数据帧时间戳。在

import datetime
import pandas as pd

dates = [
    ('today', '2017-03-03 11:30:00')
  , ('tomorrow', '2017-03-04 08:00:00')
  , ('next Thursday', '2017-03-09 20:00:00')
]
string_date_rdd = sc.parallelize(dates)
timestamp_date_rdd = string_date_rdd.map(lambda t: (t[0], datetime.datetime.strptime(t[1], "%Y-%m-%d %H:%M:%S')))
timestamp_df = sqlContext.createDataFrame(timestamp_date_rdd, ['Day', 'Date'])
timestamp_pandas_df = timestamp_df.toPandas()
roundtrip_df = sqlContext.createDataFrame(timestamp_pandas_df)
roundtrip_df.printSchema()
roundtrip_df.show()

root
 |-- Day: string (nullable = true)
 |-- Date: long (nullable = true)

+-------------+-------------------+
|          Day|               Date|
+-------------+-------------------+
|        today|1488540600000000000|
|     tomorrow|1488614400000000000|
|next Thursday|1489089600000000000|
+-------------+-------------------+

此时,往返Spark DataFrame的日期列为datatype long。在Pyspark中,这可以很容易地转换回datetime对象,例如。,datetime.datetime.fromtimestamp(148908960000000000/1000000000),尽管一天中的时间缩短了几个小时。这个数据帧的数据类型如何转换?在

Python 3.4.5,Spark 1.6.0

谢谢, 约翰


Tags: 数据pandasdfdatetimedatestring时间timestamp
2条回答

将datetime64列转换为python datetime对象对我很有用。在

from pandas import Series

def convert_to_python_datetime(df):
    df_copy = df.copy()
    for column_name, column in df_copy.iteritems():
        if column.dtype.kind == 'M':
            df_copy[column_name] = Series(column.dt.to_pydatetime(), dtype=object)
    return df_copy


tmp = convert_to_python_datetime(timestamp_pandas_df)
roundtrip_df = sqlContext.createDataFrame(tmp)
roundtrip_df.printSchema()
roundtrip_df.show()

输出:

^{pr2}$

我找到了一个解决方案:

from pyspark.sql.types import TimestampType
extra_column_df = roundtrip_df.select(roundtrip_df.Day, roundtrip_df.Date).withColumn('new_date', roundtrip_df.Date / 1000000000)
roundtrip_timestamp_df = extra_column_df.select(extra_column_df.Day, extra_column_df.new_date.cast(TimestampType()).alias('Date')

输出:

^{pr2}$

作为一个附加的bug或特性,这似乎将所有日期转换为UTC,包括DST感知。在

相关问题 更多 >

    热门问题