I need to apply a function to every cell of a column in a Spark DataFrame. The function looks up the cell's value in a database. I am using a UDF that takes the database as an input, as shown below, but it doesn't work and returns an error.
from pyspark.sql.functions import udf, col
import pyasn

asndb = pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db')

def asn_mapper(ip, asndb):
    try:
        ret = asndb.lookup(ip)
        ret = ret[0]
        if ret is None:
            return '0'
        else:
            return str(ret)
    except Exception:
        return '0'

def make_asn(asndb):
    return udf(lambda c: asn_mapper(c, asndb))

b = sqlContext.createDataFrame([("A", '22.33.44.55'), ("B", '11.22.11.44'), ("D", '44.32.11.44')], ["Letter", "ip"])
b.withColumn("asn", make_asn(asndb)(col("ip"))).show()
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1094.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 276, 10.65.251.77, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 394, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/databricks/spark/python/pyspark/worker.py", line 246, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
File "/databricks/spark/python/pyspark/worker.py", line 160, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/databricks/spark/python/pyspark/worker.py", line 71, in read_command
command = serializer.loads(command.value)
File "/databricks/spark/python/pyspark/serializers.py", line 672, in loads
return pickle.loads(obj)
UnpicklingError: state is not a dictionary
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:496)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
But if I open the database inside the UDF, it works; the code below runs fine. The failure above apparently happens because the pyasn database handle captured in the closure has to be pickled and shipped to the executors, which fails ("UnpicklingError: state is not a dictionary"). However, I don't want to call pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db') inside the UDF, because re-opening the database for every row makes it very slow.
def asn_mapper(ip):
    asndb = pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db')
    try:
        ret = asndb.lookup(ip)
        ret = ret[0]
        if ret is None:
            return '0'
        else:
            return str(ret)
    except Exception:
        return '0'

def make_asn():
    return udf(lambda c: asn_mapper(c))

b = sqlContext.createDataFrame([("A", '22.33.44.55'), ("B", '11.22.11.44'), ("D", '44.32.11.44')], ["Letter", "ip"])
b.withColumn("asn", make_asn()(col("ip"))).show()
Is there any way to make the first version run?
It looks to me like you are trying to look up IPs in a geo database; you could use a join between the two tables on the ip column. That should work.
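Besides a join, a common workaround for this class of error is lazy per-worker initialization: keep a module-level cache so the database is opened at most once per Python worker process, rather than being pickled into the closure or reopened for every row. Below is a minimal sketch of the caching pattern in plain Python; `FakeAsnDb`, `get_asndb`, and `open_count` are illustrative stand-ins I introduced (in real code, `FakeAsnDb()` would be `pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db')`):

```python
# Stand-in for the expensive pyasn.pyasn('/dbfs/mnt/geoip/ipasn.db') call.
class FakeAsnDb:
    def lookup(self, ip):
        # pyasn's lookup returns an (asn, prefix) tuple; pretend every
        # ip maps to ASN 64512 for the sake of the example.
        return (64512, '22.33.0.0/16')

open_count = 0   # counts how many times the "database" was opened
_asndb = None    # module-level cache, one per worker process

def get_asndb():
    """Open the database on first use and reuse the handle afterwards."""
    global _asndb, open_count
    if _asndb is None:
        open_count += 1
        _asndb = FakeAsnDb()
    return _asndb

def asn_mapper(ip):
    try:
        ret = get_asndb().lookup(ip)[0]
        return str(ret) if ret is not None else '0'
    except Exception:
        return '0'

print([asn_mapper(ip) for ip in ('22.33.44.55', '11.22.11.44')])
print(open_count)  # the database was opened only once
```

In Spark you would then wrap this with `udf(asn_mapper)` and use it in `withColumn` as before: only the small `asn_mapper` function (not the database handle) gets pickled, and each executor's Python worker opens the database once on first call.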