用Python控制TCP服务器

2024-09-27 19:17:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用unittests来检查TCP客户端通信的正确性,所以我需要一些TCP服务器,我可以控制它,它可以发送到客户端(以及何时)。
一个简单的测试应该是:

server = Server("localhost", 5555)
server.start()
client.connect()
self.assertTrue("login_message" in server.received_data)
server.send_to_client(reject_messages)
self.assertTrue("login_again" in server.received_data)
time.sleep(10)
self.assertTrue("login_again_and_again" in server.newest_received_data)
server.stop()
self.assertTrue("login failed" in client.logs) 

我需要全流程控制,什么可以用来实现服务器?
现在我尝试使用threaded SocketServer,但我既不能访问数据,也不能控制它。。在


Tags: inself服务器client客户端dataserverlogin
1条回答
网友
1楼 · 发布于 2024-09-27 19:17:45

我不知道是否有人需要这个,但不管怎样: 我用的是gevent服务器

from gevent import sleep, socket
from gevent.server import StreamServer

class Connection:
    def __init__(self, host, port):
        self.server = StreamServer((host, port), self.handle)
        self.data = []
        self.socks = []
        self.pointer = 0

    def handle(self, sock, address):
        self.socks.append(sock)
        while True:
            line = sock.recv(1024)
            if line:
                self.data += [line]
            else:
                break
        sock.close()
        self.socks.remove(sock)

    def send(self, msg):
        if self.socks:
            sock2send = self.socks[-1]
            try:
                sock2send.send(msg)
            except IOError, e:
                print "Can't send message '%s'! Exception:" % msg, e
        else:
            print "No sockets to send the message to"

    def start(self):
        self.server.start()

    def serve_forever(self):
        self.server.serve_forever()

    def close(self):
        self.server.stop()
        for sock in self.socks:
            sock.close()

    def new_data(self):
        newest = self.data[self.pointer:]
        self.pointer = len(self.data)
        return newest

然后unittest看起来像这样:

^{pr2}$

相关问题 更多 >

    热门问题