我在清理数据时遇到了一些问题,特别是丢失的数据。在
解析我的数据文件并进行初步的数据清理后,数据快照如下:
r2.take(5)
Out[210]:
[u'2016-10-26 13.43.06 C03 R27 80327 ONYX E 2605',
u'2016-10-26 18.16.57 C04 R34 80544 GOLD E 7094',
u'2016-10-26 18.24.15 C04 R33 80544 GOLD E 6875',
u'2016-10-26 17.43.57 C03 R19 80908 ONYX E 5214',
u'2016-10-26 17.07.55 C03 R27 80436 GOLD E 3436']
基于Spark教程,我将执行以下步骤[最终]创建Spark数据帧(我省略了一些步骤):
^{pr2}$再过几步,我试图运行一些简单的命令,比如.groupBy().count().collect(),但却遇到了错误。当我回到我的数据时,我发现我丢失了数据,尤其是当我运行以下命令时:
parts.filter(lambda x: len(x) != 8).take(12)
Out[236]:
[[u'2016-10-26', u'10.25.26', u'C04', u'800779', u'BLUE', u'E'],
[u'2016-10-26', u'11.19.42', u'C03', u'803817', u'ONYX', u'E'],
[u'2016-10-26', u'11.22.12', u'C04', u'8132705', u'GOLD', u'E'],
[u'2016-10-26', u'11.22.22', u'C04', u'8154705', u'GOLD', u'E'],
[u'2016-10-26', u'12.34.22', u'C05', u'8125781', u'ONYX', u'E'],
[u'2016-10-26', u'12.40.05', u'C05', u'812381', u'ONYX', u'E'],
[u'2016-10-26', u'22.16.42', u'C03', u'832347', u'ONYX', u'E'],
[u'2016-10-26', u'18.07.36', u'C01', u'852300', u'GOLD', u'E'],
[u'2016-10-26', u'11.26.39', u'C02', u'845424', u'BLUE', u'F'],
[u'2016-10-26',
u'12.16.00',
u'C05',
u'R39',
u'845436668',
u'BLUE',
u'6975'],
[u'2016-10-26', u'11.26.58', u'C05', u'83344280', u'GOLD', u'F']]
如您所见,我们缺少以下列的观察结果:商品代码、语言和总销售额。在
我要做的是if-else语句(或类似语句),这样我就可以修复这些列。我在SQL环境中完成了这段代码,但不确定如何将其转换为PySpark环境。在
例如,如果我们在SQL环境中工作,下面是我的SQL代码:
select date, time, category_code,
if(item_code like '8%','',item_code) as item_code,
/* Lets look at this code in the example below*/
if(item_code like '8%',item_code,if(purchase_ref not in ('GOLD','ONYX'),purchase_ref,'')) as purchase_ref,
if(purchase_ref in ('GOLD','ONYX'),purchase_ref,if(selling_team in ('GOLD','ONYX'),selling_team,'')) as selling_team,
if(selling_team in ('E','F'),selling_team,if(language in ('E','F'),language,'')) as language,
if(gross_sales is null,language,gross_sales) as gross_sales
让我们看一个例子。purchase_ref
总是以数字“8”开头。对于purchase_ref
列,如果item_code
包含purchase_ref
数,这是由于缺少数据而转移的值,那么它的结果应该是该数字。如果它包含一个selling_team
值,请将其留空。在
当我试图修复丢失数据的行时,SQL代码起作用了。在
我的问题是: 如何从这个问题中清除我的数据?在
另外,如果我在Spark数据帧中这样做是否更可行(我可以想象我将如何使用Pandas np.where
来处理这个问题,但不确定这在这里是如何翻译的)?在
谢谢。在
编辑:
为了更清楚地了解我的初步步骤,这些步骤如下:
从AWS文件导入文本。这是它看起来像是生的:
myApacheLogs.take(25)
Out[227]:
[u' 2016-10-26 13.43.06 C03 R27 84327 ONYX E 2605 ',
u'',
u' 2016-10-26 18.16.57 C04 R34 896544 GOLD E 7094 ',
u'',
u' 2016-10-26 18.24.15 C04 R33 867544 GOLD E 6875 ',
u'',
u' 2016-10-26 17.43.57 C03 R19 8776908 ONYX E 5214 ',
u'',
u' 2016-10-26 17.07.55 C03 R27 8654436 GOLD E 3436 ']
我做了一些清洁工作:
r1 = myApacheLogs.filter(lambda x: x is not u'').filter(lambda x: x is not u'')
r1.take(10)
Out[209]:
[u' 2016-10-26 13.43.06 C03 R27 84327 ONYX E 2605 ',
u' 2016-10-26 18.16.57 C04 R34 84544 GOLD E 7094 ',
u' 2016-10-26 18.24.15 C04 R33 84544 GOLD E 6875 ',
u' 2016-10-26 17.43.57 C03 R19 895408 ONYX E 5214 ',
u' 2016-10-26 17.07.55 C03 R27 86436 GOLD E 3436 ',
u' 2016-10-26 13.01.30 C05 R43 248945 GOLD E 6956 ',
u' 2016-10-26 10.07.10 C03 R29 863356 GOLD E 0861 ',
u' 2016-10-26 11.31.19 C04 R32 865684 ONYX E 2595 ',
u' 2016-10-26 13.31.33 C01 RR1 865690 ONYX F 7093 ',
u' 2016-10-26 14.06.37 C03 R20 865243 GOLD E 6959 ']
def myFunc(s):
results = re.sub("\s\s+" , " ", s) # Properly space columns
results = results.lstrip() # Remove whitespace on the left
results = results.rstrip() # Remove whitespace on the right
return results
r2 = r1.map(myFunc)
目前没有回答
相关问题 更多 >
编程相关推荐