Zombie processes, here we go again

Published 2024-09-30 16:22:04


I do a lot of work with multiprocessing/threading/subprocess. What I am basically trying to do is execute every binary on a computer, and I wrote a Python script for that. But I keep getting zombie ("defunct") processes, and if all 4 of my workers end up in that state, everything deadlocks. I have tried lots of different things, but nothing seems to work :(

Here is what the architecture looks like:

|   \_ python -m dataset --generate
|       \_ worker1
|       |   \_ [thread1] firejail bin1
|       \_ worker2
|       |   \_ [thread1] firejail bin1
|       |   \_ [thread2] firejail bin2
|       |   \_ [thread3] firejail bin3
|       \_ worker3
|       |   \_ [thread1] [firejail] <defunct>
|       \_ worker4
|       |   \_ [thread1] [firejail] <defunct>

I create 4 workers:

# spawn mode prevents deadlocks https://codewithoutrules.com/2018/09/04/python-multiprocessing/
with get_context("spawn").Pool() as pool:

    results = []

    for binary in binaries:
        result = pool.apply_async(legit.analyse, args=(binary,),
                                  callback=_binary_analysis_finished_callback,
                                  error_callback=error_callback)
        results.append(result)

(Note that I use a "spawn" pool, though now I wonder whether that actually helps…)

Each worker then creates several threads, like this:

threads = []
executions = []

def thread_wrapper(*args):
    flows, output, returncode = _exec_using_firejail(*args)
    executions.append(Execution(*args, flows, is_malware=False))

for command_line in potentially_working_command_lines:
    thread = Thread(target=thread_wrapper, args=(command_line,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
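As a side note, the start/append/join bookkeeping above can also be written with `concurrent.futures.ThreadPoolExecutor`, which joins the threads for you and, unlike bare `Thread` targets, re-raises exceptions from the workers instead of swallowing them. A sketch, with `run_one` standing in for the question's `_exec_using_firejail` wrapper:

```python
from concurrent.futures import ThreadPoolExecutor

def run_one(command_line):
    # stand-in for _exec_using_firejail: just tokenize the command line
    return command_line.split()

potentially_working_command_lines = ["bin1 -h", "bin2 --version"]

# map() preserves input order and propagates any worker exception here
with ThreadPoolExecutor(max_workers=8) as executor:
    executions = list(executor.map(run_one, potentially_working_command_lines))

print(executions)  # [['bin1', '-h'], ['bin2', '--version']]
```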

Each thread launches a new process inside a firejail sandbox:

process = subprocess.Popen(FIREJAIL_COMMAND +
                           ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] + command_line,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)

try:
    out, errs = process.communicate(timeout=5, input=b"Y\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\nY\n")
    # print("stdout:", out)
    # print("stderr:", errs)

except subprocess.TimeoutExpired:
    # print(command_line, "timed out")
    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
    out, errs = process.communicate()

I use os.killpg() instead of process.kill() because, for some reason, the children of my Popen process were not getting killed. This should be possible because preexec_fn=os.setsid makes the child the leader of a new session, so all of its descendants share its process group id. But even with this method, some programs such as zsh still leave zombies behind, because zsh apparently changes its own process group, so my os.killpg does not work as expected.

I am looking for a way to be 100% sure that all the processes will die.
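The `<defunct>` entries themselves mean the killed children are never reaped: SIGKILL ends a process, but it stays a zombie until its parent calls wait() on it. A stdlib-only sketch of the kill-then-reap pattern, using `start_new_session=True` (the documented, thread-safe equivalent of `preexec_fn=os.setsid`) and `sleep 30` as a stand-in for the firejail command. For group-escapers like zsh you would additionally need to walk the live process tree before killing (e.g. psutil's `Process.children(recursive=True)`):

```python
import os
import signal
import subprocess

# "sleep 30" stands in for the real FIREJAIL_COMMAND invocation;
# start_new_session=True puts the child in its own session/process group
proc = subprocess.Popen(["sleep", "30"], start_new_session=True)

try:
    proc.wait(timeout=0.5)
except subprocess.TimeoutExpired:
    # kill the whole process group (pgid == proc.pid thanks to setsid) ...
    os.killpg(proc.pid, signal.SIGKILL)
    # ... and then REAP the child: without this wait(), the killed
    # process stays visible as <defunct> until the parent exits
    proc.wait()

print(proc.returncode)  # -9 (killed by SIGKILL)
```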


1 Answer

Posted 2024-09-30 16:22:04

If you want to use the subprocess module for this, you should use the .kill method of the process object directly rather than going through the os module. communicate() is a blocking call, so Python waits for a response; the timeout parameter helps, but with many processes it gets slow.

import subprocess

cmd_list = (
    FIREJAIL_COMMAND 
    + ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] 
    + command_line
) 
proc = subprocess.Popen(
    cmd_list,
    stdout=subprocess.PIPE, 
    stderr=subprocess.PIPE, 
    preexec_fn=os.setsid
)

try:
    out, errs = proc.communicate(timeout=5, input=b"Y\n" * 16)
except subprocess.TimeoutExpired:
    proc.kill()
    # still call communicate() after kill(): it closes the pipes and
    # reaps the dead child, so it is not left behind as <defunct>
    out, errs = proc.communicate()

ret_code = proc.wait()

If you want to run this in a non-blocking loop over a set of processes, use poll. Here is an example. It assumes you have a list filenames and a corresponding list command_lines that you want to feed into the process creation.

import subprocess
import time

def create_process(output_filename, command_line):
    cmd_list = (
        FIREJAIL_COMMAND 
        + ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] 
        + command_line
    ) 
    proc = subprocess.Popen(
        cmd_list,
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE, 
        preexec_fn=os.setsid
    )
    return proc

processes = [create_process(f, c) for f, c in zip(filenames, command_lines)]

TIMEOUT = 5
WAIT = 0.25  # how long to wait between checking the processes
finished = []
for _ in range(round(TIMEOUT / WAIT)):
    finished_new = []
    if not processes:
        break
    for proc in processes:
        # poll() returns None while the process is running; an exit code
        # of 0 is falsy, so compare against None explicitly
        if proc.poll() is not None:
            finished_new.append(proc)
    # cleanup
    for proc in finished_new:
        processes.remove(proc)
    finished.extend(finished_new)
    time.sleep(WAIT)
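Whichever polling scheme you use, anything still alive when the loop ends has to be killed and reaped explicitly, or you are back to the `<defunct>` state from the question. A self-contained sketch of that final sweep, using `sleep 30` stand-ins instead of the hypothetical filenames/command_lines (Linux-only because of os.killpg):

```python
import os
import signal
import subprocess
import time

# two stand-in processes that will outlive the timeout
procs = [subprocess.Popen(["sleep", "30"], start_new_session=True)
         for _ in range(2)]

deadline = time.monotonic() + 0.5
while procs and time.monotonic() < deadline:
    procs = [p for p in procs if p.poll() is None]  # drop finished ones
    time.sleep(0.1)

survivors = list(procs)
for p in survivors:
    # kill each survivor's whole process group, then reap it with wait()
    os.killpg(p.pid, signal.SIGKILL)
    p.wait()

print([p.returncode for p in survivors])  # [-9, -9]
```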
