<p>我的解决方案:</p>
<pre><code>from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
if __name__ == '__main__':
# Paso 1: Creacion de SparkSession
spark = SparkSession \
.builder \
.appName('Spark IB') \
.master('local[3]') \
.config('spark.streaming.stopGracefullyOnShutdown', 'true') \
.config('spark.sql.shuffle.partitions', 2) \
.getOrCreate()
# Paso 2: Leer un stream del kafka topic
level_1_data_kafka_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'level_1_data_test') \
.option('startingOffsets', 'earliest') \
.load()
# Paso 3: Definicion de Schema
level_1_data_schema = StructType([
StructField('localSymbol', StringType()),
StructField('time', StringType()),
StructField('precio_actual', StringType()),
StructField('bid', StringType()),
StructField('ask', StringType()),
StructField('high', StringType()),
StructField('low', StringType()),
StructField('close', StringType()),
])
#Paso 4: Extraigo el campo 'value' del kafka record
'''
col('value') el campo 'value' viene en formato binario
.cast('string') Le hacemos un cast a string y ahora lo que tenemos es un string
from_json sirve para deserializar un string a json, es decir, sirve para pasar de string a json con schema en particular
.alias es para renombrar el schema, si hago level_1_data_value_df.printSchema() lo puedo ver
'''
level_1_data_value_df = level_1_data_kafka_df.select(f.from_json(f.col('value').cast('string'), level_1_data_schema).alias('value'))
# Paso 5: Transformacion de datos y ventana
level_1_data_value_transform_df = level_1_data_value_df\
.select(
'value.*'
) \
.withColumn('ventana', f.window(f.col('time'), '30 seconds')) \
.withColumn('comienzo_ventana', f.col('ventana.start')) \
.withColumn('fin_ventana', f.col('ventana.end')) \
.withColumn('time', f.to_timestamp(f.col('time')) ) \
.withColumn('string_real_time',f.col('time')[0:19]) \
.withColumn('precio_actual', f.col('precio_actual').cast('double')) \
# Paso 6: Queries
level_1_data_value_window_agg_df = level_1_data_value_transform_df \
.groupBy(
f.col('localSymbol'),
f.col('ventana')
) \
.agg(
f.max(f.col('precio_actual')).alias('precio_max_ventana'),
f.min(f.col('precio_actual')).alias('precio_min_ventana'),
f.first(f.col('precio_actual')).alias('precio_ini_ventana'),
f.last(f.col('precio_actual')).alias('precio_fin_ventana'),
)
level_1_data_output_df = level_1_data_value_window_agg_df\
.select(
f.col('ventana.start').alias('inicio_ventana'),
f.col('ventana.end').alias('fin_ventana'),
f.col('precio_max_ventana'),
f.col('precio_min_ventana'),
f.col('precio_ini_ventana'),
f.col('precio_fin_ventana'),
)
window_query = level_1_data_output_df.writeStream \
.format('console') \
.outputMode('update') \
.option('checkpointLocation', 'chk-point-dir') \
.trigger(processingTime='5 seconds') \
.start()
window_query.awaitTermination()
</code></pre>