SparkContext can only be used on the driver, not in code that runs on workers

I get an error when I apply a map function to a SparkContext textFile to read a file with a multi-character delimiter. Below is the code that raises the error:

import json
import base64
import re
import pprint
import sys
import os
import datetime
import subprocess
from subprocess import Popen, PIPE
from sys import argv
from py4j.protocol import Py4JJavaError
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import *


class sparkimportOperations(object):
    def __init__(self, starttime, source_filename, source_delimiter):
        self.spark = ''
        self.sc = ''
        self.headcal = ''
        self.source_delimiter = source_delimiter
        self.source_filename = source_filename
        self.starttime = starttime

    def initializeSparkSession(self):
        try:
            print('HIFI_SPARK_INFO - initializeSparkSession() - Initializing Spark Session')
            self.spark = SparkSession.builder.appName("ppname").getOrCreate()
            self.sc = self.spark.sparkContext
            print('HIFI_SPARK_DEBUG - Initialized Spark Session')
            self.spark.sparkContext.setLogLevel("WARN")
            return True
        except Py4JJavaError as e:
            print('HIFI_SPARK_ERROR - initializeSparkSession() - Failed to Initialize Spark Session')
            self.status = "FAILED"
            self.error = str(e)
            self.error = self.error + 'HIFI_SPARK_ERROR - initializeSparkSession() - Failed to Initialize Spark Session'
            return False

    def importsrctemptable(self):
        # read the first line of the source file to derive the header
        self.headcal = self.spark.read.text(self.source_filename)
        df = self.spark.sparkContext.parallelize(self.headcal.take(1)).map(lambda x: Row(x)).toDF()
        df.write.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + self.starttime + ".json")
        self.headcal = self.spark.read.json(
            "/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + self.starttime + ".json").collect()
        self.headers = self.headcal[0][0]['value']
        self.header_column = self.headers.split(self.source_delimiter)
        # split every line on the delimiter and build a DataFrame -- this is where the error is raised
        self.inmemdf = self.spark.sparkContext.textFile(self.source_filename).map(
            lambda x: x.split(self.source_delimiter)).toDF(self.header_column)
        self.inmemdf.show(100, False)
        return True  # so sparkImportMain() can test the result

    def sparkImportMain(self):
        if self.initializeSparkSession():
            if self.importsrctemptable():
                return True


source_filename = '/hdfsData/bdipoc/poc/Inbound/hifi_unit_test/db2/T5706_CET_ITM_INV_MSV/'
starttime = '10121'
source_delimiter = "|,"
executeimport = sparkimportOperations(starttime, source_filename, source_delimiter)
out = executeimport.sparkImportMain()

Error raised by the program above:

    rv = reduce(self.proto)
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 330, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I have another program that implements the same logic as plain top-level code, without a class or def functions, and it runs without any problem:

from pyspark.sql import SparkSession
from pyspark.sql import Row
spark=SparkSession.builder.appName("spark engine").getOrCreate()
source_filename='/hdfsData/bdipoc/poc/Inbound/hifi_unit_test/db2/T5706_CET_ITM_INV_MSV/'
starttime = '1012'
source_delimiter = "|,"
headcal = spark.read.text(source_filename)
df = spark.sparkContext.parallelize(headcal.take(1)).map(lambda x: Row(x)).toDF()
df.write.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + starttime + ".json")
headcal=spark.read.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + starttime + ".json").collect()
headers = headcal[0][0]['value']
header_column = headers.split(source_delimiter)
inmemdf = spark.sparkContext.textFile(source_filename).map(lambda x: x.split(source_delimiter)).toDF(header_column)
inmemdf.show(10,False)

Both programs do the same thing: they read a "|,"-delimited file. The only difference is that one wraps the code in a class and calls it through a method, while the other is plain top-level Python code, yet only the class version fails with:

It appears that you are attempting to reference SparkContext from a broadcast
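
From reading SPARK-5063, my guess is that the problem is the lambda passed to map() inside importsrctemptable(): it closes over self, and self also holds self.sc and self.spark. When Spark pickles that closure to ship it to the workers, it ends up trying to serialize the SparkContext, which is exactly what the exception forbids. A minimal sketch of the workaround I am considering, assuming that self capture really is the cause: copy the attributes into plain local variables so the closure no longer references self.

    def importsrctemptable(self):
        # ... header handling unchanged ...
        delimiter = self.source_delimiter    # plain string, picklable
        header_column = self.header_column   # plain list, picklable
        # the lambda now captures only the locals, not self (and not self.sc)
        self.inmemdf = self.spark.sparkContext.textFile(self.source_filename).map(
            lambda x: x.split(delimiter)).toDF(header_column)
        self.inmemdf.show(100, False)
        return True

That would also explain why the standalone script works: its lambda only captures the local source_delimiter string, never an object holding the SparkContext.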

Please help.


Tags: from, import, self, json, source, sql, filename, spark