使用KAFKA主题中的数据,从中提取字段,并使用python存储在MySQL中

2024-07-05 14:36:53 发布

您现在位置:Python中文网/ 问答频道 /正文

我想使用以下命令使用卡夫卡主题中的数据:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTestTopic --from-beginning

然后将输出以下内容(仅粘贴前2行输出,但将有多行…):

&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&
&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&

我想从输出中使用以下内容:

  • 房地产

  • 家庭ID

  • 消息

  • 唯一ID

您可以看到每个元素都由一个符号(“&;”)分隔。它们不总是在同一个索引/位置,所以我不确定是否需要正则表达式?最终,当我在本地运行的MySQL上进行查询时,我会看到:

描述测试表;

+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| realTs   | bigint(20)   | YES  |     | NULL    |       |
| familyId | varchar(255) | YES  |     | NULL    |       |
| msg      | text         | YES  |     | NULL    |       |
| uniqueId | varchar(255) | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
4 rows in set (0.00 sec)

从测试表中选择*;

+---------------+-------------------+-----------------------------------------------------------+-------------------------+
| realTs        | familyId          | msg                                                       | uniqueId                |
+---------------+-------------------+-----------------------------------------------------------+-------------------------+
| 1561768319000 | 000-222-115-11119 | SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes. | 2518Z-0892A-0030O-16H70 |
| 1571988989000 | 401-222-115-89387 | This-is+a+reminder.                                       | 872541806296826880      |
+---------------+-------------------+-----------------------------------------------------------+-------------------------+

到目前为止我有什么? 我有一个带python的mysql连接器,我可以连接到本地mysql等等,但是我正在努力解析并插入它。。。你知道吗


Tags: intimemsgnullyescrdupset
1条回答
网友
1楼 · 发布于 2024-07-05 14:36:53

使用Python,可以使用urllib.parse.parse_qs在Python字典中检索URL查询字符串组件,稍后可以迭代这些组件以将数据插入MySQL数据库。你知道吗

例如:

from urllib.parse import parse_qs
line = "&time=1561768216000&gameCategory=PINPOINT&category=ONE&uniqueId=2518Z-0892A-0030O-16H70&transactionType=CRD&familyId=000-222-115-11119&realTs=1561768319000&sortId=1&msg=SET-UP+PRAYER+%26+intercession+begins+in+just+30+minutes.&remoteIpAddress=127.0.0.1&userAgent=HTTP&uniqueId=872541806296826880&time=1571988786000&gameCategory=NOTIFY&category=TWO&transactionType=CRD&familyId=401-222-115-89387&sortId=1&realTs=1571988989000&msg=This-is+a+reminder.&remoteIpAddress=127.0.0.1&userAgent=HTTPS&"

o = parse_qs(line)

print(o)

结果:

{'time': ['1561768216000', '1571988786000'], 'gameCategory': ['PINPOINT', 'NOTIFY'], 'category': ['ONE', 'TWO'], 'uniqueId': ['2518Z-0892A-0030O-16H70', '872541806296826880'], 'transactionType': ['CRD', 'CRD'], 'familyId': ['000-222-115-11119', '401-222-115-89387'], 'realTs': ['1561768319000', '1571988989000'], 'sortId': ['1', '1'], 'msg': ['SET-UP PRAYER & intercession begins in just 30 minutes.', 'This-is a reminder.'], 'remoteIpAddress': ['127.0.0.1', '127.0.0.1'], 'userAgent': ['HTTP', 'HTTPS']}

相关问题 更多 >