ZeroMQ with Chapel And Python,无法在当前s中回答

2024-06-25 06:36:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我不能确定错误在哪里,但我正在尝试在Python客户机和Chapel服务器之间传递消息。客户端代码是

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(str("Yo"))
    message = socket.recv()
    print("OMG!! He said %s" % message)

礼拜堂的侍者是

^{pr2}$

这个信息似乎很普遍,但我并不真正理解。在

server.chpl:7: error: halt reached - Error in Socket.recv(string): Operation cannot be accomplished in current state

我想我双方都在发送/接收。最初的礼拜堂示例on the Chapel site运行得很好,但是我在修改它时遇到了困难。在

更新

this thread礼拜堂团队的帮助下,这项工作开始了。在

客户。py

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

for request in range(10):
    message = "Hello %i from Python" % request
    print("[Python] Sending request: %s" % message)
    socket.send_string(message)
    message = socket.recv_string()
    print("[Python] Received response: %s" % message)

服务器。chpl

use ZMQ;

var context: Context;
var socket = context.socket(ZMQ.REP);
socket.bind("tcp://*:5555");

for i in 0..#10 {
  var msg = socket.recv(string);
  writeln("[Chapel] Received message: ", msg);
  socket.send("Hello %i from Chapel".format(i));
}

Tags: insendmessageforstringrequestcontextsocket
2条回答

@user3666197的回答很好地讨论了ZeroMQ状态机,我认为问题在于ChapelZMQ模块如何序列化和传输字符串。在

Chapel中的^{}^{}方法通过发送两条消息来序列化字符串。这是为了匹配ZeroMQ Guide's "Minor Note on Strings"中的模式,的目的是为了匹配ZeroMQ Guide's "Minor Note on Strings"中的模式,但是,在实现时,此序列化方案是不正确的,并且与某些ZeroMQ套接字模式不兼容。在

为了发送一个字符串,Chapel发送一个包含两个调用的多部分消息zmq_send():第一个字符串大小带有{}标志,第二个是字节缓冲区;接收的工作原理类似。这意味着你对socket.recv(string)的一个调用实际上是在幕后对zmq_recv()进行两个背靠背的调用。对于REQ/REP模式,这两个连续的zmq_recv()调用将ZeroMQ状态机置于无效状态,因此会出现错误消息。在

这绝对是Chapel的ZMQ模块的一个bug。在

作为参考,我是ChapelZMQ模块的作者(绝对不是没有bug的)。在

团队解决并重新确认之前,请仅使用int有效负载测试任何{a1}ZMQ模块服务,并可能避免{}原型(由于字符串匹配悬而未决的问题)。在

由于@Nick最近disclosed here,要实现ZMQ服务以满足ZeroMQ API合规性,并为异构分布式系统完全打开交叉兼容大门,还有一条路要走:

To send a string, Chapel sends one message with the string size followed by another message with the byte buffer; receiving works similarly.

That means that your one call to <aSocket>.recv( string ) was actually making two back-to-back calls to zmq_recv() under the hood. With the REQ/REP pattern, those two back-to-back zmq_recv() calls put the ZeroMQ state machine into an invalid state, hence the error message.

This is definitely a bug with Chapel's ZMQ module.


几个步骤可以让现场更加明朗:

在诊断根本原因之前,让我提出一些要采取的措施。ZeroMQ是一个非常强大的框架,在这个框架中,人们很难选择比REQ/REP更难(和更脆弱)的消息传递原型。在

内部的有限状态自动机(事实上,分布式FSA)都是阻塞的(通过设计,在连接的对等点之间强制执行类似钟摆的消息传递(不需要仅仅是前2个),这样[a]-.send()-.recv()-.send()-.recv()-。。。在一侧[A]匹配[B]-.recv()-.send()-.recv()-…)如果由于某种原因,双方都进入等待状态,此时[a]和[B]都期望从信道的另一端接收下一条消息,那么这个dFSA也存在一个主要无法挽救的相互死锁。在

这就是说,我的建议是首先进行一个最简单的测试-使用一对不受限制的单纯形通道(可以是[a]PUSH/[B]PULL+[B]PUSH/[a]PULL,或者使用{}的更复杂的方案)。在

不是为一个完全网格化的多代理基础设施建立,而是这个基础设施的简化版本(不需要也不打算使用ROUTER/DEALER通道,但是如果扩展模型方案,可能会复制(反转)PUSH/PULL-s):

enter image description here

对于由当前的实现约束产生的隐含限制,还需要花费更多的精力:

In Chapel, sending or receiving messages on a Socket uses multipart messages and the Reflection module to serialize primitive and user-defined data types whenever possible. Currently, the ZMQ module serializes primitive numeric types, strings, and records composed of these types. Strings are encoded as a length (as int) followed by the character array (in bytes).

如果这些备注不仅仅是线级别的内部特性,并且扩展到顶层的ZeroMQ消息传递/信令层(参见管理订阅的详细信息,其中ZeroMQ主题筛选器匹配基于左侧与接收到的消息的精确匹配,则应该进行一些调整 ). 在


侧享有更大的设计自由:

#
# python
# #########

import time
import zmq; context = zmq.Context()

print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.__version__ ) )

dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER, 0 )        # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )

heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER,   0 )      # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 0 )      # ( ignore history, keep just last )

heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, "[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, "" )    # in case [Chapel] complicates serialisation
#                                  -    
while ( True ):
      pass;             print( "INF: waiting for a [Chapel] HeartBeat-Message" )
      hbIN = heartB.recv( zmq.NOBLOCK );
      if len( hbIN ) > 0:
         pass;          print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
         break
      else:
         time.sleep( 0.5 )
#                                  -
for request in range(10):
    pass;               print( "INF: Sending a request %s to [Chapel] ..." % request )
    dataAB.send( str( "Yo" ) )
    pass;               print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
    message = dataAB.recv()
    pass;               print( "INF: [Chapel] said %s" % message )
#                                  -
dataAB.close()
heartB.close()
context.term()
#                                  -

一些进一步的try:/except:/finally:构造应该为来自无限while()-loops等的KeyboardInterrupt-s服务,但是为了清楚起见,这里省略了这些。在


方面,我们将尽力跟上API的步伐,如下所示:

如果对.send()/.recv()方法的调用隐式总是阻塞的,而您的代码假定它是在阻塞模式下运行的(对于任何分布式系统设计,我总是强烈建议不要使用这种模式),文档也不能帮助决定用户代码是否可以控制,阻塞是一个糟糕的实践-more on this here)。在

While the C-level call zmq_send() may be a blocking call (depending on the socket type and flag arguments), it is desirable that a semantically-blocking call to Socket.send() allow other Chapel tasks to be scheduled on the OS thread as supported by the tasking layer. Internally, the ZMQ module uses non-blocking calls to zmq_send() and zmq_recv() to transfer data, and yields to the tasking layer via chpl_task_yield() when the call would otherwise block.

Source

^{pr2}$

相关问题 更多 >