Apache Spark broadcast variable gives an error with mapPartitions


I am new to Python and Spark. Here I am trying to broadcast an Rtree index in Spark. When I try to use the broadcast index with the mapPartitions function, it gives the following error.

On Windows:

File "avlClass.py", line 42, in avlFileLine
  for j in bv.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 440, in intersection
  p_mins, p_maxs = self.get_coordinate_pointers(coordinates)
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 294, in get_coordinate_pointers
  dimension = self.properties.dimension
File "C:\Python27\ArcGIS10.3\lib\site-packages\rtree\index.py", line 883, in get_dimension
  return core.rt.IndexProperty_GetDimension(self.handle)
WindowsError: exception: access violation reading 0x00000004
  at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)

On Linux:


File: avlClass.py

import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf

class avlClass(object):
    def __init__(self,name):
        self.name=name

    def create_index(self):
        # Read the ESRI Shape File
        shapeFileName='C:\\shapefiles\\Road.shp'
        polygons= [ pol for pol in fiona.open(shapeFileName,'r') ]
        p=index.Property()
        p.dimension=2
        self_idx=index.Index(property=p)
        # Create Index Entries
        for pos,features in enumerate(polygons):
            self_idx.insert(pos, LineString(features['geometry']['coordinates']).bounds)
        return self_idx


    def avlFileLine(self, iter, bv):
        for line in iter:
            splits = line.split(',')
            lat = float(splits[2])
            long = float(splits[3])
            print lat, long
            x = 'No'

            # Test the index from broadcast variable bv
            buffer_size = 10
            x_meters = -9511983.32151
            y_meters = 4554613.80307
            for j in bv.intersection([x_meters-buffer_size, y_meters-buffer_size, x_meters+buffer_size, y_meters+buffer_size]):
                x = "FOUND"

            yield lat, long, heading_radians, x

File: avlSpark.py

import fiona
from shapely.geometry import shape,Point, LineString, Polygon
from shapely.ops import transform
from rtree import index
from numpy import math
import os
import pyproj
from functools import partial
from pyspark import SparkContext, SparkConf
from avlClass import avlClass

if __name__ == '__main__':
    conf = SparkConf().setAppName('AVL_Spark_Job')
    conf = SparkConf().setMaster('local[*]')
    sc= SparkContext(conf=conf)

    sc.addPyFile("avlClass.py")
    test_avlClass=avlClass("Test")

    print test_avlClass.name
    idx= test_avlClass.create_index()

    # Test the created index
    buffer_size=10
    x_meters=-9511983.32151
    y_meters=4554613.80307
    for j in idx.intersection([x_meters-buffer_size,y_meters-buffer_size,x_meters+buffer_size,y_meters+buffer_size]):
         print "FOUND"  # Index Worked

    # broadcast Index for Partitions
    idx2=sc.broadcast(idx)



    FileName='c:\\test\\file1.txt'
    avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value))
    for line in avlFile.take(10):
        print line

1 Answer

What I see is that you are creating a broadcast variable:

# broadcast Index for Partitions
idx2=sc.broadcast(idx)

but passing its .value into avlFileLine:

avlFile=sc.textFile(FileName).mapPartitions(lambda line: test_avlClass.avlFileLine(line,idx2.value))

But neither idx nor idx2 is an RDD. idx2, as a broadcast variable, will take on whatever class idx is. (I actually asked this question based on your question :)

You are still passing the argument in as a broadcast variable, but then trying to treat it as an RDD, presumably a PythonRDD, which, as noted, it is not. A broadcast variable is not an RDD; it is simply whatever type you assigned to it. On top of that, you are passing its value (via .value) into avlFileLine.
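
As a small illustration (a hypothetical snippet, not from the original code), the broadcast wrapper and its value are ordinary objects rather than RDDs:

nums = sc.broadcast([1, 2, 3])
print type(nums)        # the Broadcast wrapper (pyspark.broadcast.Broadcast)
print type(nums.value)  # the plain Python list that was broadcast -- not an RDD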

So when you call intersection() on it, it blows up. I'm surprised it didn't fail more gracefully, but I work in Java, where the compiler would catch this; I assume that in Python the interpreter just runs happily along until it hits a bad memory location, and then you get an ugly error message :)

I think the best approach is to rethink your code from the start; it simply isn't using Spark correctly. I don't know your specific application, so my best guess is that you need to drop intersection() and look again at the RDD programming guide part of the Spark docs for Python. Find a way to apply the value in idx2 to avlFile, which is an RDD. You want to avoid for loops inside the function you pass in; Spark takes the place of your "for" loop by applying the passed function to each element of the RDD. Remember, the result will be another RDD.

In pseudo-Java code, it would look something like:

 SomeArray theArray = avlfile.map({declare inline or call function}).collect(<if the RDD is not too big for collect>) 
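
A rough Python equivalent of that idea, as a sketch only (process_line is a hypothetical per-element function, and avlfile is assumed to be the RDD of raw text lines, e.g. sc.textFile(FileName)):

def process_line(line):
    # handles exactly one line; Spark applies this to every element of the RDD
    splits = line.split(',')
    return (float(splits[2]), float(splits[3]))

the_array = avlfile.map(process_line).collect()  # collect() only if the RDD is not too big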

I hope this helps. If you haven't already done so, a great book is Learning Spark by O'Reilly and {a6}, which is the next step from {a7}. Learning Spark rents for less than $10, and in my case, as a college student, I was able to get it for free through Safari Books.

There is a steep learning curve to writing Spark programs, especially if you are not used to thinking in functional programming terms. I'm not far ahead of you, and you don't yet have a good grasp of the Spark programming model. I hope all of this helps.


Also, as noted in the original edit of this answer, your call to SparkConf is wrong. I had to go back a ways in the docs (.9) to find an example, but you want to do this:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

...in the standalone programs section. As it stands now, I believe your second assignment to conf overrides the first.
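
Applied to the code in the question, that would be a single chained conf instead of two separate assignments (a small sketch reusing the app name and master from avlSpark.py):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
         .setAppName('AVL_Spark_Job')
         .setMaster('local[*]'))
sc = SparkContext(conf=conf)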


In summary: I don't see how you can call an RDD function on a broadcast variable. A broadcast variable is NOT an RDD but simply a data structure, like a global, that is read (but never written to) by all the workers. Per the Broadcast class in Scala.

From the docs on Broadcast Variables

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
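
And a small sketch of the read-only, global-like way that value is meant to be used inside an RDD transformation (hypothetical data, carrying on from the broadcastVar above):

rdd = sc.parallelize([0, 1, 2])
# every task reads broadcastVar.value; nothing ever writes to it
shifted = rdd.map(lambda i: broadcastVar.value[i] * 10)
print shifted.collect()   # [10, 20, 30]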

As I see it, calling intersection() on bv (which is not an RDD) just doesn't make sense.
