在spark上下文文本文件上应用映射函数读取带有多个分隔符的文件时,我收到错误消息。下面是引发错误的代码
import json
import base64
import re
import subprocess
from subprocess import PIPE
from sys import argv
from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark import SparkConf, SparkContext
import pprint
import sys
import os
import datetime
from pyspark import StorageLevel
from subprocess import Popen, PIPE
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import Row
class sparkimportOperations(object):
def __init__(self, starttime, source_filename, source_delimiter):
self.spark = ''
self.sc = ''
self.headcal = ''
self.source_delimiter = source_delimiter
self.source_filename = source_filename
self.starttime = starttime
def initializeSparkSession(self):
try:
print('HIFI_SPARK_INFO - initializeSparkSession() - Initializing Spark Session')
self.spark = SparkSession.builder.appName("ppname").getOrCreate()
self.sc = self.spark.sparkContext
print('HIFI_SPARK_DEBUG - Initialized Spark Session')
self.spark.sparkContext.setLogLevel("WARN")
return True
except Py4JJavaError as e:
print('HIFI_SPARK_ERROR - initializeSparkSession() - Failed to Initialize Spark Session')
self.status = "FAILED"
self.error = str(e)
self.error = self.error + 'HIFI_SPARK_ERROR - initializeSparkSession() - Failed to Initialize Spark Session'
return False
def importsrctemptable(self):
self.headcal = self.spark.read.text(self.source_filename)
df = self.spark.sparkContext.parallelize(self.headcal.take(1)).map(lambda x: Row(x)).toDF()
df.write.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + self.starttime + ".json")
self.headcal = self.spark.read.json(
"/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + self.starttime + ".json").collect()
self.headers = self.headcal[0][0]['value']
self.header_column = self.headers.split(self.source_delimiter)
self.inmemdf = self.spark.sparkContext.textFile(self.source_filename).map(
lambda x: x.split(self.source_delimiter)).toDF(self.header_column)
self.inmemdf.show(100, False)
def sparkImportMain(self):
if self.initializeSparkSession():
if self.importsrctemptable():
return True
source_filename = '/hdfsData/bdipoc/poc/Inbound/hifi_unit_test/db2/T5706_CET_ITM_INV_MSV/'
starttime = '10121'
source_delimiter = "|,"
executeimport = sparkimportOperations(starttime, source_filename, source_delimiter)
out = executeimport.sparkImportMain()
上述程序的错误
rv = reduce(self.proto)
File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/context.py", line 330, in __getnewargs__
"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
我有另一个原始程序做同样的逻辑,没有类和def函数,没有任何问题
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark=SparkSession.builder.appName("spark engine").getOrCreate()
source_filename='/hdfsData/bdipoc/poc/Inbound/hifi_unit_test/db2/T5706_CET_ITM_INV_MSV/'
starttime = '1012'
source_delimiter = "|,"
headcal = spark.read.text(source_filename)
df = spark.sparkContext.parallelize(headcal.take(1)).map(lambda x: Row(x)).toDF()
df.write.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + starttime + ".json")
headcal=spark.read.json("/hdfsData/bdipoc/poc/Inbound/hifitmp/.HIFI/header_datfile" + starttime + ".json").collect()
headers = headcal[0][0]['value']
header_column = headers.split(source_delimiter)
inmemdf = spark.sparkContext.textFile(source_filename).map(lambda x: x.split(source_delimiter)).toDF(header_column)
inmemdf.show(10,False)
这两个程序都执行相同的操作,它读取一个“|,”分隔的文件,但是在其中一个中调用代码以导出,而另一个是原始python代码
It appears that you are attempting to reference SparkContext from a broadcast
请帮忙
目前没有回答
相关问题 更多 >
编程相关推荐