多线程Python套接字发送器/Clien

2024-06-28 20:04:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Twisted应用程序,它监听Int32StringReceiver消息,然后将它们重新发送到另一个应用程序。基本上,它是一个路由器,但它有一些智能,可以自省数据的去向。在

我的问题是出站端,收到很多错误消息等等

入站是类接收器(Int32StringReceiver):

def doActualForwarding(self, data):         
    self.stats.recvBits  += 8 * (4 + len(data))
    self.stats.recvMsgs += 1
    dlen = len(data) 
    if dlen > 1024*256:
        self.logger.info("router.Receiver.doActualForwarding(): data len: %s" % (dlen))
    self.router.forward(data)

def stringReceived(self, data):
    d = threads.deferToThread(self.doActualForwarding, data)
    d.addCallback(self.forwardingDoneOkay)
    d.addErrback(self.forwardingDoneError)

在自动路由器是需要以相同格式通过套接字通信发送这些消息的实例化对象。所以,它只会在路由器类中执行以下操作:

^{pr2}$

问题:

  1. Python的套接字库是线程安全的吗?也就是说,在功能上,两个或多个线程有一个指向对象路由器的指针。两个线程都在调用self.sock.sendall公司(msg)我担心他们会踩到对方。

  2. 一个症状是,可能是连续的消息被彼此附加在一起。我不确定,但看起来是这样。

  3. 我看到很多资源临时工。unavail(意味着目的地正忙)、大约相同数量的断开管道和少量错误的文件描述符。在

    • [Errno 9]错误的文件描述符
    • [错误号11]资源暂时不可用
    • [错误32]管道破裂

这些消息可能相当于经过这个事件的消息数的0.5%(0.005)。在

  1. 我试图让每个send都执行connect/sendall/shutdown/close操作,但这导致了大量关于“peer重置连接”的消息。在

似乎每个人都专注于处理套接字上的多线程接收的代码,但对套接字上多线程发送的评论却不多。在

  1. 我还试图使用(可能是错误的):

    导入线程 自锁= 穿线。锁定() 与自锁: 索克·森达尔(消息)

但这导致了错误的消息。在

  1. 有人能给我指出一些好例子的方向吗(或者提供一些?!?!?)演示多线程套接字sendall()?

Tags: self应用程序消息datalendefstats错误
1条回答
网友
1楼 · 发布于 2024-06-28 20:04:47

我想说,如果进程不需要相互通信,那么最好的解决方案就是生成一个新进程来处理每个传入的连接。这样您就不必担心锁定,因为每个连接都将单独处理。在

简单的实施方法是:

import socket
import multiprocessing
import pdb
import random
from pycurl import Curl
import os
import time
import re

class query(object):
    pid, addr, conn, url, ua, ref = [None for i in range(6)]
    compression = True

    def __init__(self, conn, addr):
        self.pid = addr[1]
        self.addr = addr
        self.conn = conn
        self.process()

    def process(self):
        #do your socket stuff here

class ProxyServer(object):
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def start(self):
        logging.info("Server started on %s:%i" % (self.host, self.port))
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind((self.host, self.port))
        self.sock.listen(0)

        while True:
            conn, addr = self.sock.accept()
            logging.info('Connection made from %s' % conn)
            proc = multiprocessing.Process(target=query, args=(conn, addr))
            proc.daemon = True
            proc.start()
            logging.info('Started processing query %r for %s' % (proc, addr))

if __name__ == "__main__":
    serv = ProxyServer(host, port)
    try:
        serv.start()
    except:
    finally:
        for proc in multiprocessing.active_children():
            proc.terminate()
            proc.join()

请记住,这是我从旧的概念验证代码中截取的一个示例,在它准备投入生产之前,您必须对它进行一点调整。在

相关问题 更多 >