diff --git a/output/manager.py b/output/manager.py index e00b65b..79519c7 100644 --- a/output/manager.py +++ b/output/manager.py @@ -31,6 +31,7 @@ class OutputManager(): } _writer: "List[IOutput]" = [] _thread: "Optional[threading.Thread]"= None + _should_exit = threading.Event() def __init__(self): _config = config()['output']['use'] @@ -107,14 +108,22 @@ class OutputManager(): writer.error_output(message.method, message.payload, e) def start_loop(self): + self._should_exit.clear() self._thread = threading.Thread(target=self._handle) self._thread.start() def _handle(self): while True: message = MESSAGE_QUEUE.get() + if self._should_exit.is_set(): + break + if message is None: + continue self.decode_payload(message) def terminate(self): + self._should_exit.set() + MESSAGE_QUEUE.put(None) + for writer in self._writer: writer.terminate() diff --git a/proxy/queues.py b/proxy/queues.py index ac037ba..a9cf669 100644 --- a/proxy/queues.py +++ b/proxy/queues.py @@ -1,4 +1,8 @@ from queue import SimpleQueue -from proxy.common import MessagePayload +from typing import TYPE_CHECKING -MESSAGE_QUEUE: "SimpleQueue[MessagePayload]" = SimpleQueue() +if TYPE_CHECKING: + from typing import Optional + from proxy.common import MessagePayload + +MESSAGE_QUEUE: "SimpleQueue[Optional[MessagePayload]]" = SimpleQueue()