擅长:python、mysql、java
<p>我也遇到了类似的问题。
你是对的,使用directStream意味着直接使用kafka底层API,它没有更新reader offset。
这里有几个scala/java的例子,但python没有。
但是你自己做很容易,你需要做的是:</p>
<ul>
<li>从开头的偏移量读取</li>
<li>保存末端的偏移</li>
</ul>
<p>例如,我通过执行以下操作来保存redis中每个分区的偏移量:</p>
<pre><code>stream.foreachRDD(lambda rdd: save_offset(rdd))
def save_offset(rdd):
ranges = rdd.offsetRanges()
for rng in ranges:
rng.untilOffset # save offset somewhere
</code></pre>
<p>然后在开始时,您可以使用:</p>
^{pr2}$
<p>对于一些使用zk跟踪偏移的工具,最好在zookeeper中保存偏移量。
本页:
<a href="https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html" rel="nofollow noreferrer">https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html</a>
描述如何设置偏移量,zk节点基本上是:
/使用者/[使用者名称]/offset/[主题名称]/[分区id]
因为我们使用的是directStream,所以你必须编一个消费者名称。在</p>