PySpark pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

Posted 2024-06-26 10:55:13


I have a dataset containing a number of IP addresses, and I want to look up the approximate location of each IP.

I am using geoip (https://pythonhosted.org/python-geoip/) for the lookup.
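
For reference, a standalone lookup with this library looks roughly like the following (a minimal sketch based on the library's documented API; note that lookup may return None for unknown IPs and that location is a (latitude, longitude) tuple rather than a string):

from geoip import geolite2

match = geolite2.lookup('8.8.8.8')  # returns a match object, or None if the IP is not found
if match is not None:
    print(match.location)           # a (latitude, longitude) tuple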

Code snippet:

from geoip import geolite2
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convert_location(ip_address):
    location = geolite2.lookup(ip_address).location
    return location

convert_location_udf = udf(convert_location, StringType())

scamsDf = spark.read.option("header", "true").option("inferSchema","true").json(<data_location>, multiLine=True)

ipDf = scamsDf.select("ip")

locationsDF = ipDf.withColumn("location", convert_location_udf(ipDf.ip))

It works fine until the last line, where the IPs are actually looked up. Using the geolite module inside the UDF appears to be the problem, and it is what triggers the pickle error, but I don't know how to work around it.

Full error:

Traceback (most recent call last):
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/serializers.py", line 587, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 554, in save_tuple
    save(element)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 692, in _batch_setitems
    save(v)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/homes/nj303/anaconda2/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/import/bigdata/andromeda-conf/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/pyspark/serializers.py", line 597, in dumps
    raise pickle.PicklingError(msg)
cPickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects
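
A common way to avoid this (a minimal sketch, assuming the same geoip library and DataFrame as above, and not verified against this exact cluster) is to move the geolite2 import inside the UDF body. Spark pickles the UDF together with everything its closure references, and the geolite2 lookup object holds a thread.lock, which cannot be pickled; importing lazily means each executor creates its own lookup object instead of receiving a pickled one:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convert_location(ip_address):
    # Import lazily so the unpicklable geolite2 object is not captured in the
    # function's closure; each executor resolves it at call time instead.
    from geoip import geolite2
    match = geolite2.lookup(ip_address)
    return str(match.location) if match else None

convert_location_udf = udf(convert_location, StringType())
locationsDF = ipDf.withColumn("location", convert_location_udf(ipDf.ip))

Returning str(match.location) (or None) also keeps the value consistent with the declared StringType, since location is a tuple, and guards against lookups that return no match.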

