TensorFlow Dataset.from_generator blocks input?

Posted 2024-10-06 12:15:31


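I want to build a project in which requests can be put into a Python queue at any time, and a group of TensorFlow models consume requests from that queue and return results immediately.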

The models run in different threads and in different tf.Graph instances, but their structure and weight values are identical.

Each model uses tf.data.Dataset.from_generator to wrap a Python iterator that fetches requests from the queue.

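The problem is that with multiple models, a request may stay blocked until a later request arrives. Judging by the test output, the Python iterator does receive each request as soon as it is put into the queue, but no result comes back from the model. No requests seem to be dropped either; they appear to be held up inside the tf Dataset iterator.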

Here is my test code:

# -*- coding: utf-8 -*-

import tensorflow as tf
import numpy as np
import sys
import random
import time

from queue import Queue
from concurrent.futures import ThreadPoolExecutor

thread_count=int(sys.argv[1])
request_queue=Queue(128)

def data_iter():
    while True:
        yield request_queue.get()

def task():
    with tf.Graph().as_default():
        ds=tf.data.Dataset.from_generator(data_iter, (tf.int32), output_shapes=([1, 8]))
        sample=ds.make_one_shot_iterator().get_next()
        with tf.Session() as sess:
            coord=tf.train.Coordinator()
            threads=tf.train.start_queue_runners(sess=sess, coord=coord)
            while not coord.should_stop():
                try:
                    result=sess.run(sample)
                    print(result)
                except:
                    coord.request_stop()
            coord.join(threads)

executor=ThreadPoolExecutor(thread_count)
try:
    for i in range(thread_count):
        executor.submit(task)

    rand=random.Random()
    for i in range(100):
        request_queue.put(np.full((1, 8), i, 'int32'))
        time.sleep(1e-3)#to let the model get request from the request_queue
        t=rand.randint(5,10)
        print('round {}, request_queue size is about {}, sleeping {} secs...'.format(i, request_queue.qsize(), t))
        time.sleep(t)
finally:
    for i in range(thread_count):
        request_queue.put(None)
    executor.shutdown()
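
A side note on shutdown: the `finally` block above pushes `None` into the queue, and `data_iter` yields it as if it were a request. The sketch below shows a generator that treats `None` as a stop sentinel instead, assuming that is the intent. `make_data_iter` is a hypothetical helper name, not part of the original script, and plain lists stand in for the NumPy arrays so the example runs without TensorFlow.

```python
from queue import Queue


def make_data_iter(request_queue):
    """Build a generator function that yields requests until it sees None."""
    def data_iter():
        while True:
            request = request_queue.get()  # blocks until an item is available
            # Use an identity check: `request == None` on a NumPy array
            # would compare element-wise instead of testing for the sentinel.
            if request is None:
                return  # end the generator cleanly
            yield request
    return data_iter


# Stand-in usage: two requests followed by the shutdown sentinel.
q = Queue()
for item in ([1, 2], [3, 4], None):
    q.put(item)

received = list(make_data_iter(q)())
print(received)
```

When a generator passed to from_generator returns, the dataset is exhausted, which in TF 1.x should surface as tf.errors.OutOfRangeError from sess.run. This only tidies up termination; it is not expected to change the blocking behaviour described in the question.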

Environment: Python 3.5.3, TensorFlow 1.4.0

Test results:

  1. Running with one model: python tf_ds_test.py 1

The output is:

[output missing from the source]

Everything works fine.

  2. But when running with 32 models: python tf_ds_test.py 32

The output is:

2017-12-21 10:45:41.660251: I C:\tf_jenkins\home\workspace\rel-win\M\windows\PY\35\tensorflow\core\platform\cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX AVX2
round 0, request_queue size is about 1, sleeping 9 secs...
[[0 0 0 0 0 0 0 0]]
[[1 1 1 1 1 1 1 1]]
round 1, request_queue size is about 0, sleeping 5 secs...
round 2, request_queue size is about 0, sleeping 8 secs...
round 3, request_queue size is about 0, sleeping 10 secs...
[[4 4 4 4 4 4 4 4]]
[[2 2 2 2 2 2 2 2]]
[[3 3 3 3 3 3 3 3]]
round 4, request_queue size is about 0, sleeping 8 secs...
round 5, request_queue size is about 0, sleeping 6 secs...
round 6, request_queue size is about 0, sleeping 10 secs...
[[6 6 6 6 6 6 6 6]]
[[5 5 5 5 5 5 5 5]]
round 7, request_queue size is about 0, sleeping 9 secs...
[[7 7 7 7 7 7 7 7]]
round 8, request_queue size is about 0, sleeping 5 secs...
round 9, request_queue size is about 0, sleeping 10 secs...
round 10, request_queue size is about 0, sleeping 6 secs...
round 11, request_queue size is about 0, sleeping 10 secs...
[[8 8 8 8 8 8 8 8]]
round 12, request_queue size is about 0, sleeping 8 secs...

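The requests are blocked! The Python iterator consumes each request immediately, but the model produces no result for an arbitrary amount of time, possibly until some model receives the next request.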

Does anyone know what is going on? How can I make these models return results immediately?
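
To put a number on the delay rather than reading it off interleaved prints, each request can be timestamped when it is enqueued and again when a consumer handles it. The following is a stdlib-only sketch in which a plain worker thread stands in for a TensorFlow model; `worker`, `latencies`, the thread count, and the `(timestamp, payload)` tuple layout are all assumptions made for illustration.

```python
import threading
import time
from queue import Queue

request_queue = Queue(128)
latencies = []            # seconds from put() to the worker handling the item
latencies_lock = threading.Lock()


def worker():
    """Stand-in for one model thread: consume items until a None sentinel."""
    while True:
        item = request_queue.get()
        if item is None:
            return
        enqueued_at, _payload = item
        elapsed = time.monotonic() - enqueued_at
        with latencies_lock:
            latencies.append(elapsed)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for i in range(20):
    request_queue.put((time.monotonic(), i))
    time.sleep(0.01)

# One sentinel per worker so every thread exits.
for _ in threads:
    request_queue.put(None)
for t in threads:
    t.join()

print('handled {} requests, max latency {:.4f}s'.format(
    len(latencies), max(latencies)))
```

In the real test script the dataset only carries the int32 array, so the enqueue time would have to travel separately, for example in a dictionary keyed by the request value `i`, and be looked up after `sess.run(sample)` returns.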

1 answer
User
#1 · Posted 2024-10-06 12:15:31

Could you change the loop that puts elements into the queue to:

for i in range(100):
    request_queue.put(np.full((1, 8), i, 'int32'))
    print('round {}, queue size {}'.format(i, request_queue.qsize()))

and share the output?

I tried to reproduce your problem (using a TF nightly build), but even with 1000 tasks and 10000 loop iterations, everything ran smoothly.

Could you try this with a TF nightly build?
