<p>OK, building on maxymo's example, I put together my own reusable version. It isn't exactly what I was after, but it gets me closer to how I wanted to solve this particular problem: no lambdas, and reusable code.</p>
<pre><code>#!/usr/bin/env pyspark
# -*- coding: utf-8 -*-
data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
]

def combine(field):
    '''Returns a key function that extracts a specific field

    Args:
        field (str): data field to use for grouping
    Returns:
        func: a function which supplies the value of that field
    '''
    def _reduce_this(record):
        '''Returns the field value for a single record'''
        return record[field]
    return _reduce_this

def aggregate(*fields):
    '''Merges grouped records based on a list of key fields

    Args:
        fields (list): fields that together form the composite key
    Returns:
        func: a function which performs the aggregation
    '''
    def _merge_this(pair):
        name, iterable = pair
        new_map = dict(name=name, window=dict(max=None, min=None))
        for record in iterable:
            # items() here; iteritems() only exists on Python 2 dicts
            for field, value in record.items():
                if field in fields:
                    new_map[field] = value
                else:
                    new_map.setdefault(field, set()).add(value)
        return new_map
    return _merge_this

# sc is provided by the pyspark shell
combined = sc.parallelize(data).groupBy(combine('name'))
reduced = combined.map(aggregate('name'))
output = reduced.collect()
</code></pre>
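<p>For anyone who wants to check the grouping logic without spinning up a Spark context: here is a minimal local sketch of the same pipeline, where <code>itertools.groupby</code> plus <code>sorted</code> stands in for <code>RDD.groupBy</code> (the helper functions are the same ones from the snippet above; only the driver code differs).</p>

```python
from itertools import groupby

data = [
    {'timestamp': '20080411204445', 'address': '100 Sunder Ct', 'name': 'Joe Schmoe'},
    {'timestamp': '20040218165319', 'address': '100 Lee Ave', 'name': 'Joe Schmoe'},
    {'timestamp': '20120309173318', 'address': '1818 Westminster', 'name': 'John Doe'},
]

def combine(field):
    '''Returns a key function that extracts a specific field'''
    def _reduce_this(record):
        return record[field]
    return _reduce_this

def aggregate(*fields):
    '''Merges grouped records based on a list of key fields'''
    def _merge_this(pair):
        name, iterable = pair
        new_map = dict(name=name, window=dict(max=None, min=None))
        for record in iterable:
            for field, value in record.items():
                if field in fields:
                    new_map[field] = value
                else:
                    new_map.setdefault(field, set()).add(value)
        return new_map
    return _merge_this

# unlike RDD.groupBy, itertools.groupby only merges adjacent keys,
# so the input must be sorted by the same key first
keyed = sorted(data, key=combine('name'))
grouped = [(k, list(g)) for k, g in groupby(keyed, key=combine('name'))]
output = [aggregate('name')(pair) for pair in grouped]
```

Each entry in <code>output</code> ends up with the composite-key fields as plain values and every other field collapsed into a <code>set</code>, e.g. both of Joe Schmoe's addresses land in one <code>address</code> set.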