如何对Ml模型进行数据分割和预处理?

2024-09-24 06:31:23 发布

您现在位置:Python中文网/ 问答频道 /正文

使用卡夫卡主题中的数据。使用Spark流读取数据,作为预处理步骤的一部分,我希望基于一些组合列数据分离或拆分数据。下面的示例是流中的“我的数据”格式

ApplicationName,results,server,datetime,value
   Cashbuilder,89,serv1008,3/16/2021 0:01,90
   Testbuilder,89,serv1009,3/16/2021 0:01,60
   Cashbuilder,89,serv1006,3/16/2021 0:02,90
   Testbuilder,89,serv1009,3/16/2021 0:03,60
   Cashbuilder,89,serv1008,3/16/2021 0:04,90
   Testbuilder,89,serv1008,3/16/2021 0:41,60
   Cashbuilder,89,serv1006,3/16/2021 0:21,50
   Testbuilder,89,serv1009,3/16/2021 0:11,60

我正在使用Pyspark连接并读取卡夫卡主题中的数据

spark = SparkSession.builder.appName("PySpark App").getOrCreate()
df=spark.readStream.format("kafka").option("kafka.bootstrap.servers",servers).option("kafka.group.id", "server1") \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

作为数据预处理和预测目的的一部分,我希望结合ApplicationName和server来分离数据。之后,对于每个组合,需要执行其他一些预处理步骤。请提出任何可能的方法来执行预处理步骤


Tags: kafka数据主题server步骤sparkoptionservers