下面的代码似乎不是同时运行的,我也不清楚确切的原因:
def run_normalizers(config, debug, num_threads, name=None):
def _run():
print('Started process for normalizer')
sqla_engine = init_sqla_from_config(config)
image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)
pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)
if name:
pp.run_pipeline_normalizers(name)
else:
pp.run_all_normalizers()
print('Normalizer process complete')
threads = []
for i in range(num_threads):
threads.append(multiprocessing.Process(target=_run))
[t.start() for t in threads]
[t.join() for t in threads]
run_normalizers(...)
config
变量只是在_run()
函数之外定义的字典。所有的过程似乎都是创建的,但它并不比我用单个进程来创建它快。基本上,run_**_normalizers()
函数中发生的事情是从数据库(SQLAlchemy)的队列表读取数据,然后发出一些HTTP请求,然后运行规范化器的“管道”来修改数据,然后将其保存回数据库中。我来自JVM领域,那里的线程是“重”的,并且经常用于并行——我对此有点困惑,因为我认为多进程模块应该绕过Python GIL的限制。在
如果您仍在寻找一个多处理解决方案,您可能首先想看看如何使用一个worker池,然后您就不必自己管理num_threads进程:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
对于减速问题,您是否尝试过将config对象作为参数传递给\u run函数?我不知道这是否/如何在内部产生改变,但我猜测它可能会改变一些东西。在
修复了我的多处理问题-并实际切换了线程。我不知道到底是什么修复了它的想法-我只是重新设计了所有的东西,让工人和任务,什么不做,事情现在都在飞。以下是我所做的基本工作:
我所要做的就是创建一个TaskRunner并向其添加任务(数千个),然后调用wait_for_tasks()。所以,很明显,在我所做的重新架构中,我“修复”了我遇到的其他一些问题。不过很奇怪。在
相关问题 更多 >
编程相关推荐