Python负载平衡脚本运行缓慢

2024-10-02 14:23:29 发布

您现在位置:Python中文网/ 问答频道 /正文

我用python实现了一个负载平衡脚本,它将流量定向到网络内的服务器,并在没有可用服务器的情况下将流量引导到备用服务器。它可以工作,但问题是它运行缓慢。就像在非常慢的时候。通常需要0.8秒才能加载的页面使用此脚本加载需要13秒以上。在

#!/usr/bin/env python
# This is a simple load balancing python script, used to reduce the load on my raspberry pi servers.
import socket, select, time, sys, os, multiprocessing

def checkAdmin():
        import ctypes
        try:
                is_admin = os.getuid() == 0
        except AttributeError:
                is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
        return is_admin

if not checkAdmin():
        print 'ERROR: You need to run this script with admin/root permissions'
        sys.exit(0)

# Changing the buffer_size and delay, you can improve the speed and bandwidth.
# But when buffer get to high or delay go too down, you can broke things
buffer_size = 4096
delay = 0.0001
forwards = {
        80: {
                'example.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                                ('192.168.1.4',80)
                                ('192.168.1.5',80)
                                ('192.168.1.6',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ],
                        'last_svr':0
                },
                'subdomain.example.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                                ('192.168.1.5',80)
                                ('192.168.1.8',8080)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ], 
                        'last_svr':0
                },
                'example2.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ],
                        'last_svr':0
                },
                'unknown' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1', 1337)
                        ],
                        'last_svr':0
                }
        },
        8080: {
                'example.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                                ('192.168.1.4',80)
                                ('192.168.1.5',80)
                                ('192.168.1.6',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ],
                        'last_svr':0
                },
                'subdomain.example.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                                ('192.168.1.5',80)
                                ('192.168.1.8',8080)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ], 
                        'last_svr':0
                },
                'example2.com' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1',1337)
                        ],
                        'last_svr':0
                },
                'unknown' : {
                        'forwardifavailable':[
                                ('192.168.1.3',80)
                        ],
                        'forwardifunavailable':[
                                ('127.0.0.1', 1337)
                        ],
                        'last_svr':0
                }
        }
}

available = (multiprocessing.Manager()).dict()

def checkserver(address,port):
        global available
        available[(address,port)] = False
        while True:
                s = socket.socket()
                try:
                       s.connect((address, port))
                       available[(address,port)] = True
                except socket.error, e:
                       available[(address,port)] = False
                time.sleep(5) # 5 Second wait before second check; Reduces load and makes this much faster


sockets = []
def set_up_servers():
        ipandports = set([])
        global sockets
        for xport,sites in forwards.iteritems():
                currsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                currsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)
                currsocket.bind(('',xport))
                currsocket.listen(200)
                sockets.append(currsocket)
                for site,forwardslist in sites.iteritems():
                        for xtuple in forwardslist['forwardifavailable']:
                                ipandports.add(xtuple)
        # Now we have a list of ips and ports we need to constantly check if up
        for xtuple in ipandports:
                print xtuple
                (multiprocessing.Process(
                        target=checkserver,
                        args=xtuple
                        )).start()

def getForwardForUrl(originalurl,port, path):
        starttime = time.time()
        import tldextract
        url = originalurl
        xtuple = tuple([url,port])
        subdomain = ''
        if ':' in url: # In order to correctly handle ports
                url = url.split(':')[0]
        if url in forwards[port]: # This is to, optionally, support a subdomain on a separate server
                xchksvr = forwards[port][url]
        else:
                extracteddomain = tldextract.extract(url)
                url = "{0}.{1}".format(extracteddomain.domain,extracteddomain.suffix)
                subdomain = extracteddomain.subdomain + '.'
                if url in forwards[port]:
                        xchksvr = forwards[port][url]
                else:
                        print 'Unknown domain: ' + originalurl + " with port: " + str(port)
                        xchksvr = forwards[port]['unknown']
        i = len(xchksvr['forwardifavailable'])
        seci = (xchksvr['last_svr'] + 1) % len(xchksvr['forwardifavailable'])
        while (i > 0):
                if available[xchksvr['forwardifavailable'][seci]]:
                        xchksvr['last_svr'] = seci
                        print "User requested site {0} (path: {1}) from port {2}. Directing user to server {3} with port {4}.".format(originalurl,path,port,xchksvr['forwardifavailable'][seci][0],xchksvr['forwardifavailable'][seci][1]) 
                        return xchksvr['forwardifavailable'][seci]
                seci = (seci + 1) % len(xchksvr['forwardifavailable'])
                i -= 1
        xchksvr['last_svr'] = (xchksvr['last_svr'] + 1) % len(xchksvr['forwardifunavailable'])
        print "No servers available for site {0} (path: {1}) from port {2}. Directing to fallback server {3} with port {4}.".format(originalurl,path,port,xchksvr['forwardifunavailable'][xchksvr['last_svr']][0],xchksvr['forwardifunavailable'][xchksvr['last_svr']][1])
        print str(time.time() - starttime)
        return xchksvr['forwardifunavailable'][xchksvr['last_svr']]

