Asynchronous and parallel generators

Published 2024-10-04 01:22:13


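I have a Python script that lazily collects data, creates training samples and passes them to my ML model for training. At the moment I produce the data with a standard Python generator, which, as far as I know, is synchronous. I am looking for a clever, clean way to make my generator truly asynchronous, so that when I use it as an iterator, processing of the next data sample starts as soon as I have taken out the previous one. Consider the following example: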

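def asyncgen():
    for _ in range(5):
        print("I want this part to work asynchronously :(")
        k = 0
        while k < 1e8:  # busy loop standing in for expensive sample preparation
            k += 1
        yield "Hi"

a = asyncgen()
for w in a:
    print(w)
    i = 0
    while i < 1e8:  # busy loop standing in for training on the sample
        i += 1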

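How can I make the generator start working on the next item as soon as "Hi" has been handed out (asynchronously, in a separate process)? Right now the processing only starts after the for loop calls next().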
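A small illustration of the behaviour described above: a plain generator's body only runs when the consumer calls next(), so producing and consuming strictly alternate in a single thread. The names lazy_gen and events are hypothetical, chosen for this sketch.

```python
events = []  # records the order in which producer and consumer code runs


def lazy_gen():
    for n in range(2):
        events.append("produce %d" % n)  # runs only when next() is called
        yield n


gen = lazy_gen()
events.append("created")  # creating the generator runs none of its body
for item in gen:
    events.append("consume %d" % item)

print(events)
# ['created', 'produce 0', 'consume 0', 'produce 1', 'consume 1']
```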

I have been looking into asynchronous generators (PEP 525), but they only seem to run concurrently, not in parallel (damn GIL!). What is a good way to do this in Python, preferably a native one?
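PEP 525 async generators do run on the event loop's single thread, but they can hand the CPU-bound part to a process pool. The sketch below assumes Python 3.7+ (asyncio.run, asyncio.get_running_loop) and that the expensive step can be a top-level, picklable function; make_sample and prefetching_samples are hypothetical names. It uses loop.run_in_executor with a ProcessPoolExecutor and submits the next sample before yielding the current one, so one sample is always being computed in another process.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def make_sample(n):
    # Hypothetical CPU-bound step standing in for the busy loop in the question.
    total = 0
    for k in range(10**6):
        total += k % (n + 1)
    return n, total


async def prefetching_samples(count, executor):
    """Async generator (PEP 525) that keeps one sample in flight.

    make_sample runs in the executor; with a ProcessPoolExecutor that is a
    separate process, so it is not limited by this process's GIL.
    """
    if count <= 0:
        return
    loop = asyncio.get_running_loop()
    pending = loop.run_in_executor(executor, make_sample, 0)
    for n in range(count):
        sample = await pending
        if n + 1 < count:
            # Submit the next sample before handing this one to the consumer.
            pending = loop.run_in_executor(executor, make_sample, n + 1)
        yield sample


async def main():
    samples = []
    with ProcessPoolExecutor(max_workers=1) as executor:
        async for sample in prefetching_samples(3, executor):
            samples.append(sample)  # the training step would go here
    return samples


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The executor is a parameter so the same generator can be driven by a ThreadPoolExecutor when the work releases the GIL (for example NumPy or I/O), and by a ProcessPoolExecutor when it is pure-Python CPU work as in the question.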


2 answers

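Sockets are also a good way to solve this problem.
Basically, instead of one program with threads or multiple processes, you have a separate program run the asynchronous part, and you connect to it from the outside.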

See PyMOTW (Python Module of the Week) for more about sockets.

Here is a fairly complete example (Python 2.7):

import argparse
import logging
import socket
import time
import random

description = """
This program sends data across a socket
"""
arg_parser = argparse.ArgumentParser(description=description)
arg_parser.add_argument('--AgentIP', '-i', action='store'
    , default='localhost' , type=str
    , help="The IP address of this Server that client will connect to."
    )
arg_parser.add_argument('--AgentPort', '-p', action='store'
    , default='13000' , type=int
    , help="The port number of the socket that client will connect to."
    )
arg_parser.add_argument('--log', action='store'
    , default='INFO' , type=str
    , help="Log level"
    , choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
    )
args = arg_parser.parse_args()

# use Logger for great justice and heavenly output
logging.basicConfig(level=getattr(logging, args.log, 'CRITICAL'))
log = logging.getLogger(name='SERVER')

### vvv YOUR CODE vvv ###
def asyncgen(log):
    for i in range(5):
        log.debug("I want this part to work asynchronously :(") 
        time.sleep(random.random())
        yield "Hi"
### ^^^ YOUR CODE ^^^ ###

def make_a_connection(server, log):
    # Accept outside connections
    (client_socket, address) = server.accept()
    log.info("Got a connection : {}:{}".format(client_socket,address))

    for value in asyncgen(log):
        client_socket.sendall(value.encode())  # sockets send bytes; required on Python 3
        log.info("SEND:{}".format(value))

    client_socket.close()


def main(args, log):
    server   = socket.socket( socket.AF_INET , socket.SOCK_STREAM )
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    hostname = socket.gethostbyname(args.AgentIP)
    port     = args.AgentPort
    address  = (hostname, port)

    server.bind(address)
    server.listen(1)
    log.info("Server started on {}:{}".format(hostname, port))

    try:
        while True:
            make_a_connection(server, log)

    except (KeyboardInterrupt, SystemExit):
        server.close()
        log.info("Server connections closed")


if __name__=='__main__':
    main(args, log)
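The answer only shows the server side. A minimal client sketch for Python 3 (read_stream is a hypothetical name) that connects to the server's default address, localhost:13000, and reads until the server closes the connection:

```python
import socket


def read_stream(host="localhost", port=13000, bufsize=4096):
    """Connect to the server and return every byte it sends.

    TCP is a byte stream: several "Hi" messages may arrive in one recv()
    call or be split across calls, so this sketch just concatenates the
    chunks until the server closes the connection.
    """
    chunks = []
    with socket.create_connection((host, port)) as conn:
        while True:
            data = conn.recv(bufsize)
            if not data:  # empty bytes: the server closed its end
                break
            chunks.append(data)
    return b"".join(chunks)
```

With the server running, read_stream() returns the five "Hi" messages concatenated. Because the server writes bare strings with no delimiter, a real training pipeline would need some framing, for example one sample per newline-terminated line, to split the stream back into individual samples.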

For CPU-bound pure-Python work like this, the only way to get around the GIL is multiprocessing.

from multiprocessing import Process

def asynch_part(i):
    # A plain function, not a generator: Process calls its target and
    # discards the return value, so a generator body would never run.
    print("I want this part to work asynchronously :(")
    k = 0
    while k < 1e8:  # simulate CPU-bound work
        k += 1
    print("Hi")  # + " from " + str(i)

if __name__ == '__main__':
    p = []
    for i in range(5): # I am keeping the processes listed and trackable,
                       # perhaps you do not care. os.getpid() tracks them anyway
        p.append(Process(target=asynch_part, args=(i,)))  # args must be a tuple
        p[i].start()

    for proc in p:
        proc.join()

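So in the code above, asynch_part runs 5 times independently, as parallel processes, and the processes are joined before the program ends. Keeping the list p is only for illustration.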
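A Process target's return value is discarded, so to get samples back to the parent, which is what the question asks for, the items have to travel through something like a multiprocessing.Queue. The following sketch is not taken from either answer: the hypothetical helper background() runs a generator function in a child process and yields its items in the parent, with a bounded queue so the child works at most maxsize items ahead. It assumes Python 3 and that the wrapped generator never yields None, which is used as the end-of-stream sentinel.

```python
from multiprocessing import Process, Queue

# Sentinel marking the end of the stream.
# Assumption: the wrapped generator never yields None itself.
_DONE = None


def _produce(gen_func, args, queue):
    # Runs in the child process: drain the generator into the queue.
    for item in gen_func(*args):
        queue.put(item)  # blocks while the queue is full (bounded prefetch)
    queue.put(_DONE)


def background(gen_func, *args, maxsize=2):
    """Run gen_func(*args) in a separate process and yield its items.

    The child works up to maxsize items ahead of the consumer, so the
    next sample is being prepared while the current one is used.
    gen_func must be a top-level function so it can be pickled under
    the "spawn" start method.
    """
    queue = Queue(maxsize=maxsize)
    proc = Process(target=_produce, args=(gen_func, args, queue), daemon=True)
    proc.start()
    finished = False
    try:
        while True:
            item = queue.get()
            if item is _DONE:
                finished = True
                break
            yield item
    finally:
        if not finished:
            proc.terminate()  # consumer stopped early; child may be blocked in put()
        proc.join()


def asyncgen():
    # The question's generator, with a smaller busy loop.
    for _ in range(5):
        k = 0
        while k < 1e6:
            k += 1
        yield "Hi"


if __name__ == "__main__":
    for w in background(asyncgen):
        print(w)  # the child is already computing the next "Hi" here
```

The helper takes the generator function and its arguments rather than a generator object because generator objects cannot be pickled, so they cannot be sent to a child process.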
