Python:如何在同一时刻触发多个进程

2024-10-01 00:15:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试运行一个进程,该进程执行一个httppost,然后将向服务器发送一个警报(发送警报所用的时间以纳秒计)。我试图测试服务器在毫秒内处理警报的能力。根据给定的标准,服务器每秒处理6000个警报。在

我使用多处理模块创建了一段代码,该模块发送6000个警报,但我使用的是for循环,因此执行for循环所花费的时间超过一秒钟。因此,所有6000个进程不是在同一时刻触发的。在

有没有一种方法可以同时触发多个(N个数字)进程?在

这是我的代码:流动测试.py那是一个图书馆。接下来是我的脚本,在“##################

导入json 导入httplib2

类流测试(): definit(自身、公司ID、设备IP): 自身公司ID=公司ID self.deviceIp=设备IP

def generate_savedSearchName(self, randNum):
    self.randMsgId = randNum
    self.savedSearchName = "TEST %s risk31 more than 3" % self.randMsgId

def def_request_body_dict(self):
    self.reqBody_dict = \
        { "Header" : {"agid" : "Agent1",
                      "mid": self.randMsgId,
                      "ts" : 1253125001
        },
          "mp":
              {
                  "host" : self.deviceIp,
                  "index" : self.companyId,
                  "savedSearchName" : self.savedSearchName,
              }
        }
    self.req_body = json.dumps(self.reqBody_dict)

def get_default_hdrs(self):
    self.hdrs = {'Content-type': 'application/json',
                 'Accept-Language': 'en-US,en;q=0.8'}

def send_request(self, sIp, method="POST"):
    self.sIp = sIp
    self.url = "http://%s:8080/agent/splunk/messages" % self.sIp

    http_cli = httplib2.Http(timeout=180, disable_ssl_certificate_validation=True)
    rsp, rsp_body = http_cli.request(uri=self.url, method=method, headers=self.hdrs, body=self.req_body)
    print "rsp: %s and rsp_body: %s" % (rsp, rsp_body)

# My testScript
from flowTesting import flowTesting
import random
import multiprocessing

deviceIp = "10.31.421.35"
companyId = "CPY0000909"
noMsgToBeSent = 1000
sIp = "10.31.44.235"
uniq_msg_id_list = random.sample(xrange(1,10000), noMsgToBeSent)

def runner(companyId, deviceIp, uniq_msg_id):
    proc = flowTesting(companyId, deviceIp)
    proc.generate_savedSearchName(uniq_msg_id)
    proc.def_request_body_dict()
    proc.get_default_hdrs()
    proc.send_request(sIp)

process_list = []
for uniq_msg_id in uniq_msg_id_list:
    savedSearchName = "TEST-1000 %s risk31 more than 3" % uniq_msg_id

    process = multiprocessing.Process(target=runner, args=(companyId,deviceIp,uniq_msg_id,))
    process.start()
    process.join()
    process_list.append(process)

print "Process list: %s" % process_list
print "Unique Message Id: %s" % uniq_msg_id_list

Tags: selfidrequestdefbodymsg警报process
2条回答

您需要使用进程间同步原语。在Linux上,您将使用Sys-V信号量,在Windows上,您将使用Win32事件。在

您的6000个进程将等待这个信号量/事件,并从另一个进程触发它,从而将所有6000个进程从等待状态释放到就绪状态,然后操作系统将尽快开始执行它们。在

让它们同时发生显然是不可能的,除非你有一个6000核的机器和一个操作系统内核,它的调度器能够完美地处理它们(你没有),你不可能一次运行6000个代码。在

而且,即使你这么做了,他们都想在套接字上发送消息。即使你的内核是如此疯狂的并行,除非你有6000个独立的网卡,它们最终还是会在网卡缓冲区中序列化。IP就是这样工作的:一个包接一个包。当然,在路径上有所有的路由器,服务器的网卡,服务器的操作系统等等,即使IP不碍事,字节通过电缆传输也需要时间。因此,在同一时刻做到这一点的唯一方法,即使在理论上,也就是每边有6000个网卡,并用相同的光纤将它们直接连接起来。在

然而,你并不真的需要他们在同一时刻,只是更接近对方比他们。您没有向我们展示您的代码,但您可能刚刚启动了6000个立即尝试发送消息的Process。这意味着您将进程启动时间(特别是在Windows上)包括在倾斜时间中,这可能非常慢。在

可以通过使用线程而不是进程来减少这种情况。这似乎有违直觉,但Python非常擅长处理I/O绑定的线程,而且每一个现代操作系统都非常擅长启动新线程。在

但实际上,您需要的是在您的线程或进程上使用^{},让它们在任何线程或进程尝试执行任何工作之前完成所有设置工作(包括进程启动)。在

它可能还不够紧,但它会比你现在可能要紧得多。在


你将面临的下一个限制是上下文切换时间。现代操作系统很擅长调度,但不能同时处理6000个任务。所以,实际上,你想将这个过程减少到N个进程,每个进程只需尽可能快地按顺序发送6000个/N个连接。这将使它们进入内核/NIC的速度比一次尝试6000次并让操作系统为您执行序列化要快得多。(事实上,在某些平台上,根据硬件的不同,一个进程连续执行6000个进程可能比N执行6000个/N要好。两种方法都要测试。)


socket库本身仍有一些开销。为了解决这个问题,您需要预先构建所有IP包,然后创建一个原始套接字并发送这些包。从每个连接发送第一个数据包,然后从每个连接发送第二个数据包,等等

相关问题 更多 >