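<p>Until the <a href="/questions/tagged/chapel" class="post-tag" title="show questions tagged 'chapel'" rel="tag">chapel</a> team resolves and re-confirms this, please test any {a1}ZMQ module services with <code>int</code> payloads only, and possibly avoid the {<cd2>} archetype (because of the pending string-matching issue).</p>
<p>As <strong><a href="https://stackoverflow.com/users/8464160/nick">@Nick</a></strong> recently <a href="https://stackoverflow.com/a/45683168">disclosed here,</a> there is still some way to go before the <code>ZMQ</code> services are implemented so that they meet ZeroMQ API compliance and fully open the door to cross-compatibility in heterogeneous distributed systems:</p>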
<blockquote>
<p>To send a string, Chapel sends one message with the string size followed by another message with the byte buffer; receiving works similarly.<br/><br/> That means that your one call to <code>&lt;aSocket&gt;.recv( string )</code> was actually making two back-to-back calls to <code>zmq_recv()</code> under the hood. With the <strong><code>REQ/REP</code></strong> pattern, those two back-to-back <code>zmq_recv()</code> calls put the ZeroMQ state machine into an invalid state, hence the error message.</p>
<p>This is definitely a bug with Chapel's <code>ZMQ</code> module.</p>
</blockquote>
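<p>A minimal sketch of the framing described in the quote, written as a toy model in plain Python rather than taken from the Chapel implementation. The names <code>encode_string</code> and <code>decode_string</code> are hypothetical, and the 8-byte little-endian length is an assumption made only for illustration. It shows why one logical string transfer costs two separate receives:</p>

```python
import struct
from collections import deque

def encode_string(text):
    """Return the two frames a string is split into: length first, then bytes."""
    payload = text.encode("utf-8")
    length_frame = struct.pack("<q", len(payload))   # assumed: 8-byte signed int
    return [length_frame, payload]

def decode_string(wire):
    """Consume two frames from `wire` (a deque) and rebuild the string."""
    (length,) = struct.unpack("<q", wire.popleft())  # 1st receive: the size
    payload = wire.popleft()                         # 2nd receive: the bytes
    if len(payload) != length:
        raise ValueError("payload size does not match the announced length")
    return payload.decode("utf-8")

wire = deque(encode_string("Yo"))
frames_on_wire = len(wire)       # how many messages one string produced
decoded = decode_string(wire)    # needs two receives to rebuild one string
```

<p>On a strict request/reply channel the second receive is exactly the step that is not allowed without a send in between.</p>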
<hr/>
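<h2>A few steps that may make the scene clearer:</h2>
<p>Before diagnosing the root cause, let me propose a few measures to take. ZeroMQ is a very powerful framework, and within it one could hardly pick a harder (and more fragile) messaging archetype than <strong><code>REQ/REP</code></strong>.</p>
<p>The internal finite-state automata (in fact, a distributed FSA) are both blocking (by design, to enforce a pendulum-like message passing among the connected peers (which need not be just the first 2), so that a <code>.send()</code>-<code>.recv()</code>-<code>.send()</code>-<code>.recv()</code>-... sequence on side [A] matches the <code>.recv()</code>-<code>.send()</code>-<code>.recv()</code>-... sequence on side [B]) and exposed to a principally unsalvageable mutual deadlock: if, for whatever reason, both sides enter a wait state in which [A] and [B] each expect to receive the next message from the opposite end of the channel, neither will ever proceed.</p>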
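<p>The pendulum rule can be sketched with a toy state machine. This is plain Python that models the idea only; it is not ZeroMQ code, and <code>LockStepPeer</code> and <code>is_mutually_deadlocked</code> are hypothetical names introduced here:</p>

```python
class LockStepPeer:
    """Toy model of one end of a strict send/recv pendulum (not ZeroMQ itself)."""

    def __init__(self, name, first_step):
        self.name = name
        self.expected = first_step           # either "send" or "recv"

    def _step(self, action):
        if action != self.expected:
            raise RuntimeError(
                "%s: %s() not allowed, %s() must come first"
                % (self.name, action, self.expected)
            )
        # swing the pendulum to the opposite action
        self.expected = "recv" if action == "send" else "send"

    def send(self):
        self._step("send")

    def recv(self):
        self._step("recv")


def is_mutually_deadlocked(peer_a, peer_b, messages_in_flight):
    """Both ends wait to receive and nothing is on its way: nobody will send."""
    return (peer_a.expected == "recv"
            and peer_b.expected == "recv"
            and messages_in_flight == 0)


a = LockStepPeer("[A]", first_step="send")   # REQ-like side
b = LockStepPeer("[B]", first_step="recv")   # REP-like side

a.send()                                     # legal: [A] starts the swing
b.recv()                                     # legal: [B] picks it up
try:
    b.recv()                                 # a second back-to-back recv()
    violation = None
except RuntimeError as err:
    violation = str(err)

# A request that got lost leaves both ends waiting to receive.
a2 = LockStepPeer("[A]", first_step="send")
b2 = LockStepPeer("[B]", first_step="recv")
a2.send()
stuck = is_mutually_deadlocked(a2, b2, messages_in_flight=0)
```

<p>The second <code>recv()</code> on [B] is rejected by the model, which mirrors the invalid-state error quoted above, and the lost request leaves both toy peers waiting on each other.</p>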
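<p>That said, my advice is to start with the simplest possible test, using a pair of unrestricted simplex channels (be it [A]<code>PUSH</code>/[B]<code>PULL</code> + [B]<code>PUSH</code>/[A]<code>PULL</code>, or a somewhat more complex scheme using {<cd2>}).</p>
<p>This is not a setup for a fully meshed multi-agent infrastructure, but a simplified version of one (there is no need and no intention to use the <code>ROUTER/DEALER</code> channels, though the (reversed) <code>PUSH/PULL</code>-s may be duplicated if the mock-up scheme gets extended):</p>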
<p><a href="https://i.stack.imgur.com/2NsKh.gif" rel="nofollow noreferrer"><img src="https://i.stack.imgur.com/2NsKh.gif" alt="enter image description here"/></a></p>
<p>More effort will have to be spent on the implied limitations that arise from the constraints of the current <a href="/questions/tagged/chapel" class="post-tag" title="show questions tagged 'chapel'" rel="tag">chapel</a> implementation:</p>
<blockquote>
<p>In Chapel, sending or receiving messages on a <kbd><code>Socket</code></kbd> uses <strong>multipart</strong> messages and the <kbd><code>Reflection</code></kbd> module to serialize primitive and user-defined data types whenever possible. Currently, the <strong><code>ZMQ</code></strong> module serializes primitive numeric types, strings, and records composed of these types. Strings are encoded as a length (as <em>int</em>) followed by the character array (in bytes).</p>
</blockquote>
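<p>If these remarks are not just wire-level internals and extend up to the top-level ZeroMQ messaging/signalling layer (cf. the details of managing subscriptions, where ZeroMQ topic-filter matching is based on an exact match of the leading, left-hand bytes of the received message), some adjustments will be needed.</p>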
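<p>Why the serialisation detail matters for subscriptions can be sketched with a toy prefix test. This is plain Python, not ZeroMQ code; <code>topic_matches</code> is a hypothetical helper, and the 8-byte little-endian length frame is an assumption made only for illustration:</p>

```python
import struct

def topic_matches(subscription, first_frame):
    """Prefix test in the spirit of a SUB-side topic filter (toy model)."""
    return first_frame.startswith(subscription)

topic = b"[chapel2python.HB]"
plain_frame = topic + b" tick"                       # topic text leads the message
length_frame = struct.pack("<q", len(plain_frame))   # assumed: a size frame leads instead

plain_hit = topic_matches(topic, plain_frame)        # text-first message
length_hit = topic_matches(topic, length_frame)      # size-first message
catch_all_hit = topic_matches(b"", length_frame)     # empty filter
```

<p>If a length frame arrives first, a textual filter has nothing to match against, while an empty subscription still lets everything through. That is the reason for subscribing to an empty prefix as a fallback.</p>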
<hr/>
<h2>The <a href="/questions/tagged/python" class="post-tag" title="show questions tagged 'python'" rel="tag">python</a> side enjoys much more freedom of design:</h2>
<pre><code>#
# python
# #########
import time
import zmq; context = zmq.Context()
print( "INF: This Agent uses ZeroMQ v.{0:}".format( zmq.__version__ ) )
dataAB = context.socket( zmq.REQ )
dataAB.setsockopt( zmq.LINGER,   0 )        # ( a must in pre v4.0+ )
dataAB.connect( "tcp://localhost:5555" )
heartB = context.socket( zmq.SUB )
heartB.setsockopt( zmq.LINGER,   0 )        # ( a must in pre v4.0+ )
heartB.setsockopt( zmq.CONFLATE, 1 )        # ( ignore history, keep just last; no multipart support )
heartB.connect( "tcp://localhost:6666" )
heartB.setsockopt( zmq.SUBSCRIBE, b"[chapel2python.HB]" )
heartB.setsockopt( zmq.SUBSCRIBE, b"" )     # in case [Chapel] complicates serialisation
# -
while ( True ):
    pass;   print( "INF: waiting for a [Chapel] HeartBeat-Message" )
    try:
        hbIN = heartB.recv( zmq.NOBLOCK )   # raises zmq.Again if nothing has arrived yet
    except zmq.Again:
        hbIN = b""
    if len( hbIN ) > 0:
        pass;   print( "ACK: [Chapel] Heart-Beat-Message .recv()-ed" )
        break
    else:
        time.sleep( 0.5 )
# -
for request in range(10):
    pass;   print( "INF: Sending a request %s to [Chapel] ..." % request )
    dataAB.send( b"Yo" )                    # bytes, so it works on python 2 and 3 alike
    pass;   print( "INF: a blocking .recv(), [Chapel] is to answer ..." )
    message = dataAB.recv()
    pass;   print( "INF: [Chapel] said %s" % message )
# -
dataAB.close()
heartB.close()
context.term()
# -
</code></pre>
<p>Some further <code>try:/except:/finally:</code> constructs should be put in place to handle <code>KeyboardInterrupt</code>-s raised inside infinite <code>while()</code>-loops and the like, but they were omitted here for clarity.</p>
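<p>One possible shape for such a guard, as a minimal sketch in plain Python. The helper <code>run_guarded</code> and the two demo functions are hypothetical names introduced here, and the raised <code>KeyboardInterrupt</code> merely stands in for a Ctrl-C arriving inside a loop:</p>

```python
def run_guarded(body, cleanup):
    """Run body(); always run cleanup(), even if Ctrl-C interrupts body()."""
    try:
        body()
        return True
    except KeyboardInterrupt:
        print("INF: interrupted by the user, shutting down")
        return False
    finally:
        cleanup()

events = []

def interrupted_loop():
    events.append("loop started")
    raise KeyboardInterrupt              # stands in for Ctrl-C inside a while-loop

def release_resources():
    events.append("resources released")  # e.g. the .close() / .term() calls

finished = run_guarded(interrupted_loop, release_resources)
```

<p>In the script above, <code>body</code> would hold the heartbeat wait plus the request loop, and <code>cleanup</code> the two <code>.close()</code> calls followed by <code>context.term()</code>.</p>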
<hr/>
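<h2>On the <a href="/questions/tagged/chapel" class="post-tag" title="show questions tagged 'chapel'" rel="tag">chapel</a> side, we will do our best to keep pace with the API, as follows:</h2>
<p>The documentation also does not help to decide whether user code has any control over a call to the <code>.send()</code>/<code>.recv()</code> methods being implicitly always blocking, while your code assumes it runs in a blocking mode (which I always strongly advise against in any distributed-system design; blocking is a poor practice - <a href="https://stackoverflow.com/a/45534028">more on this <strong>here</strong></a>).</p>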
<blockquote>
<p>While the C-level call <kbd><code>zmq_send()</code></kbd> may be a blocking call (depending on the socket type and flag arguments), it is desirable that a semantically-blocking call to <kbd><strong><code>Socket.send()</code></strong></kbd> allow other Chapel tasks to be scheduled on the OS thread as supported by the tasking layer. Internally, the ZMQ module uses non-blocking calls to <kbd><code>zmq_send()</code></kbd> and <kbd><code>zmq_recv()</code></kbd> to transfer data, and yields to the tasking layer via <em>chpl_task_yield()</em> when the call would otherwise block.</p>
<p><a href="http://chapel.cray.com/docs/1.15/modules/packages/ZMQ.html#using-sockets" rel="nofollow noreferrer">Source</a></p>
</blockquote>
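<p>The pattern described in the quote, a call that blocks from the caller's point of view but is built from non-blocking attempts plus a yield, can be sketched in plain Python. This is a toy model using the standard <code>queue</code> module, not the Chapel internals; <code>semantically_blocking_get</code>, <code>yield_fn</code> and <code>max_attempts</code> are hypothetical names introduced here:</p>

```python
import queue
import time

def semantically_blocking_get(q, yield_fn=lambda: time.sleep(0), max_attempts=None):
    """Block from the caller's view, but never inside a blocking primitive.

    Each attempt is non-blocking; when it would block, control is handed
    to yield_fn() (a stand-in for yielding to a tasking layer).
    max_attempts is a hypothetical safety cap; None means retry forever.
    """
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        try:
            return q.get_nowait()         # non-blocking attempt
        except queue.Empty:
            attempts += 1
            yield_fn()                    # let other work run, then retry
    raise TimeoutError("no message after %d attempts" % attempts)

inbox = queue.Queue()
yields = []

def fake_yield():
    yields.append(1)
    if len(yields) == 3:
        inbox.put(b"Yo")                  # another "task" delivers a message

message = semantically_blocking_get(inbox, yield_fn=fake_yield)
```

<p>The caller simply waits for <code>message</code>, while every retry gives other work a chance to run. The same idea applies on the python side whenever a <code>.recv()</code> is polled in a non-blocking mode.</p>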
^{pr2}$