我正在研究在大数据集上计算hessian矩阵。我试图在多个cpu上并行执行这些计算。我的设置目前有1个节点和10个CPU。我正在使用Python2.7
为了更好地理解分布式tensorflow,我编写了一个代码的小抽象。以下是错误
2017-07-23 16:16:17.281414: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started server with target: grpc://localhost:2225
Process Process-3:
Traceback (most recent call last):
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 32, in cifar10
serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/server_lib.py", line 145, in __init__
self._server_def.SerializeToString(), status)
File "/home/skay/anaconda2/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/errors_impl.py", line 466, in raise_exception_on_not_ok_status
pywrap_tensorflow.TF_GetCode(status)) UnknownError: Could not start gRPC server
每次运行代码时都会收到此错误。然而,它继续为我设置的两个过程中的一个生成输出,如下所示
^{pr2}$在此基础上,它继续等待下一个
ERROR:tensorflow:==================================
Object was never used (type <class 'tensorflow.python.framework.ops.Operation'>):
<tf.Operation 'worker_0/init' type=NoOp>
If you want to mark it as used call its "mark_used()" method.
It was originally created here:
['File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 83, in <module>\n proc.start()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 130, in start\n self._popen = Popen(self)', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/forking.py", line 126, in __init__\n code = process_obj._bootstrap()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap\n self.run()', 'File "/home/skay/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run\n self._target(*self._args, **self._kwargs)', 'File "/home/skay/.PyCharmCE2017.1/config/scratches/scratch_6.py", line 49, in cifar10\n init_op=tf.initialize_all_variables(),logdir=\'/tmp/mydir\')', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 170, in wrapped\n return _add_should_use_warning(fn(*args, **kwargs))', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 139, in _add_should_use_warning\n wrapped = TFShouldUseWarningWrapper(x)', 'File "/home/skay/anaconda2/lib/python2.7/site-packages/tensorflow/python/util/tf_should_use.py", line 96, in __init__\n stack = [s.strip() for s in traceback.format_stack()]']
==================================
2017-07-23 16:28:28.646871: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:38.647276: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:0
2017-07-23 16:28:48.647526: I tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:
我有两个问题
[{'worker': 0}, {'worker': 0}]
我可以使用多处理队列在tensorflow上的两个不同进程上运行的两个会话之间共享字典吗?在
下面是我的代码
# build a python mutliprocess.py
import multiprocessing
import time
import tensorflow as tf
from tensorflow.contrib.training import HParams
import os
import psutil
import numpy as np
from tensorflow.python.client import device_lib
from resnet import *
import Queue
cluster_spec ={"ps": ["localhost:2226"
],
"worker": [
"localhost:2227",
"localhost:2228"]}
cluster = tf.train.ClusterSpec(cluster_spec)
im_Test = np.linspace(1,10,10)
def model_fun(input):
print multiprocessing.current_process().name
return input
def cifar10(device,return_dict,result_t):
params = HParams(cluster=cluster,
job_name = device[0],
task_index = device[1])
serv = tf.train.Server(cluster, job_name= params.job_name,task_index=params.task_index)
input_img=[]
true_lab=[]
if params.job_name == "ps":
##try and wait for all the wokers t
serv.join()
elif params.job_name == "worker":
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/replica:0/task:%d" % params.task_index,
cluster=cluster)):
# with tf.Graph().as_default(), tf.device('/cpu:%d' % params.task_index):
# with tf.container('%s %d' % ('batchname', params.task_index)) as scope:
input_img = tf.placeholder(dtype=tf.float32, shape=[10,])
with tf.name_scope('%s_%d' % (params.job_name, params.task_index)) as scope:
hess_op = model_fun(input_img)
global_step = tf.contrib.framework.get_or_create_global_step()
sv = tf.train.Supervisor(is_chief=(params.task_index == 0),
global_step=global_step,
init_op=tf.initialize_all_variables(),logdir='/tmp/mydir')
with sv.prepare_or_wait_for_session(serv.target) as sess:
step = 0
while not sv.should_stop() :
hess = sess.run(hess_op, feed_dict={input_img:im_Test })
print(np.array(hess))
print multiprocessing.current_process().name
step += 1
if(step==3):
return_dict[params.job_name] = params.task_index
result_t.put(return_dict)
break
sv.stop()
sess.close()
return
if __name__ == '__main__':
logger = multiprocessing.log_to_stderr()
manager = multiprocessing.Manager()
result = manager.Queue()
return_dict = manager.dict()
processes = []
devices = [['ps', 0],
['worker', 0],
['worker', 1]
]
for i in (devices):
start_time = time.time()
proc = multiprocessing.Process(target=cifar10,args=(i,return_dict,result))
processes.append(proc)
proc.start()
for p in processes:
p.join()
# print return_dict.values()
kill = []
while True:
if result.empty() == True:
break
kill.append(result.get())
print kill
print("time taken = %d" % (start_time - time.time()))
在我的例子中,当我提交一个tensorflowonspark作业纱线集群模式时,我发现ps引发了这个错误,woker等待响应。在
ps
错误如下然后检查ps服务器中的端口。是的,这个端口被使用了。在
所以重新提交作业解决问题。在
但是,如果每次运行代码时都会收到此错误,则应该检查更多日志以查找原因。在
相关问题 更多 >
编程相关推荐