Reading data from Pulsar and loading it into Postgres with asyncio

Published 2024-04-27 23:46:20

I am trying to read data from Pulsar and write it to a Postgres table in Python, using asyncio and asyncpg:

import asyncio
import asyncpg
import nest_asyncio

# Patch asyncio so that nested event loop calls are permitted
nest_asyncio.apply()


class Connection:

    loop = asyncio.get_event_loop()

    @classmethod
    async def connect_persist_postgresql(cls, data):
        connection = await asyncpg.connect(
            user='postgres',
            password='password_postgres',
            database='postgres',
            host='10.xxx.xxx.xx',
            port=5432
        )

        # Schema and table are identifiers, so they are formatted into the
        # query text; only the column values are bound as $n parameters.
        query = '''INSERT INTO {}.{} (
                                        uid,
                                        col1,
                                        col2,
                                        col3,
                                        col4
                                        )
                   VALUES (
                           $1,
                           $2,
                           $3,
                           $4,
                           $5
                           )'''.format('schema', 'table_test')
        if isinstance(data, list):
            # Insert the events one by one
            for event in data:
                await connection.execute(
                    query,
                    event['uid'],
                    event['col1'],
                    event['col2'],
                    event['col3'],
                    event['col4'],
                )
        else:
            await connection.execute(
                query,
                data[0]['uid'],
                data[0]['col1'],
                data[0]['col2'],
                data[0]['col3'],
                data[0]['col4']
            )

        await connection.close()
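
PostgreSQL bind parameters ($1, $2, ...) can stand in for values but not for identifiers, so the schema and table name have to end up in the query text itself. A minimal sketch of building such a statement follows; the helper names quote_ident and build_insert_query are made up for illustration and are not part of asyncpg:

```python
from typing import Sequence


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_insert_query(schema: str, table: str, columns: Sequence[str]) -> str:
    """Return an INSERT statement with $1..$n placeholders for the values."""
    column_list = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    return (
        f"INSERT INTO {quote_ident(schema)}.{quote_ident(table)} "
        f"({column_list}) VALUES ({placeholders})"
    )


# Same schema, table and columns as in the question
query = build_insert_query(
    "schema", "table_test", ["uid", "col1", "col2", "col3", "col4"]
)
print(query)
```

The resulting string could then be passed to connection.execute(query, *values), or to connection.executemany(query, rows) when several events arrive at once. Note that quoted identifiers are case-sensitive in PostgreSQL.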

I then connect to Pulsar as a consumer, fetch the data, and try to write it to the PostgreSQL table:

import pulsar

def run_consumer():
    client = pulsar.Client(
        'pulsar://10.xxx.xx.xx:6650')
    consumer = client.subscribe('test-topic-pulsar-postgresql',
                                'test-subs')
    while True:
        # Blocks until the next message arrives
        msg = consumer.receive()
        unpacked_list = []
        loop = asyncio.get_event_loop()
        try:
            my_new_string_value = msg.data()
            unpacked_list.append(my_new_string_value)
            loop.run_until_complete(
                Connection.connect_persist_postgresql(unpacked_list))
        except Exception as exc:
            print(exc)

But I get an error: This event loop is already running

Any hints are welcome. Thanks in advance.
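
One direction that might be worth trying, sketched under assumptions: keep a single event loop for the whole program, and hand the blocking receive call to a worker thread instead of calling run_until_complete() once per message. In the sketch below, make_fake_receive and persist are hypothetical stand-ins for consumer.receive() and the asyncpg insert coroutine, so that it runs without Pulsar or Postgres:

```python
import asyncio


def make_fake_receive(payloads):
    """Hypothetical stand-in for consumer.receive().

    Returns the payloads one at a time, then None to end the sketch.
    """
    pending = iter(payloads)
    return lambda: next(pending, None)


async def persist(payload, stored):
    """Hypothetical stand-in for the coroutine that inserts into Postgres."""
    await asyncio.sleep(0)  # placeholder for the awaited database call
    stored.append(payload)


async def consume(receive):
    """Receive messages and persist them, all inside one running loop."""
    stored = []
    loop = asyncio.get_running_loop()
    while True:
        # The blocking call runs in a worker thread, so the loop stays free
        payload = await loop.run_in_executor(None, receive)
        if payload is None:  # end marker used by this sketch only
            break
        await persist(payload, stored)
    return stored


if __name__ == "__main__":
    # A single asyncio.run() call owns the loop for the whole program
    fake_receive = make_fake_receive([b"event-1", b"event-2", b"event-3"])
    print(asyncio.run(consume(fake_receive)))
```

In a plain script, asyncio.run() starts the only loop. In an environment where a loop is already running, such as a notebook, asyncio.run() raises a RuntimeError as well, and the coroutine would be awaited directly instead (await consume(...)).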

