我尝试在使用python udf时将文件加载到pig,我尝试了两种方法:
•(myudf1,sample1.pig):尝试从python读取文件,该文件位于我的客户机服务器上。在
•(myudf2,sample2.pig):首先将文件从hdfs加载到gruntshell,然后将其作为参数传递给python udf。在
myudf1.py
from __future__ import with_statement
def get_words(dir):
stopwords=set()
with open(dir) as f1:
for line1 in f1:
stopwords.update([line1.decode('ascii','ignore').split("\n")[0]])
return stopwords
stopwords=get_words("/home/zhge/uwc/mappings/english_stop.txt")
@outputSchema("findit: int")
def findit(stp):
stp=str(stp)
if stp in stopwords:
return 1
else:
return 0
样品1.猪:
^{pr2}$我得到:IOError:(2,'没有这样的文件或目录','/home/zhge/uwc/mappings/english_停止.txt')
对于解决方案2:
第二个多年电价:
def get_wordlists(wordbag):
stopwords=set()
for t in wordbag:
stopwords.update(t.decode('ascii','ignore'))
return stopwords
@outputSchema("findit: int")
def findit(stopwordbag, stp):
stopwords=get_wordlists(stopwordbag)
stp=str(stp)
if stp in stopwords:
return 1
else:
return 0
样品2.猪
REGISTER '/home/zhge/uwc/scripts/myudf2.py' USING jython as pyudf;
stops = load '/user/zhge/uwc/mappings/stopwords.txt' AS (stop_w:chararray);
-- this step works fine and i can see the "stops" obejct is loaded to pig
item_title = load '/user/zhge/data/item_title_sample/000000_0' USING PigStorage(',') AS (title:chararray);
T = limit item_title 1;
S = FOREACH T GENERATE pyudf.findit(stops.stop_w, title);
DUMP S;
然后我得到: 错误org.apache.pig.工具.咕噜.咕噜-错误1066:无法打开别名S的迭代器。后端错误:Scalar在输出中有多行。第一:(a),第二:(as
你的第二个例子应该有用。虽然您
LIMIT
编错了表达式,但它应该在stops
关系上。因此,它应该是:但是,因为看起来你需要先处理所有的停止词,这是不够的。您需要执行
^{pr2}$GROUP ALL
,然后将结果传递给您的get_wordlist
函数:你必须更新你的自定义项以接受一个dict列表,但是这个方法才能工作。在
相关问题 更多 >
编程相关推荐