在python集群模式下解析spark配置文件

2024-06-30 08:53:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试以纱线客户端纱线簇模式读取和解析配置文件(使用PySpark) 配置文件包含启动spark会话所需的行,因此需要在启动之前完成。而且看起来,Thread只在会话启动后将其复制到hdfs,这会导致读取问题。我试图停止并启动会话,但结果是将文件复制到hdfs两次,然后我得到了一个异常,因为有第三方jar和一个“已履行的承诺”,所以我宁愿只启动一次会话

到目前为止我有

火花提交

    /spark-2.4.5/bin/spark-submit   \
      --master yarn  \
      --deploy-mode "cluster" \ # or client
      --name test_app  \
      --files ClusterTest/config/FilesToRead.conf \
      --conf spark.yarn.dist.files=ClusterTest/config/FileToRead.conf \
      --py-files ClusterTest/ClusterModeTest.py \
      --class main ClusterTest/ClusterModeTest.py

配置文件

{
  "db_host": "192.168.0.1"
}

python

def get_file_json_dict(file):
    with open(file, 'r') as f:
        json_dict = json.load(f)
    return json_dict

def setup_logger(spark_session):
    sc = spark_session.sparkContext
    sc.setLogLevel("WARN")
    return sc._jvm.org.apache.log4j.LogManager.getLogger(__name__)

def main():
    FILE = "FileToRead.conf"
    parsed_json = get_file_json_dict(FILE)
    spark_session = pyspark.sql.SparkSession.builder \
                    .config("spark.cassandra.connection.host", parsed_json["db_host"]) \
                    .getOrCreate()
    log = setup_logger(spark_session)
    log.error(f" PARSED JSON [{parsed_json}]")

Tags: pyconfigjsonhostsessionconfdef配置文件