Pypark Kafka直接流式更新Zookeeper/Kafka Offs

2024-09-24 02:20:45 发布

您现在位置:Python中文网/ 问答频道 /正文

目前我正在使用Kafka/Zookeeper和pySpark(1.6.0)。 我已经成功地创建了一个kafka消费者,它使用KafkaUtils.createDirectStream()。在

所有的流媒体都没有问题,但是我意识到,我的Kafka主题在我消费了一些消息之后并没有更新到当前偏移量。在

因为我们需要更新主题来进行监控,这有点奇怪。在

在Spark的文档中,我发现了以下评论:

   offsetRanges = []

     def storeOffsetRanges(rdd):
         global offsetRanges
         offsetRanges = rdd.offsetRanges()
         return rdd

     def printOffsetRanges(rdd):
         for o in offsetRanges:
             print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

     directKafkaStream\
         .transform(storeOffsetRanges)\
         .foreachRDD(printOffsetRanges)

You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.

以下是文档: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

我在Scala中找到了一个解决方案,但找不到python的等效解决方案。 下面是Scala示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

问题

但问题是,从那以后我怎么能更新动物园管理员呢?在


Tags: kafkatoin文档http主题defzookeeper
2条回答

我也遇到了类似的问题。 你是对的,使用directStream意味着直接使用kafka底层API,它没有更新reader offset。 这里有几个scala/java的例子,但python没有。 但是你自己做很容易,你需要做的是:

  • 从开头的偏移量读取
  • 保存末端的偏移

例如,我通过执行以下操作来保存redis中每个分区的偏移量:

stream.foreachRDD(lambda rdd: save_offset(rdd))
def save_offset(rdd):
  ranges = rdd.offsetRanges()
  for rng in ranges:
     rng.untilOffset # save offset somewhere

然后在开始时,您可以使用:

^{pr2}$

对于一些使用zk跟踪偏移的工具,最好在zookeeper中保存偏移量。 本页: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html 描述如何设置偏移量,zk节点基本上是: /使用者/[使用者名称]/offset/[主题名称]/[分区id] 因为我们使用的是directStream,所以你必须编一个消费者名称。在

我编写了一些函数来保存和读取pythonkazoo库中的Kafka偏移量。在

获取Kazoo客户端singleton的第一个函数:

ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']

然后函数读取和写入偏移量:

^{pr2}$

然后在开始流式传输之前,可以从zookeeper读取偏移量并将其传递给createDirectStream 对于fromOffsets参数:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    directKafkaStream.foreachRDD(save_offsets)


if __name__ == "__main__":
    main()

相关问题 更多 >