TensorFlow Multiprocessing: UnknownError: Could not start gRPC server


I am working on computing the Hessian matrix over a large dataset, and I am trying to parallelize these computations across multiple CPUs. My current setup has 1 node with 10 CPUs, and I am using Python 2.7.
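
For context, here is a minimal single-process sketch of the kind of Hessian computation involved, using tf.hessians from TF 1.x (the toy loss and names are illustrative, not my actual model):

# Toy Hessian computation in a single process (illustrative only).
# tf.hessians returns one Hessian per input tensor in TF 1.x.
import numpy as np
import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[3])
loss = tf.reduce_sum(x * x)        # simple scalar loss
hess = tf.hessians(loss, x)[0]     # 3x3 Hessian of loss w.r.t. x

with tf.Session() as sess:
    print(sess.run(hess, feed_dict={x: np.ones(3)}))  # prints 2 * identity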

To get a better understanding of distributed TensorFlow, I wrote a small abstraction of the code. Here is the error:

2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
    pywrap_tensorflow.TF_GetCode(status)) UnknownError: Could not start gRPC server

I get this error every time I run the code. However, it still produces output for one of the two processes I set up, as shown below:

[output snippet lost from the original post]

After that, it just keeps waiting for the other one:

ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n    proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n    self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n    code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n    self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n    self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n    init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n    return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n    wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n    stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica: 

I have two questions:

  1. How do I fix this gRPC error?
  2. I set up a multiprocessing queue 'result' using Manager() and passed it to both workers when creating the processes. I expected each process to write its job ID to the queue once the condition was reached, but the queue always seems to contain the last process that finished. Does this mean the queue is being overwritten by the other process?

[{'worker': 0}, {'worker': 0}]

Can I use a multiprocessing queue to share a dictionary between two sessions running in two different processes with TensorFlow?
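
To isolate the behavior, here is a minimal TensorFlow-free sketch (the helper names are mine) of what I suspect is happening: both workers write to the one shared Manager().dict() under the same key, so the later write overwrites the earlier one, and enqueuing a plain dict(...) snapshot instead preserves each worker's own entry.

# Minimal repro without TensorFlow: both workers share ONE manager dict
# and write under the SAME key, so the later write wins. Enqueuing a
# dict(...) snapshot keeps each worker's result separate.
import multiprocessing

def worker(task_index, return_dict, result_q):
    return_dict['worker'] = task_index   # same key -> later write overwrites
    result_q.put(dict(return_dict))      # snapshot; put(return_dict) would
                                         # enqueue a proxy to the shared dict

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    result_q = manager.Queue()
    shared = manager.dict()
    procs = [multiprocessing.Process(target=worker, args=(i, shared, result_q))
             for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    while not result_q.empty():
        print(result_q.get())            # with snapshots: {'worker': 0} and
                                         # {'worker': 1}, in some order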

Here is my code:

# build a python multiprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue

cluster_spec = {"ps": ["localhost:2226"],
                "worker": ["localhost:2227",
                           "localhost:2228"]}

cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1,10,10)

def model_fun(input):
    print multiprocessing.current_process().name
    return input

def cifar10(device,return_dict,result_t):
    params = HParams(cluster=cluster,
                     job_name = device[0],
                     task_index = device[1])

    serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
    input_img=[]
    true_lab=[]

    if params.job_name == "ps":
        ## try and wait for all the workers to finish
        serv.join()
    elif params.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/replica:0/task:%d" % params.task_index,
                                                      cluster=cluster)):
            # with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
            # with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
            input_img = tf.placeholder(dtype=tf.float32, shape=[10,])
            with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
                hess_op = model_fun(input_img)
                global_step = tf.contrib.framework.get_or_create_global_step()
                sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
                                         global_step=global_step,
                                         init_op=tf.initialize_all_variables(),logdir='/tmp/mydir')
                with sv.prepare_or_wait_for_session(serv.target) as sess:
                    step = 0
                    while not sv.should_stop() :
                        hess = sess.run(hess_op, feed_dict={input_img:im_Test })
                        print(np.array(hess))
                        print multiprocessing.current_process().name
                        step += 1
                        if(step==3):
                            return_dict[params.job_name] = params.task_index
                            result_t.put(return_dict)
                            break
                    sv.stop()
                    sess.close()


    return

if __name__ == '__main__':

    logger = multiprocessing.log_to_stderr()
    manager = multiprocessing.Manager()
    result = manager.Queue()
    return_dict = manager.dict()
    processes = []
    devices = [['ps', 0],
               ['worker', 0],
               ['worker', 1]
               ]

    for i in (devices):
        start_time = time.time()
        proc = multiprocessing.Process(target=cifar10,args=(i,return_dict,result))
        processes.append(proc)
        proc.start()

    for p in processes:
        p.join()

    # print return_dict.values()
    kill = []
    while True:
        if result.empty() == True:
                break
        kill.append(result.get())
        print kill


    print("time taken = %d" % (time.time() - start_time))

1 Answer

In my case, this error was raised by the ps when I submitted a TensorFlowOnSpark job in YARN cluster mode, while the worker kept waiting for a response.

The ps error is as follows:

2018-01-17 11:08:46,366 INFO (MainThread-7305) Starting TensorFlow ps:0 on cluster node 0 on background process
2018-01-17 11:08:56,085 INFO (MainThread-7395) 0: ======== ps:0 ========
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Cluster spec: {'ps': ['172.16.5.30:33088'], 'worker': ['172.16.5.22:41428', '172.16.5.30:33595']}
2018-01-17 11:08:56,086 INFO (MainThread-7395) 0: Using CPU
2018-01-17 11:08:56.087452: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
E0117 11:08:56.088501182 7395 ev_epoll1_linux.c:1051] grpc epoll fd: 10
E0117 11:08:56.088860707 7395 server_chttp2.c:38] {"created":"@1516158536.088783549","description":"No address added out of total 1 resolved","file":"external/grpc/src/core/ext/transport/chttp2/server/chttp2_server.c","file_line":245,"referenced_errors":[{"created":"@1516158536.088779164","description":"Failed to add any wildcard listeners","file":"external/grpc/src/core/lib/iomgr/tcp_server_posix.c","file_line":338,"referenced_errors":[{"created":"@1516158536.088771177","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088767669","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]},{"created":"@1516158536.088778651","description":"Unable to configure socket","fd":12,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":200,"referenced_errors":[{"created":"@1516158536.088776541","description":"OS Error","errno":98,"file":"external/grpc/src/core/lib/iomgr/tcp_server_utils_posix_common.c","file_line":173,"os_error":"Address already in use","syscall":"bind"}]}]}]}
Process Process-2:
Traceback (most recent call last):
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000001/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 269, in wrapper_fn
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/pyfiles/mnist_dist.py", line 38, in map_fun
    cluster, server = ctx.start_cluster_server(1, args.rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFSparkNode.py", line 56, in start_cluster_server
    return TFNode.start_cluster_server(self, num_gpus, rdma)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/tfspark.zip/tensorflowonspark/TFNode.py", line 110, in start_cluster_server
    server = tf.train.Server(cluster, ctx.job_name, ctx.task_index)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
    self._server_def.SerializeToString(), status)
  File "/data/yarn/nm/usercache/hdfs/appcache/application_1515984940590_0270/container_e13_1515984940590_0270_01_000002/Python/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
    c_api.TF_GetCode(self.status.status))
UnknownError: Could not start gRPC server

The worker:1 log:

2018-01-17 11:09:14.614244: I tensorflow/core/distributed_runtime/master.cc:221] CreateSession still waiting for response from worker: /job:ps/replica:0/task:0

Then I checked the port on the ps server. Sure enough, that port was already in use.
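
A quick way to confirm this (just a sketch; substitute the ports from your own cluster spec) is to try binding the ports yourself before starting the servers, or to run something like netstat -tlnp | grep <port> on the node:

# Try to bind each port from the cluster spec; a failed bind with
# "Address already in use" (errno 98) is exactly what gRPC hits above.
import socket

def port_in_use(port, host='localhost'):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return False         # bind succeeded -> the port was free
    except socket.error:
        return True          # bind failed -> something is already listening
    finally:
        s.close()

for port in (2226, 2227, 2228):  # e.g. the ports from the question's cluster_spec
    print('port %d in use: %s' % (port, port_in_use(port)))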

So resubmitting the job solved the problem.

However, if you get this error every single time you run the code, you should dig into more of the logs to find the cause.
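
And if the fixed ports in your cluster_spec keep colliding with leftover servers, one workaround (a sketch, not part of the original answer) is to let the OS pick free ports when building the spec:

# Ask the OS for currently-unused ports (port 0 -> kernel assigns one).
# Note the small race: a port could be taken between close() and the
# server's own bind(), but in practice this avoids most collisions.
import socket

def free_port():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('localhost', 0))
    port = s.getsockname()[1]
    s.close()
    return port

cluster_spec = {"ps": ["localhost:%d" % free_port()],
                "worker": ["localhost:%d" % free_port(),
                           "localhost:%d" % free_port()]}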
