Pyspark: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation

Posted 2024-09-29 19:32:35


I am trying to compute the Pearson correlation between two data streams over a sliding window in Pyspark, but I keep getting the following error:

Traceback (most recent call last):
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py", line 63, in <module>
    result = Statistics.corr(windowedds1,windowedds2, method="pearson")
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py", line 157, in corr
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 122, in callJavaFunc
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py", line 87, in _py2java
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 555, in dumps
  File "/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", line 315, in __getnewargs__
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The error comes from the following line:

result = Statistics.corr(windowedds1,windowedds2, method="pearson")

First, I read the lines of two text files and load them into two Kafka topics; then I apply a window operation to each data stream and compute the Pearson correlation between them.

Here is my code:

from __future__ import print_function
from future.builtins import *
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy import read

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext("local[2]", appName="CrossCorrelation")
    ssc = StreamingContext(sc, 5)
    broker, topic1, topic2 = sys.argv[1:]
    # Connect to Kafka

    kvs1 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic1:1})
    kvs2 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation",{topic2:1})
    lines1 = kvs1.map(lambda x1: x1[1])
    ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(lambda strelem1: float(strelem1))
    lines2 = kvs2.map(lambda x2: x2[1])
    ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(lambda strelem2: float(strelem2))
    #Windowing
    windowedds1 = ds1.window(10, 5)
    windowedds2 = ds2.window(10, 5)
    #Correlation
    result = Statistics.corr(windowedds1,windowedds2, method="pearson")
    if result > 0.7:
        print("ds1 and ds2 are correlated!!!")

    ssc.start()
    ssc.awaitTermination()

Does anyone know what I am doing wrong?
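From the traceback, my guess is that Statistics.corr is an RDD-based MLlib routine that runs immediately on the driver, so passing it DStreams makes Spark try to pickle them, and with them the SparkContext they hold, which trips SPARK-5063. Would something along the lines of this untested sketch, replacing the direct Statistics.corr call before ssc.start(), be the right direction? (The names paired and compute_corr are mine, and the zip step assumes the two windows always produce the same number of elements and partitions.)

# Untested sketch: pair the two windowed streams batch by batch, then run
# Statistics.corr on plain RDDs inside foreachRDD, which executes on the driver.
paired = windowedds1.transformWith(lambda rdd1, rdd2: rdd1.zip(rdd2), windowedds2)

def compute_corr(rdd):
    # Statistics.corr needs non-empty RDDs, so skip empty windows.
    if not rdd.isEmpty():
        xs = rdd.map(lambda pair: pair[0])
        ys = rdd.map(lambda pair: pair[1])
        result = Statistics.corr(xs, ys, method="pearson")
        if result > 0.7:
            print("ds1 and ds2 are correlated!!!")

paired.foreachRDD(compute_corr)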

Thanks.


