pyspark:计算写入的行数

2024-10-01 22:31:12 发布

您现在位置:Python中文网/ 问答频道 /正文

当我这样做的时候

df: DataFrame = ...
df.write.parquet('some://location/')

我可以跟踪并报告(用于监视)刚刚写入some://location的行数吗

df.write.parquet('some://location/')
# I imagine something like:
spark_session.someWeirdApi().mostRecentOperation().number_of_rows_written

Tags: numberdataframedfsession报告locationsomeimagine
1条回答
网友
1楼 · 发布于 2024-10-01 22:31:12

在做了一些挖掘之后,我找到了一种方法:

  • 您可以通过py4j's callbacks注册QueryExecutionListener(注意,这是在source中注释的@DeveloperApi
  • 但您需要在应用程序运行结束时启动回调服务器并手动停止网关

这是受cloudera社区中的一个post的启发,我不得不将它移植到更新的spark版本(使用spark 3.0.1,那里建议的答案使用不推荐的SQLContext)和pyspark(使用py4j回调)

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, DataFrame


class Listener:
    def onSuccess(self, funcName, qe, durationNs):
        print("success", funcName, durationNs, qe.executedPlan().metrics())
        print("rows", qe.executedPlan().metrics().get("numOutputRows").value())
        print("files", qe.executedPlan().metrics().get("numFiles").value())
        print("bytes", qe.executedPlan().metrics().get("numOutputBytes").value())

    def onFailure(self, funcName, qe, exception):
        print("failure", funcName, exception, qe.executedPlan().metrics())

    class Java:
        implements = ["org.apache.spark.sql.util.QueryExecutionListener"]


def run():
    spark: SparkSession = SparkSession.builder.getOrCreate()

    df: DataFrame = spark.createDataFrame(pd.DataFrame(np.random.randn(20, 3), columns=["foo", "bar", "qux"]))

    gateway = spark.sparkContext._gateway
    gateway.start_callback_server()

    listener = Listener()
    spark._jsparkSession.listenerManager().register(listener)

    df.write.parquet("/tmp/file.parquet", mode='overwrite')

    spark._jsparkSession.listenerManager().unregister(listener)

    spark.stop()
    spark.sparkContext.stop()
    gateway.shutdown()


if __name__ == '__main__':
    run()

相关问题 更多 >

    热门问题