<p>I wrote some functions that save and read Kafka offsets in ZooKeeper using the Python <a href="https://github.com/python-zk/kazoo" rel="nofollow noreferrer">kazoo</a> library.</p>
<p>The first function returns a singleton Kazoo client:</p>
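<pre><code>ZOOKEEPER_SERVERS = "127.0.0.1:2181"


def get_zookeeper_instance():
    """Return a process-wide KazooClient, creating and starting it on first use."""
    from kazoo.client import KazooClient

    # Cache the client in the module's global namespace so every call shares one connection.
    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']
</code></pre>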
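<p>Looking the client up through <code>globals()</code> works, but a module-level variable is the more common way to write this. One possible variant, sketched here as an assumption rather than as the original code: keep the client in a module-level variable, guard its creation with a <code>threading.Lock</code> so that two concurrent first calls cannot create two clients, and register an <code>atexit</code> hook that stops and closes the ZooKeeper session when the process exits. The names <code>_zk_client</code>, <code>_zk_lock</code> and <code>_shutdown</code> are hypothetical.</p>
<pre><code>import atexit
import threading

ZOOKEEPER_SERVERS = "127.0.0.1:2181"

# Hypothetical module-level cache for the shared client.
_zk_client = None
_zk_lock = threading.Lock()


def _shutdown(client):
    # Close the ZooKeeper session cleanly when the Python process exits.
    client.stop()
    client.close()


def get_zookeeper_instance():
    """Return one shared, started KazooClient per process."""
    global _zk_client
    with _zk_lock:  # serialize first-time creation across threads
        if _zk_client is None:
            from kazoo.client import KazooClient
            client = KazooClient(hosts=ZOOKEEPER_SERVERS)
            client.start()  # waits for the connection; raises if it times out
            atexit.register(_shutdown, client)
            _zk_client = client
        return _zk_client
</code></pre>
<p>Callers use it exactly as before (<code>zk = get_zookeeper_instance()</code>). The singleton is per Python process, which is enough here because the function passed to <code>foreachRDD</code> runs on the driver.</p>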
<p>Then the functions that read and write the offsets:</p>
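<p>A minimal sketch of <code>read_offsets</code> and <code>save_offsets</code>, assuming each offset is stored as a decimal string in a znode at the hypothetical path <code>/spark-streaming-offsets/&lt;topic&gt;/&lt;partition&gt;</code>, and assuming offsets exist either for all requested topics or for none. <code>OFFSETS_ROOT</code>, <code>_partition_path</code>, <code>write_offsets</code> and the <code>make_key</code> parameter are illustrative helpers; the Kazoo calls (<code>exists</code>, <code>get_children</code>, <code>get</code>, <code>ensure_path</code>, <code>set</code>) and the <code>pyspark.streaming.kafka</code> names (<code>TopicAndPartition</code>, <code>offsetRanges()</code>) are existing APIs. <code>save_offsets</code> reuses <code>get_zookeeper_instance()</code> from above and stores <code>untilOffset</code>, which is exclusive (the first offset not yet consumed), so passing it back as <code>fromOffsets</code> resumes right after the last saved batch.</p>
<pre><code># Hypothetical znode layout: OFFSETS_ROOT/TOPIC/PARTITION holds the offset as ASCII digits.
OFFSETS_ROOT = "/spark-streaming-offsets"


def _partition_path(topic, partition):
    return "{}/{}/{}".format(OFFSETS_ROOT, topic, partition)


def read_offsets(zk, topics, make_key=None):
    """Build the fromOffsets dict for createDirectStream from stored znodes.

    Returns an empty dict when nothing has been saved yet.
    """
    if make_key is None:
        # Key type expected by KafkaUtils.createDirectStream(fromOffsets=...).
        from pyspark.streaming.kafka import TopicAndPartition as make_key
    from_offsets = {}
    for topic in topics:
        topic_path = "{}/{}".format(OFFSETS_ROOT, topic)
        if not zk.exists(topic_path):
            continue  # no offsets stored for this topic yet
        for partition in zk.get_children(topic_path):
            data, _stat = zk.get(_partition_path(topic, partition))
            from_offsets[make_key(topic, int(partition))] = int(data.decode("ascii"))
    return from_offsets


def write_offsets(zk, offset_ranges):
    """Store the exclusive end offset (next offset to read) of each OffsetRange."""
    for r in offset_ranges:
        path = _partition_path(r.topic, r.partition)
        zk.ensure_path(path)  # also creates missing parent znodes
        zk.set(path, str(r.untilOffset).encode("ascii"))


def save_offsets(rdd):
    # Exactly one positional parameter, so foreachRDD calls it as save_offsets(rdd).
    # offsetRanges() exists only on the KafkaRDD handed over by the direct stream.
    write_offsets(get_zookeeper_instance(), rdd.offsetRanges())
</code></pre>
<p>Because <code>save_offsets</code> writes the offsets as soon as a batch arrives, they are saved whether or not any downstream processing succeeded. In a job that also produces output, writing them only after that output has been stored gives at-least-once rather than at-most-once delivery.</p>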
<p>Before starting the stream, the offsets can then be read from ZooKeeper and passed to <a href="http://spark.apache.org/docs/2.1.0/api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils.createDirectStream" rel="nofollow noreferrer">createDirectStream</a>
as the <code>fromOffsets</code> argument:</p>
<pre><code>from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # Resume from the offsets previously stored in ZooKeeper.
    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    # For every batch, hand the RDD to save_offsets, which stores its offsets in ZooKeeper.
    directKafkaStream.foreachRDD(save_offsets)

    # Nothing is processed until the streaming context is started.
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
</code></pre>