使用PySp处理RDD中的缺失数据

2024-10-05 10:13:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我在清理数据时遇到了一些问题,特别是丢失的数据。在

解析我的数据文件并进行初步的数据清理后,数据快照如下:

r2.take(5)
Out[210]: 
[u'2016-10-26 13.43.06 C03 R27 80327 ONYX E 2605',
 u'2016-10-26 18.16.57 C04 R34 80544 GOLD E 7094',
 u'2016-10-26 18.24.15 C04 R33 80544 GOLD E 6875',
 u'2016-10-26 17.43.57 C03 R19 80908 ONYX E 5214',
 u'2016-10-26 17.07.55 C03 R27 80436 GOLD E 3436']

基于Spark教程,我将执行以下步骤[最终]创建Spark数据帧(我省略了一些步骤):

^{pr2}$

再过几步,我试图运行一些简单的命令,比如.groupBy().count().collect(),但却遇到了错误。当我回到我的数据时,我发现我丢失了数据,尤其是当我运行以下命令时:

parts.filter(lambda x: len(x) != 8).take(12)

Out[236]: 
[[u'2016-10-26', u'10.25.26', u'C04', u'800779', u'BLUE', u'E'],
 [u'2016-10-26', u'11.19.42', u'C03', u'803817', u'ONYX', u'E'],
 [u'2016-10-26', u'11.22.12', u'C04', u'8132705', u'GOLD', u'E'],
 [u'2016-10-26', u'11.22.22', u'C04', u'8154705', u'GOLD', u'E'],
 [u'2016-10-26', u'12.34.22', u'C05', u'8125781', u'ONYX', u'E'],
 [u'2016-10-26', u'12.40.05', u'C05', u'812381', u'ONYX', u'E'],
 [u'2016-10-26', u'22.16.42', u'C03', u'832347', u'ONYX', u'E'],
 [u'2016-10-26', u'18.07.36', u'C01', u'852300', u'GOLD', u'E'],
 [u'2016-10-26', u'11.26.39', u'C02', u'845424', u'BLUE', u'F'],
 [u'2016-10-26',
  u'12.16.00',
  u'C05',
  u'R39',
  u'845436668',
  u'BLUE',
  u'6975'],
 [u'2016-10-26', u'11.26.58', u'C05', u'83344280', u'GOLD', u'F']]

如您所见,我们缺少以下列的观察结果:商品代码、语言和总销售额。在

我要做的是if-else语句(或类似语句),这样我就可以修复这些列。我在SQL环境中完成了这段代码,但不确定如何将其转换为PySpark环境。在

例如,如果我们在SQL环境中工作,下面是我的SQL代码:

select date, time, category_code, 
if(item_code like '8%','',item_code) as item_code,

/* Lets look at this code in the example below*/
if(item_code like '8%',item_code,if(purchase_ref not in ('GOLD','ONYX'),purchase_ref,'')) as purchase_ref,

if(purchase_ref in ('GOLD','ONYX'),purchase_ref,if(selling_team in ('GOLD','ONYX'),selling_team,'')) as selling_team,

if(selling_team in ('E','F'),selling_team,if(language in ('E','F'),language,'')) as language,

if(gross_sales is null,language,gross_sales) as gross_sales

让我们看一个例子。purchase_ref总是以数字“8”开头。对于purchase_ref列,如果item_code包含purchase_ref数,这是由于缺少数据而转移的值,那么它的结果应该是该数字。如果它包含一个selling_team值,请将其留空。在

当我试图修复丢失数据的行时,SQL代码起作用了。在

我的问题是: 如何从这个问题中清除我的数据?在

另外,如果我在Spark数据帧中这样做是否更可行(我可以想象我将如何使用Pandas np.where来处理这个问题,但不确定这在这里是如何翻译的)?在

谢谢。在

编辑:

为了更清楚地了解我的初步步骤,这些步骤如下:

从AWS文件导入文本。这是它看起来像是生的:

myApacheLogs.take(25)
Out[227]: 
[u'  2016-10-26  13.43.06  C03    R27       84327  ONYX  E     2605                                        ',
 u'',
 u'  2016-10-26  18.16.57  C04    R34       896544  GOLD  E     7094                                        ',
 u'',
 u'  2016-10-26  18.24.15  C04    R33       867544  GOLD  E     6875                                        ',
 u'',
 u'  2016-10-26  17.43.57  C03    R19       8776908  ONYX  E     5214                                        ',
 u'',
 u'  2016-10-26  17.07.55  C03    R27       8654436  GOLD  E     3436                                        ']

我做了一些清洁工作:

r1 = myApacheLogs.filter(lambda x: x is not u'').filter(lambda x: x is not u'')
r1.take(10)
Out[209]: 
[u'  2016-10-26  13.43.06  C03    R27       84327  ONYX  E     2605                                        ',
 u'  2016-10-26  18.16.57  C04    R34       84544  GOLD  E     7094                                        ',
 u'  2016-10-26  18.24.15  C04    R33       84544  GOLD  E     6875                                        ',
 u'  2016-10-26  17.43.57  C03    R19       895408  ONYX  E     5214                                        ',
 u'  2016-10-26  17.07.55  C03    R27       86436  GOLD  E     3436                                        ',
 u'  2016-10-26  13.01.30  C05    R43       248945  GOLD  E     6956                                        ',
 u'  2016-10-26  10.07.10  C03    R29       863356  GOLD  E     0861                                        ',
 u'  2016-10-26  11.31.19  C04    R32       865684  ONYX  E     2595                                        ',
 u'  2016-10-26  13.31.33  C01    RR1       865690  ONYX  F     7093                                        ',
 u'  2016-10-26  14.06.37  C03    R20       865243  GOLD  E     6959                                        ']

def myFunc(s):
  results = re.sub("\s\s+" , " ", s) # Properly space columns
  results = results.lstrip() # Remove whitespace on the left
  results = results.rstrip() # Remove whitespace on the right
  return results
r2 = r1.map(myFunc)

Tags: 数据inrefifcodepurchaseitemresults

热门问题