我想构建一个项目,在任何时候请求都将被放入python队列,一组tensorflow模型使用来自队列的请求,并立即返回结果。在
模型是在不同的线程,不同的传递函数图,但结构和权重值是相同的。在
每种型号的使用tf.data.Dataset.from_generator封装从队列获取请求的python迭代器。在
问题是,当有多个模型时,请求可能会被阻塞,直到将来的请求到来。从测试结果来看,python迭代器似乎确实在将请求放入队列时收到了请求,但是没有来自模型的结果。此外,似乎没有丢弃任何请求,但可能被tf Dataset迭代器阻塞了。在
这是我的测试代码:
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import sys
import random
import time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
thread_count=int(sys.argv[1])
request_queue=Queue(128)
def data_iter():
while True:
yield request_queue.get()
def task():
with tf.Graph().as_default():
ds=tf.data.Dataset.from_generator(data_iter, (tf.int32), output_shapes=([1, 8]))
sample=ds.make_one_shot_iterator().get_next()
with tf.Session() as sess:
coord=tf.train.Coordinator()
threads=tf.train.start_queue_runners(sess=sess, coord=coord)
while not coord.should_stop():
try:
result=sess.run(sample)
print(result)
except:
coord.request_stop()
coord.join(threads)
executor=ThreadPoolExecutor(thread_count)
try:
for i in range(thread_count):
executor.submit(task)
rand=random.Random()
for i in range(100):
request_queue.put(np.full((1, 8), i, 'int32'))
time.sleep(1e-3)#to let the model get request from the request_queue
t=rand.randint(5,10)
print('round {}, request_queue size is about {}, sleeping {} secs...'.format(i, request_queue.qsize(), t))
time.sleep(t)
finally:
for i in range(thread_count):
request_queue.put(None)
executor.shutdown()
环境:python3.5.3,tensorflow 1.4.0
检测结果:
python tf_ds_test.py 1
结果如下:
^{pr2}$一切顺利。在
python tf_ds_test.py 32
结果如下:
2017-12-21 10:45:41.660251: I C:\tf_jenkins\home\workspace\rel-win\M\windows\PY\35\tensorflow\core\platform\cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX AVX2
round 0, request_queue size is about 1, sleeping 9 secs...
[[0 0 0 0 0 0 0 0]]
[[1 1 1 1 1 1 1 1]]
round 1, request_queue size is about 0, sleeping 5 secs...
round 2, request_queue size is about 0, sleeping 8 secs...
round 3, request_queue size is about 0, sleeping 10 secs...
[[4 4 4 4 4 4 4 4]]
[[2 2 2 2 2 2 2 2]]
[[3 3 3 3 3 3 3 3]]
round 4, request_queue size is about 0, sleeping 8 secs...
round 5, request_queue size is about 0, sleeping 6 secs...
round 6, request_queue size is about 0, sleeping 10 secs...
[[6 6 6 6 6 6 6 6]]
[[5 5 5 5 5 5 5 5]]
round 7, request_queue size is about 0, sleeping 9 secs...
[[7 7 7 7 7 7 7 7]]
round 8, request_queue size is about 0, sleeping 5 secs...
round 9, request_queue size is about 0, sleeping 10 secs...
round 10, request_queue size is about 0, sleeping 6 secs...
round 11, request_queue size is about 0, sleeping 10 secs...
[[8 8 8 8 8 8 8 8]]
round 12, request_queue size is about 0, sleeping 8 secs...
请求被阻止!python迭代器立即消费了请求,但是模型在任意时间段之前没有结果,可能直到模型得到下一个请求。在
有人知道吗?如何让这些模型立即返回结果?在
您是否可以将生成元素到队列中的循环修改为:
分享产出?在
我试图重现您的问题(使用TF的夜间构建),但是即使有1000个任务和10000次循环迭代,事情还是运行得很顺利。在
你能在TF的夜间建造中试试这个吗?在
相关问题 更多 >
编程相关推荐