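<p>As far as I can tell, the <code>sliding</code> function is not available from Python, and <code>SlidingRDD</code> is a private class that cannot be accessed outside <code>MLlib</code>.</p>
<p>If you want to use <code>sliding</code> on an existing RDD, you can create a poor man's <code>sliding</code> like this:</p>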
<pre class="lang-py prettyprint-override"><code>def sliding(rdd, n):
    assert n > 0

    def gen_window(xi, n):
        x, i = xi
        # Emit the element once for every window it belongs to,
        # keyed by the index of that window's first element
        return [(i - offset, (i, x)) for offset in range(n)]

    return (
        rdd.
        zipWithIndex().  # Add index
        flatMap(lambda xi: gen_window(xi, n)).  # Generate pairs with offset
        groupByKey().  # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i, x) in sorted(vals)]).
        sortByKey().  # Sort to make sure we keep the original order
        values().  # Get values
        filter(lambda x: len(x) == n))  # Drop beginning and end
</code></pre>
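<p>To see what the pipeline above produces without a Spark cluster, here is a minimal plain-Python sketch of the same steps (index, fan out, group, sort, filter). The name <code>sliding_local</code> is hypothetical and only mirrors the RDD version for illustration; it is not part of Spark.</p>

```python
from collections import defaultdict


def sliding_local(items, n):
    """Plain-Python mirror of the RDD pipeline on an ordinary list."""
    assert n > 0
    groups = defaultdict(list)
    for i, x in enumerate(items):              # zipWithIndex
        for offset in range(n):                # flatMap: one pair per window
            groups[i - offset].append((i, x))  # key = index of the window start
    result = []
    for key in sorted(groups):                 # sortByKey
        # Order inside the window by original index, then drop the index
        ordered = sorted(groups[key], key=lambda ix: ix[0])
        window = [x for _, x in ordered]
        if len(window) == n:                   # drop incomplete windows at both ends
            result.append(window)
    return result


print(sliding_local(["a", "b", "c", "d"], 2))
# [['a', 'b'], ['b', 'c'], ['c', 'd']]
```

<p>Each element is emitted <code>n</code> times, so the shuffle in the RDD version grows with the window size; that is the price of keeping the code this simple.</p>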
<p>Alternatively, you can try something like this (with a little help from <a href="https://github.com/pytoolz/toolz" rel="noreferrer"><code>toolz</code></a>):</p>
<pre><code>from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, iter):
        """Return last n - 1 elements from the partition"""
        return [(i, [x for x in iter][(-n + 1):])]

    def slide(i, iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], iter]))

    def clean_last_items(last_items):
        """Adjust for empty or too small partitions"""
        # The first partition is padded with None, so the first n - 1 windows contain None
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    # Use the RDD's own SparkContext instead of relying on a global sc
    last_items = rdd.context.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)
</code></pre>
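<p>The carry-over logic is easier to follow on plain lists. The sketch below simulates it with a list of lists standing in for partitions. Both <code>sliding2_local</code> and the helper <code>windows</code> are hypothetical names introduced here; <code>windows</code> is a small stand-in for <code>sliding_window</code> from <code>toolz</code> so that the example has no dependencies.</p>

```python
from collections import deque
from itertools import chain


def windows(n, seq):
    """Yield consecutive n-tuples from seq (stand-in for toolz's sliding_window)."""
    buf = deque(maxlen=n)
    for x in seq:
        buf.append(x)
        if len(buf) == n:
            yield tuple(buf)


def sliding2_local(partitions, n):
    """Simulate sliding2 on a list of lists, one inner list per partition."""
    assert n > 1
    carry = {-1: [None] * (n - 1)}  # padding placed before the first partition
    for i, part in enumerate(partitions):
        # Last n - 1 elements seen up to and including partition i;
        # empty or short partitions simply pass earlier elements along
        carry[i] = (carry[i - 1] + list(part))[-(n - 1):]
    out = []
    for i, part in enumerate(partitions):
        # Prepend the tail of everything before partition i, then slide
        out.extend(windows(n, chain(carry[i - 1], part)))
    return out


print(sliding2_local([[1, 2], [], [3], [4, 5]], 3))
# [(None, None, 1), (None, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]
```

<p>Note the <code>None</code> padding in the first <code>n - 1</code> windows. If only complete windows are wanted, one option is to filter out any window containing <code>None</code>, assuming <code>None</code> is not a legitimate value in the data.</p>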