class Forward:
    def __init__(self):
        self.forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def start(self, addresstuple):
        try:
            self.forward.connect(addresstuple)
            return self.forward
        except Exception, e:
            print e
            return False

class TheServer:
    input_list = []
    channel = {}

    def __init__(self):
        pass

    def main_loop(self,listeningsocket):
        self.input_list.append(listeningsocket)
        while 1:
                time.sleep(delay)
                try:
                    ss = select.select
                    inputready, outputready, exceptready = ss(self.input_list, [], [])
                    for self.s in inputready:
                            if self.s == listeningsocket: # If this is an externally listening socket...
                                    self.on_accept()
                                    break
                            try:
                                    self.data = self.s.recv(buffer_size)
                            except Exception:
                                    data=''
                            if len(self.data) == 0:
                                self.on_close()
                                break
                            else:
                                self.on_recv(self.data)
                except Exception,e:
                    pass

    def on_accept(self):
        forward = (Forward()).start(('127.0.0.1', 1337))
        clientsock, clientaddr = self.s.accept()
        if forward:
            self.input_list.append(clientsock)
            self.input_list.append(forward)
            self.channel[clientsock] = forward
            self.channel[forward] = clientsock
        else:
            print "Can't establish connection with remote server."
            clientsock.close()

    def on_close(self):
        #remove objects from input_list
        self.input_list.remove(self.s)
        if self.channel[self.s] in self.input_list: # Because I'm too lazy to fix a bug where the element doesn't exist sometimes
                self.input_list.remove(self.channel[self.s])
        out = self.channel[self.s]
        # close the connection with client
        self.channel[out].close()  # equivalent to do self.s.close()
        # close the connection with remote server
        self.channel[self.s].close()
        # delete both objects from channel dict
        del self.channel[out]
        del self.channel[self.s]

    def parse_data(self,data):
        #print data # used for debugging
        if (data.startswith("HTTP 1/1")):
                # Data sent from server
                pass
        else:
                # Data sent to server
                if (data.startswith("GET") or (data.startswith("POST")) or (data.startswith("PUT")) or (data.startswith("PATCH")) or (data.startswith("HEAD")) or (data.startswith("OPTIONS")) or (data.startswith("DELETE"))):
                        # GET/POST/PUT/PATCH/HEAD/OPTIONS/DELETE request; Check that if we need a special server for this URL
                        timestart = time.time()
                        newChannel = Forward().start(getForwardForUrl(data.splitlines()[1].split()[1], self.s.getsockname()[1], data.splitlines()[0].split()[1]))
                        self.input_list[self.input_list.index(self.s)] = newChannel
                        oldForward = self.channel[self.s]
                        self.channel[self.s] = newChannel
                        del self.channel[oldForward]
                        self.channel[newChannel] = self.s
                        self.input_list.remove(oldForward)
                        self.input_list.append(newChannel)
                        self.input_list.remove(self.channel[self.s])
        return data

    def on_recv(self,data):
        # here we can parse and/or modify the data before send forward
        # data can NOT be gzipped, or this will not work
        data = self.parse_data(data)
        self.channel[self.s].send(data)

if __name__ == '__main__':
        import signal
        set_up_servers()
        for xsocket in sockets:
            print "Creating server to listen on port ", xsocket.getsockname()[1]
            server = TheServer()
            (multiprocessing.Process(target=server.main_loop,args=(xsocket,))).start()
        while True:
            try:
                pass
            except keyboardInterrupt:
                print "Ctrl C - Stopping server"
                os.kill(os.getpid(), signal.SIGTERM)

我尝试过进行一些代码分析,但是一旦多处理模块启动了第二个进程,该分析就不会分析该进程,从而导致这些代码行缺少分析信息。任何帮助都将不胜感激。谢谢您!在

顺便说一句:这是运行在树莓皮2b,与树莓杰西。在


Tags: toselfinputdataifportdefchannel