使用卡夫卡主题中的数据。使用Spark流读取数据,作为预处理步骤的一部分,我希望基于一些组合列数据分离或拆分数据。下面的示例是流中的“我的数据”格式
ApplicationName,results,server,datetime,value
Cashbuilder,89,serv1008,3/16/2021 0:01,90
Testbuilder,89,serv1009,3/16/2021 0:01,60
Cashbuilder,89,serv1006,3/16/2021 0:02,90
Testbuilder,89,serv1009,3/16/2021 0:03,60
Cashbuilder,89,serv1008,3/16/2021 0:04,90
Testbuilder,89,serv1008,3/16/2021 0:41,60
Cashbuilder,89,serv1006,3/16/2021 0:21,50
Testbuilder,89,serv1009,3/16/2021 0:11,60
我正在使用Pyspark连接并读取卡夫卡主题中的数据
spark = SparkSession.builder.appName("PySpark App").getOrCreate()
df=spark.readStream.format("kafka").option("kafka.bootstrap.servers",servers).option("kafka.group.id", "server1") \
.option("subscribe", kafka_topic_name) \
.option("startingOffsets", "latest") \
.load()
作为数据预处理和预测目的的一部分,我希望结合ApplicationName和server来分离数据。之后,对于每个组合,需要执行其他一些预处理步骤。请提出任何可能的方法来执行预处理步骤
目前没有回答
相关问题 更多 >
编程相关推荐