正常写入apachekafka,但无法读取spark作业中的主题数据

2024-09-24 22:17:26 发布

您现在位置:Python中文网/ 问答频道 /正文

HDP 2.6.5无路缘石

我正在运行卡夫卡和星火集群

我正在用kafka向特定主题写入数据,并尝试运行python代码来读取和显示来自kafka的数据

但是,读取会冻结,不会抛出错误

启动pyspark:

pyspark --master yarn --num-executors 1 --executor-cores 4 --executor-memory 16G --driver-cores 4 --driver-memory 8G --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1

在pyspark shell中:

from pyspark.sql import SparkSession, SQLContext, HiveContext
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
sqlcontext = SQLContext(spark.sparkContext)
hivecontext = HiveContext(spark.sparkContext)
hivecontext.setConf("hive.exec.dynamic.partition", "true")
hivecontext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

ds = spark.read.format("kafka").option("kafka.bootstrap.servers", "server-1:6667,server-2:6667").option("subscribe", "testtopic").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()
ds.show()

当我在服务器上读取数据时:

./kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list server-1:6667,server-2:6667 --topic testtopic --partition 0

数据在主题中

我使用以下工具检查了spark所在服务器的服务器和端口的可用性:

nc -zv server-1 2181
nc -zv server-1 6667

没关系

从一台服务器写入主题,从另一台服务器读取。所有服务器都在一个集群中

UPD。 通过科学方法我发现: 在Kafka服务器上使用命令

kafka-console-consumer.sh --zookeeper server-1:2181 --topic testtopic --from-beginning

提供数据

使用命令

kafka-console-consumer.sh --bootstrap-server server-1:6667 --topic testtopic --from-beginning --partition 0

提供数据

但当我在另一台服务器上运行消费者时,它不会出现在kafka消费者列表中


Tags: kafka数据from服务器主题sqlserversh
1条回答
网友
1楼 · 发布于 2024-09-24 22:17:26

一旦定义了最终结果DataFrame/Dataset,剩下的就是开始流计算。为此,必须使用通过Dataset.writeStream()返回的DataStreamWriter(Scala/Java/Python文档)。您必须在此接口中指定以下一项或多项。试试看:

ds.start()

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries

相关问题 更多 >