Windowing and aggregating a PySpark DataFrame


I'm trying to process incoming events from a socket, then window and aggregate the event data. I've hit a snag with the windowing: it seems that even though I specify a schema for the DataFrame, it does not get translated into columns.

import sys
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType, IntegerType, StructField

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


if __name__ == "__main__":
    # our data currently looks like this (tab separated).
    # -SYMBOL   DATE            PRICE   TICKVOL BID         ASK
    # NQU7  2017-05-28T15:00:00 5800.50 12      5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 1       5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 5       5800.50     5800.50
    # NQU7  2017-05-28T15:00:00 5800.50 1       5800.50     5800.50

    if len(sys.argv) != 3:
        # print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    spark = SparkSession \
        .builder \
        .appName("StructuredTickStream") \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel('WARN')

    # Read all the csv files written atomically in a directory
    tickSchema = StructType([
        StructField("symbol", StringType(), True),
        StructField("dt", TimestampType(), True),
        StructField("price", FloatType(), True),
        StructField("tickvol", IntegerType(), True),
        StructField("bid", FloatType(), True),
        StructField("ask", FloatType(), True)
    ])

    events_df = spark \
        .readStream \
        .option("sep", "\t") \
        .option("host", sys.argv[1]) \
        .option("port", sys.argv[2]) \
        .format("socket") \
        .schema(tickSchema) \
        .load()

    events_df.printSchema()
    print("columns = ", events_df.columns)

    ohlc_df = events_df \
        .groupby(F.window("dt", "5 minutes", "1 minutes")) \
        .agg(
            F.first('price').alias('open'),
            F.max('price').alias('high'),
            F.min('price').alias('low'),
            F.last('price').alias('close')
        ) \
        .collect()


    query = ohlc_df \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

print("columns = ", events_df.columns)的输出是['value'],进程失败,跟踪如下:

pyspark.sql.utils.AnalysisException: "cannot resolve '`dt`' given input columns: [value];;\n'Aggregate [timewindow('dt, 300000000, 60000000, 0)], [timewindow('dt, 300000000, 60000000, 0) AS window#3, first('price, false) AS open#7, max('price) AS high#9, min('price) AS low#11, last('price, false) AS close#13]\n+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3a32b1a2,socket,List(),Some(StructType(StructField(symbol,StringType,true), StructField(dt,TimestampType,true), StructField(price,FloatType,true), StructField(tickvol,IntegerType,true), StructField(bid,FloatType,true), StructField(ask,FloatType,true))),List(),None,Map(sep -> \t, host -> localhost, port -> 9999),None), textSocket, [value#0]\n"

Any idea what I'm doing wrong?


1 Answer

Your DataFrame has only a single column, value, but you are trying to access the column dt on this events_df. That is the root cause of the problem.

The following statement clearly shows that it has the single column value:

print("columns = ", events_df.columns)

You need to check the following

events_df = spark \
    .readStream \
    .option("sep", "\t") \
    .option("host", sys.argv[1]) \
    .option("port", sys.argv[2]) \
    .format("socket") \
    .schema(tickSchema) \
    .load()

to see why it is creating the df with only one column.
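The socket source in Structured Streaming always delivers a single string column named value, regardless of the schema passed to readStream, so the tab-separated payload has to be parsed into the tick columns manually. Below is a minimal sketch of one way to do that; the parsed_df name, the to_timestamp format string, and keeping the question's window settings are illustrative assumptions rather than part of the original answer.

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, IntegerType

# The socket source yields one string column `value`; split it on tabs
# and cast each field to the type declared in tickSchema.
parts = F.split(events_df["value"], "\t")
parsed_df = events_df.select(
    parts.getItem(0).alias("symbol"),
    F.to_timestamp(parts.getItem(1), "yyyy-MM-dd'T'HH:mm:ss").alias("dt"),
    parts.getItem(2).cast(FloatType()).alias("price"),
    parts.getItem(3).cast(IntegerType()).alias("tickvol"),
    parts.getItem(4).cast(FloatType()).alias("bid"),
    parts.getItem(5).cast(FloatType()).alias("ask"),
)

# Windowed OHLC aggregation on the parsed columns. Note there is no
# .collect() here: streaming DataFrames must be run via writeStream.
ohlc_df = parsed_df \
    .groupBy(F.window("dt", "5 minutes", "1 minutes")) \
    .agg(
        F.first("price").alias("open"),
        F.max("price").alias("high"),
        F.min("price").alias("low"),
        F.last("price").alias("close"),
    )

query = ohlc_df.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

With the value column parsed this way, the window("dt", ...) aggregation can resolve dt and the AnalysisException above should no longer occur.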
