PySpark streaming

Posted 2024-09-28 01:29:10

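I am using Spark 2.1.0 with Kafka 0.9 in a MapR environment, and I am trying to read from a Kafka topic into Spark Streaming. However, when I run the KafkaUtils createDirectStream command, I get the error below.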

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.streaming.kafka09.KafkaUtilsPythonHelper.createDirectStream.
Trace:
    py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.ArrayList, class java.util.HashMap]) does not exist

The code I am running:

from __future__ import print_function
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.streaming.kafka09 import KafkaUtils

# `sc` is the SparkContext that the pyspark shell creates automatically
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 3)  # 3-second batch interval
strLoc = '/home/mapr/stream:info'
kafkaparams = {"zookeeper.connect": "x.x.x.x:5181", "metadata.broker.list": "x.x.x.x:9092"}

# The error occurs when I run this command in the pyspark shell
strarg = KafkaUtils.createDirectStream(ssc, [strLoc], kafkaparams)

1 answer
Site user
#1 · Posted 2024-09-28 01:29:10

I tried to improve your code. Please try running the code below.

from pyspark.sql import SQLContext, SparkSession
from pyspark.streaming import StreamingContext
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from pyspark.streaming.kafka import KafkaUtils
import json

var_schema_url = 'http://localhost:8081'
var_kafka_parms_src = {"metadata.broker.list": 'localhost:9092'}

schema_registry_client = CachedSchemaRegistryClient(var_schema_url)
serializer = MessageSerializer(schema_registry_client)

spark = SparkSession.builder \
  .appName('Advertiser_stream') \
  .master('local[*]') \
  .getOrCreate()


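def handler(message):
    # Collect this batch's records on the driver (suitable for small batches only)
    records = message.collect()
    for record in records:
        # Process each record here; print() is only a placeholder
        print(record)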


sc = spark.sparkContext
ssc = StreamingContext(sc, 5)

kvs = KafkaUtils.createDirectStream(ssc, ['Topic-name'], var_kafka_parms_src, valueDecoder=serializer.decode_message)
kvs.foreachRDD(handler)

ssc.start()
ssc.awaitTermination()
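
The handler above leaves the per-record processing open. As a minimal sketch of what that step might look like, the example below counts records per key for each batch. It assumes each collected record is a (key, value) tuple, which is the shape a direct Kafka stream normally yields in PySpark. The name summarize_records is hypothetical and is not part of PySpark or confluent_kafka.

```python
from collections import Counter


def summarize_records(records):
    """Count how many records arrived per key.

    Assumption: `records` is a list of (key, value) tuples, as returned by
    message.collect() on a batch from a direct Kafka stream.
    """
    counts = Counter()
    for key, _value in records:
        counts[key] += 1
    return dict(counts)


def handler(message):
    # `message` is the RDD for one batch; collect() pulls every record to the
    # driver, so this approach only suits small batches.
    records = message.collect()
    if records:
        print(summarize_records(records))
```

Keeping the processing logic in a plain function such as summarize_records means it can be checked with ordinary Python lists before it is attached to the stream through kvs.foreachRDD(handler).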
