使用结构化流(PySpark)运行链式查询

2024-10-01 22:28:51 发布

您现在位置:Python中文网/ 问答频道 /正文

我的代码有点像

df = spark.readStream.option("header","true") \
    .schema(df_schema)\
    .csv(df_file)
df2 = df.filter(df.col == 1)
df3 = df2.withColumn("new_col", udf_f(df2.some_col))
dfc = df3.where(df3.new_col == 2).count()
query = dfc.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

我在dfc行得到错误消息Queries with streaming sources must be executed with writeStream.start(),但我不确定我做错了什么。Spark structured streaming不支持这样的链式查询吗?据我所知,我没有做任何分支。在

编辑:

通过从dfc行中删除count(),我得到了一个由query.awaitTermination()调用引起的新错误StreamingQueryException: Exception thrown in awaitResult。你知道为什么count()不起作用了吗?还有为什么会出现新的错误?在

编辑2:

如果我直接登录到控制台而不运行df之后的所有中间查询,它是有效的。但是,每次尝试运行附加查询时,StreamingQueryException都会被引发。在


Tags: dfnewschemacount错误withcolquery
1条回答
网友
1楼 · 发布于 2024-10-01 22:28:51

由于structured streaming的性质,不可能以与静态数据帧相同的方式获取计数。创建流时,Spark正在使用trigger为新数据轮询源。如果有任何Spark,则将其拆分为小数据帧(微批处理)并沿流传递(转换、聚合、输出)。在

如果需要获取记录数,可以添加一个listener to get progress updates并在onQueryProgress(QueryProgressEvent event)中获得输入数。在

很难说为什么你得到StreamingQueryException,因为filter()和{}在结构化流媒体中工作正常。 您是否在控制台中看到其他可能导致Exception thrown in awaitResult的错误?在

顺便说一下,如果您在一个会话中有多个流,您应该使用spark.streams.awaitAnyTermination()来阻止,直到其中任何一个流终止。在

以下查询应正常工作:

query = spark.readStream
    .option("header","true") \
    .schema(df_schema)\
    .csv(df_file)\
    .filter(df.col == 1)\
    .withColumn("new_col", udf_f(df2.some_col))\
    .writeStream\
    .format("console")\
    .outputMode("append")\
    .start()

query.awaitTermination()
# or spark.streams().awaitAnyTermination()

相关问题 更多 >

    热门问题