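I am writing because I am trying to create two columns:
data_i: the first value of the "precio_actual" column within the Spark Streaming window
data_f: the last value of the "precio_actual" column within the window
For example, this is what I have now: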
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
|localSymbol| time|precio_actual| bid| ask| high| low| close| ventana| string_real_time| comienzo_ventana| fin_ventana|PRUEBA|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
| EUR.USD|2021-01-07 16:30:...| 1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30| 3.0|
| EUR.USD|2021-01-07 16:30:...| 4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30| 4.0|
| EUR.USD|2021-01-07 16:30:...| 2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30| 2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+
What I need is the following:
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
|localSymbol| time|precio_actual| bid| ask| high| low| close| ventana| string_real_time| comienzo_ventana| fin_ventana|data_i|data_f|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
| EUR.USD|2021-01-07 16:30:...| 1.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:00|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:14|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 3.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:15|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 4.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:16|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
| EUR.USD|2021-01-07 16:30:...| 2.0|1.22665|1.22666|1.2345|1.2245|1.2328|[2021-01-07 16:30...|2021-01-07 16:30:29|2021-01-07 16:30:00|2021-01-07 16:30:30| 1.0| 2.0|
+-----------+--------------------+-------------+-------+-------+------+------+------+--------------------+-------------------+-------------------+-------------------+------+------+
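Put differently, data_i and data_f are the first and last "precio_actual" of each 30-second tumbling window, ordered by time. A minimal plain-Python sketch of that rule follows; the helper names window_start and first_last_per_window are hypothetical and only illustrate the expected semantics, not how Spark computes it.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)   # same length as the "30 seconds" window
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its 30-second tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def first_last_per_window(rows):
    """rows: iterable of (time, precio_actual) pairs.

    Returns {window_start: (data_i, data_f)}, where data_i is the earliest
    price in the window and data_f is the latest one.
    """
    result = {}
    for ts, price in sorted(rows):          # process rows in time order
        start = window_start(ts)
        first, _ = result.get(start, (price, price))
        result[start] = (first, price)      # keep first, overwrite last
    return result
```

Applied to the five sample rows above, every row of the 16:30:00 window would carry the same (data_i, data_f) pair.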
My code is as follows:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":
    # Step 1: Create the SparkSession
    spark = SparkSession \
        .builder \
        .appName("Spark IB") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # Step 2: Read a stream from the Kafka topic
    level_1_data_kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "level_1_data_test") \
        .option("startingOffsets", "earliest") \
        .load()

    # Step 3: Define the schema
    level_1_data_schema = StructType([
        StructField("localSymbol", StringType()),
        StructField("time", StringType()),
        StructField("precio_actual", StringType()),
        StructField("bid", StringType()),
        StructField("ask", StringType()),
        StructField("high", StringType()),
        StructField("low", StringType()),
        StructField("close", StringType()),
    ])

    # Step 4: Extract the "value" field from the Kafka record
    """
    col("value")     the "value" field arrives in binary format
    .cast("string")  cast it so that we now have a string
    from_json        deserializes the JSON string into a struct with the given schema
    .alias           renames the resulting column; level_1_data_value_df.printSchema() shows it
    """
    level_1_data_value_df = level_1_data_kafka_df.select(
        f.from_json(f.col("value").cast("string"), level_1_data_schema).alias("value"))

    # Step 5: Data transformation and window
    level_1_data_value_transform_df = level_1_data_value_df \
        .select("value.*") \
        .withColumn("ventana", f.window(f.col("time"), "30 seconds")) \
        .withColumn("time", f.to_timestamp(f.col("time"))) \
        .withColumn("string_real_time", f.col("time")[0:19]) \
        .withColumn("precio_actual", f.col("precio_actual").cast("double"))

    # Step 6: Queries
    level_1_data_value_window_agg_df = level_1_data_value_transform_df \
        .select(f.col("*")) \
        .withColumn("comienzo_ventana", f.col("ventana.start")) \
        .withColumn("fin_ventana", f.col("ventana.end")) \
        .withColumn("PRUEBA", f.expr("case when comienzo_ventana < fin_ventana then precio_actual else 0 end"))

    window_query = level_1_data_value_window_agg_df.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="5 seconds") \
        .start()

    window_query.awaitTermination()
My solution:
You can use first and last over the time column, or use select.