[Errno 104] Connection reset by peer in Python

Posted 2024-10-01 15:28:09


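I have a distributed program. Each node (virtual machine) in the network sends data to every other node (over outgoing connections) and receives data from every other node (over incoming connections). Before any data is sent, every node (including the single source node) opens a socket to every other node. After a 3-second delay, the source starts sending a different chunk of the file to each of the other nodes. Each node starts relaying the chunks it receives as soon as the first packet arrives.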

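The program has completed successfully many times without any errors. Sometimes, however, a random node resets its incoming connections (while still sending data over its outgoing connections).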

Each node has n-2 sender threads and n-1 receiver threads, where n is the number of nodes in the network.
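The thread counts follow from the topology: a relaying node sends to everyone except itself and the source, but receives from everyone except itself. A small Python 3 sketch of that arithmetic, assuming n counts every VM including the source and that nothing is relayed back to a source (as the `ip not in self.getSourcesIp()` check suggests); the function names are hypothetical.

```python
def thread_counts(n_nodes, n_sources=1):
    """Sender/receiver threads on one relaying (non-source) node."""
    senders = n_nodes - 1 - n_sources  # outgoing: all others except the source(s)
    receivers = n_nodes - 1            # incoming: all others, source(s) included
    return senders, receivers


def total_connections(n_nodes, n_sources=1):
    """TCP connections in the whole mesh.

    Each of the (n - s) relays opens (n - 1 - s) outgoing connections and each
    source opens (n - s), which sums to (n - s) * (n - 1).
    """
    return (n_nodes - n_sources) * (n_nodes - 1)


print(thread_counts(10))      # (8, 9)
print(total_connections(10))  # 81
```

So every listener must accept n-1 connections; with 10 VMs that is 9 pending accepts per node and 81 sockets across the mesh.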

Send function:

def relaySegment_Parallel(self):
        connectionInfoList = []
        seenSegments = []
        readyServers = []
        BUFFER_SIZE = Node.bufferSize
        while len(readyServers) < self.connectingPeersNum-len(Node.sources) and self.isMainThreadActive(): #Data won't be relayed to the sources
            try:
                tempIp = None
                for ip in Node.IPAddresses:
                    if ip not in readyServers and ip != self.ip and ip not in self.getSourcesIp():
                        tempIp = ip
                        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        s.connect((ip, Node.dataPort))
                        connectionInfoList.append((s, ip))
                        readyServers.append(ip)
                        if Node.debugLevel2Enable:
                            print "RelayHandler: Outgoing connection established with IP:  " + str(ip)
            except socket.error, v:
                errorcode = v[0]
                if  errorcode == errno.ECONNRESET:
                    print "(RelayHandler) Connection reset ! Node's IP: " + str(tempIp)
                if errorcode == errno.ECONNREFUSED:
                    print "(RelayHandler) Node " + str(tempIp) + " is not ready yet!"
                continue
            except:
                print "Error: Cannot connect to IP: " + str (tempIp)
                continue
            print "(RelayHandler) Ready to relay data to " + str(len(readyServers)) + " servers."
        try:
            pool = ThreadPool(processes = Node.threadPoolSize)
            while Node.terminateFlag == 0 and not self.isDistributionDone() and self.isMainThreadActive():
                if len(self.toSendTupleList) > 0:
                    self.toSendLock.acquire()
                    segmentNo, segmentSize, segmentStartingOffset, data = self.toSendTupleList.pop(0)
                    self.toSendLock.release()
                    if len(data) > 0:
                        if segmentNo not in seenSegments:
                            # Sender type: 0 = from source, 1 = from relayer
                            # Header: sender type/file size/segment no./segment size/segment starting offset/
                            tempList = []
                            for s, ip in connectionInfoList:
                                tempData = "1/" + str(self.fileSize) + "/"  + str(segmentNo) + "/" + str(segmentSize) + "/" + str(segmentStartingOffset) + "/"
                                tempList.append((s, ip, tempData))
                            pool.map(self.relayWorker, tempList)
                            seenSegments.append(segmentNo)
                        relayList = []
                        for s, ip in connectionInfoList:
                            relayList.append((s, ip, data))
                        pool.map(self.relayWorker, relayList)
            for s, ip in connectionInfoList:
                s.shutdown(1)  # 0: further receives disallowed; 1: further sends disallowed; 2: both disallowed
                s.close()
            pool.close()
            pool.join()
        except socket.error, v:
            errorcode=v[0]
            if errorcode==errno.ECONNREFUSED:
                print "(RelayHandler) Error: Connection Refused in RelaySegment function. It can not connect to: ", ip
            else:
                print "\n(RelayHandler) Error1 in relaying segments (Parallel) to ", ip, " !!! ErrorCode: ", errorcode
            traceback.print_exception(*sys.exc_info())
        except:
            print "\n(RelayHandler) Error2 in relaying segments (Parallel) to ", ip
            traceback.print_exception(*sys.exc_info())
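Each segment is preceded by a slash-delimited text header built in `tempData` (sender type, file size, segment number, segment size, starting offset). Because TCP is a byte stream, a receiver may read that header glued to payload bytes, so it has to split off exactly five fields. A minimal Python 3 sketch with hypothetical helpers, assuming the same field order as `tempData`:

```python
def build_header(sender_type, file_size, segment_no, segment_size, offset):
    """Encode the header as "type/fileSize/segNo/segSize/offset/"."""
    fields = (sender_type, file_size, segment_no, segment_size, offset)
    return ("/".join(str(f) for f in fields) + "/").encode("ascii")


def parse_header(buf):
    """Split five '/'-terminated fields off the front of buf.

    Returns (fields, rest), or (None, buf) if the header is not complete yet.
    maxsplit=5 keeps any '/' bytes inside the payload intact.
    """
    parts = buf.split(b"/", 5)
    if len(parts) < 6:
        return None, buf
    return tuple(int(p) for p in parts[:5]), parts[5]


header = build_header(1, 1000, 3, 100, 300)
print(parse_header(header + b"a/b"))  # ((1, 1000, 3, 100, 300), b'a/b')
```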

Receive function:

[code not preserved]

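The sender threads on every node report that they have connected to all other nodes (including the randomly failing node). However, the receive function on the failing node blocks in s.accept() and does not accept any connection except the one from the last source to connect. The failing node just waits without raising any exception.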
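The senders can report success even though the receiver never calls accept(): the kernel completes the TCP three-way handshake for connections queued by listen(), and connect() returns as soon as that happens. The backlog argument caps how many such connections may wait. A minimal Python 3 sketch on localhost (not the question's code) in which connect() and even sendall() succeed before accept() runs:

```python
import socket


def recv_exact(sock, n):
    """Read exactly n bytes; recv() may return fewer than requested."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed before sending %d bytes" % n)
        buf += chunk
    return buf


def connect_before_accept(payload=b"hello"):
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
    srv.listen(5)               # queue for established-but-unaccepted connections
    cli = socket.create_connection(srv.getsockname())
    # connect() has returned and the data is buffered by the kernel,
    # although the server has not called accept() yet.
    cli.sendall(payload)
    conn, _ = srv.accept()      # dequeues the already-established connection
    try:
        return recv_exact(conn, len(payload))
    finally:
        for s in (conn, cli, srv):
            s.close()


print(connect_before_accept())  # b'hello'
```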

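It seems that on the failing node, s.listen() (that is, the TCP stack) makes the senders believe they are connected, while s.accept() never accepts any of them except the last one. Then, for some reason, the node resets those connections, which is why the other nodes (the senders) raise a "Connection reset by peer" exception when they try to send data. The only sender that finishes its task without any error is the last source to connect.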
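One common way a receiver triggers errno 104 on the sending side: on Linux, closing a TCP socket that still has unread data in its receive buffer sends an RST instead of a normal FIN (as RFC 1122, section 4.2.2.13 recommends), and the peer's next socket call fails with ECONNRESET. The Python 3 sketch below reproduces that on localhost; it illustrates the mechanism and is not a diagnosis of what the failing node actually did.

```python
import errno
import select
import socket


def reset_by_unread_close(payload=b"segment data"):
    """Return the errno name the sender sees after the receiver closes with unread data."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))
    srv.listen(1)
    cli = socket.create_connection(srv.getsockname())
    conn, _ = srv.accept()
    srv.close()  # the accepted connection stays open

    cli.sendall(payload)
    # Wait until the bytes are sitting unread in the receiver's buffer ...
    select.select([conn], [], [], 5.0)
    # ... then close without reading them: Linux answers with RST, not FIN.
    conn.close()

    cli.settimeout(5.0)
    try:
        cli.recv(1024)  # the sender's next socket call sees the RST
    except ConnectionResetError as exc:
        return errno.errorcode[exc.errno]
    finally:
        cli.close()
    return None


print(reset_by_unread_close())  # 'ECONNRESET' on Linux
```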

Error:

Traceback (most recent call last):
  File "/home/ubuntu/DCDataDistribution/Node.py", line 137, in relayWorker
    socketConn.sendall(data)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer

Why does this happen?

FYI: I am running my program on Amazon EC2 instances. Each instance is a t2.micro (1 vCPU, 2.5 GHz Intel Xeon family (up to 3.3 GHz), 1 GiB memory) running Ubuntu Server 14.04 LTS (HVM).


1 answer
Answer #1 · posted 2024-10-01 15:28:09
            for s, ip in connectionInfoList:
                s.shutdown(1)  # 0: further receives disallowed; 1: further sends disallowed; 2: both disallowed
                s.close()
            pool.close()
            pool.join()

You shut down the connections while some relayWorker threads in the pool may still be running. Reverse the order:

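            pool.close()
            pool.join()
            for s, ip in connectionInfoList:
                s.shutdown(1)  # 1: further sends disallowed
                s.close()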
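A self-contained Python 3 sketch of that teardown order. `socket.socketpair()` stands in for the real peer connections and `relay_worker` is a hypothetical simplified stand-in for `relayWorker`. Note that `pool.map()` already blocks until all of its tasks return; the close()/join()-before-shutdown order matters most when work is queued asynchronously, as with `apply_async()` here.

```python
import socket
from multiprocessing.pool import ThreadPool


def relay_worker(args):
    # Send every payload on one socket, in order, from a single thread,
    # so sends on the same socket never interleave.
    sock, payloads = args
    for payload in payloads:
        sock.sendall(payload)


def relay_then_close(payloads, n_peers=3, pool_size=2):
    pairs = [socket.socketpair() for _ in range(n_peers)]
    senders = [a for a, _ in pairs]
    receivers = [b for _, b in pairs]

    pool = ThreadPool(processes=pool_size)
    pending = [pool.apply_async(relay_worker, ((s, payloads),)) for s in senders]

    # 1) Accept no more tasks and wait until every queued send has returned ...
    pool.close()
    pool.join()
    for result in pending:
        result.get()  # re-raises any socket error from a worker thread

    # 2) ... only then half-close (SHUT_WR is shutdown(1)) and close.
    for s in senders:
        s.shutdown(socket.SHUT_WR)
        s.close()

    # Drain receivers until EOF; payloads are tiny, so no send blocked on a full buffer.
    received = []
    for r in receivers:
        chunks = []
        while True:
            chunk = r.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
        r.close()
        received.append(b"".join(chunks))
    return received


print(relay_then_close([b"1/100/0/4/0/", b"abcd"]))
```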
