Terminating child processes cleanly while they run and communicate via queues

Thomas Christopher Davies

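I'm working on a larger project with 2 threads (in the same process) and one separate process. One thread is the GUI; the other is a sentinel thread that observes the child process, and that child process does some heavy lifting with a neural network. The architecture looks like this: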

[Figure: communication architecture]

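I need to be able to cancel the neural network process and, independently, end the sentinel thread. I created a small example that shows the architecture in general and how I'm going about it.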

from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
 
 
class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
 
    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1
 
    def stop(self):
        # I used the stop function for trying out some things, like using a joinable 
        # queue and block execution as long as the queue is not empty, which is not 
        # working
        self.queue.put(None)
        self.terminate()
 
 
class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop for
    # messages. In the real application I send signals via the signals and slots
    # design pattern to the gui and display the sent information.
 
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)
 
    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")
 
    def stop(self):
        self.worker.stop()
 
 
class System:
    # This class resembles the gui
 
    def __init__(self):
        self.listener = Listener()
 
    def start(self):
        self.listener.start()
 
    def stop(self):
        self.listener.stop()
 
 
if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()

What is the problem?

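As soon as one of the processes is reading from or writing to the queue, and/or the queue has not been emptied properly, one or both processes become zombies, which is essentially a deadlock. So I need a way to handle the queue correctly when terminating the process, so that the processes terminate without errors.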
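One documented cause of this kind of hang: a process that has put items on a `multiprocessing.Queue` will not exit until its feeder thread has flushed them into the underlying pipe, so joining it before the queue is drained can deadlock (the Python docs cover this under "Joining processes that use queues"). The sketch below shows the safe ordering. The function names are hypothetical, and it assumes a POSIX system so that the `fork` start method is available.

```python
import multiprocessing as mp


def produce(q):
    # One large item: the child's feeder thread cannot finish writing it into
    # the pipe until the parent reads from the other end.
    q.put("X" * 1_000_000)


def drain_then_join():
    ctx = mp.get_context("fork")  # assumption: POSIX, so the target is not pickled
    q = ctx.Queue()
    p = ctx.Process(target=produce, args=(q,))
    p.start()
    item = q.get()  # drain first...
    p.join()        # ...then join; swapping these two lines can deadlock
    return len(item), p.exitcode


if __name__ == "__main__":
    print(drain_then_join())  # (1000000, 0)
```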

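What I have tried so far:

  1. Using a JoinableQueue and calling join() after each task_done()

  2. Overriding the SIGTERM signal handler to wait until the queue has been emptied

  3. Using a JoinableQueue and calling join() only inside the SIGTERM signal handler

Results:

  1. Processing slows down drastically, but termination works correctly

  2. and 3. Termination does not work the way I implemented it: sometimes it works, sometimes it doesn't. So these approaches give me no reliable output or insight.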
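Attempt 1 is slow because every `join()` waits for a full round trip: the producer cannot put the next item until the consumer has called `task_done()` for the previous one. A minimal lock-step sketch of that pattern, assuming a POSIX system with the `fork` start method; the function names are hypothetical and not taken from the question:

```python
import multiprocessing as mp


def lockstep_producer(q, n):
    for i in range(n):
        q.put(i)
        q.join()  # blocks until the consumer has called task_done() for i
    q.put(None)   # sentinel
    q.join()


def run_lockstep(n=5):
    ctx = mp.get_context("fork")  # assumption: POSIX
    q = ctx.JoinableQueue()
    p = ctx.Process(target=lockstep_producer, args=(q, n))
    p.start()
    received = []
    while True:
        item = q.get()
        q.task_done()  # exactly one task_done() per get(), sentinel included
        if item is None:
            break
        received.append(item)
    p.join()
    return received, p.exitcode


if __name__ == "__main__":
    print(run_lockstep())  # ([0, 1, 2, 3, 4], 0)
```

Each item costs one inter-process handshake, which is why throughput collapses even though shutdown becomes reliable.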

My attempt at (3) looks like this:

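from multiprocessing import JoinableQueue, Lock, Process
from signal import SIGTERM, signal
from sys import exit


class Worker(Process):
 
    def __init__(self, queue: JoinableQueue):  # queue.join() needs a JoinableQueue
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        # Note: __init__ runs in the parent, so this handler is installed in the
        # parent; the child inherits it only with the "fork" start method.
        signal(SIGTERM, self.stop)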
 
    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)
 
    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
Darkonaut

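There are several possible approaches, but if you're aiming for a compromise between performance and robustness, I suggest using the signal handler only to set a `.running` flag on the worker, and checking it with `while self.running` in `worker.run()`. Once the loop breaks, the worker sends the sentinel value. This ensures that the sentinel is always the last value in the queue and that every value gets read by the listener. Together, this layout allows a graceful shutdown of the worker while still avoiding more expensive synchronization for checking the exit condition.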
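The flag-plus-sentinel idea can be tried in a single process. This is a sketch under stated assumptions, not the answer's code: a `queue.Queue` stands in for the `multiprocessing.Queue`, `signal.raise_signal()` (Python 3.8+) stands in for the parent's `terminate()`, and `FlagWorker`/`demo` are hypothetical names. It must run in the main thread, because only the main thread may install signal handlers.

```python
import queue
import signal

SENTINEL = "SENTINEL"


class FlagWorker:
    """Hypothetical single-process stand-in for the answer's Worker."""

    def __init__(self, q):
        self.q = q
        self.running = False

    def _on_sigterm(self, signum, frame):
        # The handler only flips the flag; the loop decides when to stop.
        self.running = False

    def run(self, signal_after=3):
        previous = signal.signal(signal.SIGTERM, self._on_sigterm)
        try:
            self.running = True
            i = 0
            while self.running:
                self.q.put(i)
                i += 1
                if i == signal_after:
                    # Stand-in for the parent's terminate(): SIGTERM to ourselves.
                    signal.raise_signal(signal.SIGTERM)
        finally:
            signal.signal(signal.SIGTERM, previous)  # restore the old handler
        self.q.put(SENTINEL)  # put only after the loop, so it is always last


def demo():
    q = queue.Queue()
    FlagWorker(q).run()
    # Two-argument iter() keeps calling q.get until it returns SENTINEL.
    return list(iter(q.get, SENTINEL))


if __name__ == "__main__":
    print(demo())
```

The loop pays only for reading a plain attribute on each iteration; no lock or extra queue round trip is involved.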

from multiprocessing import Process, Queue
from functools import partial
from threading import Thread
from time import sleep
import signal


SENTINEL = 'SENTINEL'


def sigterm_handler(signum, frame, worker):
    worker.shutdown()


def register_sigterm(worker):
    global sigterm_handler
    sigterm_handler = partial(sigterm_handler, worker=worker)
    signal.signal(signal.SIGTERM, sigterm_handler)


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.running = False

    def run(self):
        register_sigterm(self)
        self.running = True
        i = 0
        while self.running:
            self.queue.put(i)
            i += 1
        self.queue.put(SENTINEL)

    def stop(self):  # called by parent
        self.terminate()

    def shutdown(self):  # called by child from signal-handler
        self.running = False


class Listener(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        for data in iter(self.queue.get, SENTINEL):
            print(data)

    def stop(self):
        self.worker.stop()
        self.worker.join()


class System:

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":

    system = System()
    system.start()
    sleep(0.1)
    system.stop()
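One caveat with the code above: if `stop()` runs before the child has called `register_sigterm()`, the default SIGTERM action kills the worker, no sentinel is ever sent, and the listener blocks forever on `queue.get()`. A hedged sketch of one way around that, using a hypothetical `READY` handshake that is not part of the answer, plus a check that a graceful stop ends with exit code 0 rather than `-signal.SIGTERM`. It assumes POSIX and the `fork` start method; `flag_worker` and `run_once` are hypothetical names.

```python
import multiprocessing as mp
import signal
import time

SENTINEL = "SENTINEL"
READY = "READY"  # hypothetical handshake marker


def flag_worker(q):
    running = True

    def on_sigterm(signum, frame):
        nonlocal running
        running = False  # only flip the flag; the loop exits on its next check

    signal.signal(signal.SIGTERM, on_sigterm)
    q.put(READY)  # tell the parent the handler is installed
    i = 0
    while running:
        q.put(i)
        i += 1
        time.sleep(0.001)  # keep the demo's queue small
    q.put(SENTINEL)  # always the last item


def run_once(seconds=0.05):
    ctx = mp.get_context("fork")  # assumption: POSIX
    q = ctx.Queue()
    p = ctx.Process(target=flag_worker, args=(q,))
    p.start()
    if q.get() != READY:  # never send SIGTERM before the handler exists
        raise RuntimeError("unexpected first message")
    time.sleep(seconds)
    p.terminate()  # SIGTERM on POSIX, caught by on_sigterm in the child
    items = list(iter(q.get, SENTINEL))  # drain everything before joining
    p.join()
    return items, p.exitcode


if __name__ == "__main__":
    items, code = run_once()
    print(len(items), code)  # expected exit code: 0, not -signal.SIGTERM
```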

考虑以下实验。

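The idea is to monkey-patch the queue instance in the child so that, once SIGTERM has been received, the next call to queue.put() sends the passed value followed by the specified sentinel value, and then calls queue.close() and sys.exit(). This allows a clean shutdown while avoiding a repeated flag check.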

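multiprocessing.Queue() is actually just a method of multiprocessing.context.BaseContext that returns a preconfigured instance of multiprocessing.queues.Queue. To avoid interfering with that, I patched the instance rather than subclassing the queue class. So far, my tests show that it works fine.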

stqueue.py

import sys
import time
import signal
from functools import partial
from multiprocessing import current_process as curr_p


def _shutdown(self):
    self._xput = self.put
    self.put = self.final_put


def _final_put(self, obj):
    self._xput(obj)
    self._xput(self._xsentinel)
    self.close()
    sys.exit(0)


def _sigterm_handler(signum, frame, queue):
    print(f"[{time.ctime()}, {curr_p().name}] --- handling signal")
    queue.shutdown()


def register_sigterm_queue(queue, sentinel):
    """Monkey-patch queue-instance to shutdown process
    after next call to `queue.put()` upon receipt of SIGTERM.
    """
    queue._xsentinel = sentinel
    queue.shutdown = _shutdown.__get__(queue)
    queue.final_put = _final_put.__get__(queue)
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)
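`register_sigterm_queue()` relies on two standard Python idioms: `func.__get__(obj)` turns a plain function into a method bound to one instance (the descriptor protocol; `types.MethodType(func, obj)` is the explicit equivalent), and `functools.partial` pre-fills the extra `queue` argument so the result fits the `handler(signum, frame)` signature that `signal.signal()` expects. A toy illustration with hypothetical names (`Box`, `shout`, `on_signal`):

```python
import signal
import types
from functools import partial


class Box:
    """Hypothetical stand-in for the queue instance being patched."""


def shout(self, word):
    # Written like a method: the first parameter receives the instance.
    return f"{type(self).__name__}: {word.upper()}"


def on_signal(signum, frame, target):
    # Signal handlers are called as handler(signum, frame); partial supplies target.
    return target.shout("stop")


box = Box()
box.shout = shout.__get__(box)          # bind via the descriptor protocol
other = types.MethodType(shout, Box())  # equivalent, more explicit spelling
handler = partial(on_signal, target=box)

print(box.shout("hi"))                # Box: HI
print(other("hi"))                    # Box: HI
print(handler(signal.SIGTERM, None))  # Box: STOP
print(hasattr(Box(), "shout"))        # False: only the patched instance has it
```

Because the patch lives on the instance, other queues created by `multiprocessing.Queue()` are unaffected.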

main.py

import time
from threading import Thread
import multiprocessing as mp
from multiprocessing import Process, Queue, current_process as curr_p

import numpy as np

from stqueue import register_sigterm_queue


SENTINEL = 'SENTINEL'


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        register_sigterm_queue(self.queue, SENTINEL)  # <<<
        while True:
            print(f"[{time.ctime()}, {curr_p().name}] --- starting numpy")
            r = np.sum(
                np.unique(np.random.randint(0, 2500, 100_000_000))
            )
            print(f"[{time.ctime()}, {curr_p().name}] --- ending numpy")
            self.queue.put(r)

    def stop(self):  # called by parent
        self.terminate()

...  # Listener and System as in the previous example


if __name__ == "__main__":

    import logging
    mp.log_to_stderr(logging.DEBUG)

    system = System()
    system.start()
    time.sleep(10)
    print(f"[{time.ctime()}, {curr_p().name}] --- sending signal")
    system.stop()
    print(f"[{time.ctime()}, {curr_p().name}] --- signal send")

Example output:

[DEBUG/MainProcess] created semlock with handle 140000699432960
[DEBUG/MainProcess] created semlock with handle 140000699428864
[DEBUG/MainProcess] created semlock with handle 140000664752128
[DEBUG/MainProcess] Queue._after_fork()
[Sat Oct 24 21:59:58 2020, Worker-1] --- starting numpy
[DEBUG/Worker-1] recreated blocker with handle 140000699432960
[DEBUG/Worker-1] recreated blocker with handle 140000699428864
[DEBUG/Worker-1] recreated blocker with handle 140000664752128
[DEBUG/Worker-1] Queue._after_fork()
[INFO/Worker-1] child process calling self.run()
[DEBUG/Worker-1] Queue._start_thread()
[DEBUG/Worker-1] doing self._thread.start()
[DEBUG/Worker-1] starting thread to feed data to pipe
[DEBUG/Worker-1] ... done self._thread.start()
[Sat Oct 24 22:00:04 2020, Worker-1] --- ending numpy
[Sat Oct 24 22:00:04 2020, Worker-1] --- starting numpy
3123750
[Sat Oct 24 22:00:08 2020, MainProcess] --- sending signal
[Sat Oct 24 22:00:10 2020, Worker-1] --- handling signal
[DEBUG/Worker-1] telling queue thread to quit
[INFO/Worker-1] process shutting down
[DEBUG/Worker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-1] running the remaining "atexit" finalizers
[DEBUG/Worker-1] joining queue thread
[DEBUG/Worker-1] feeder thread got sentinel -- exiting
[DEBUG/Worker-1] ... queue thread joined
[INFO/Worker-1] process exiting with exitcode 0
[Sat Oct 24 22:00:10 2020, Worker-1] --- ending numpy
3123750
[Sat Oct 24 22:00:10 2020, MainProcess] --- signal send
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Process finished with exit code 0
