"""Minimal fixed-size thread pool built on ``queue.Queue``.

Tasks are ``(callable, args, kwargs)`` tuples placed on a shared bounded
queue; a fixed number of daemon worker threads pull tasks off the queue and
run them.
"""

import queue
import time
import typing as T
from threading import Thread, current_thread

# A task callable. It is invoked for its side effects; its return value is
# discarded by the worker.
Callback = T.Callable[..., None]
# One unit of work: (callable, positional arguments, keyword arguments).
Task = T.Tuple[Callback, T.Tuple[T.Any, ...], T.Dict[str, T.Any]]
TaskQueue = queue.Queue


class Worker(Thread):
    """Thread that endlessly pulls tasks from a shared queue and runs them."""

    def __init__(self, tasks: "queue.Queue[Task]"):
        """Create the worker (not yet started).

        Args:
            tasks: Queue of ``(func, args, kwargs)`` tuples to consume.
        """
        super().__init__()
        self.tasks = tasks

    def run(self) -> None:
        """Consume and execute tasks until the thread is torn down.

        Exceptions derived from ``Exception`` raised by a task are printed
        and the worker carries on with the next task.
        """
        while True:
            # Blocks until a task is available.
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(e)
            finally:
                # Always account for the task, even if it raised something
                # outside ``Exception`` (e.g. ``SystemExit``); otherwise
                # ``Queue.join()`` would wait forever on the missing
                # ``task_done()`` call.
                self.tasks.task_done()


class ThreadPool:
    """Fixed-size pool of daemon ``Worker`` threads fed by a bounded queue."""

    def __init__(self, num_threads: int):
        """Start ``num_threads`` daemon workers.

        Args:
            num_threads: Number of worker threads; also used as the maximum
                number of tasks that may sit in the queue at once.

        Raises:
            ValueError: If ``num_threads`` is less than 1. Such a pool would
                have no workers, so submitted tasks could never complete.
        """
        if num_threads < 1:
            raise ValueError(
                f"num_threads must be at least 1, got {num_threads!r}"
            )
        # The queue is bounded, so ``submit`` blocks while it is full.
        self.tasks: TaskQueue = queue.Queue(num_threads)
        self.num_threads = num_threads
        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            # Daemon threads do not keep the interpreter alive at exit.
            # The ``daemon`` attribute replaces the deprecated setDaemon().
            worker.daemon = True
            worker.start()

    def submit(self, func: Callback, *args: T.Any, **kwargs: T.Any) -> None:
        """Queue ``func(*args, **kwargs)`` for execution by a worker.

        Blocks while the task queue is full.
        """
        self.tasks.put((func, args, kwargs))

    def wait_completion(self) -> None:
        """Block until every submitted task has been processed."""
        self.tasks.join()


def cpu_waster(i: int, seconds: float = 3) -> None:
    """Demo task: print which thread handles item ``i``, then sleep.

    Args:
        i: Identifier of the work item, used only in the printed message.
        seconds: How long to sleep after printing (default 3).
    """
    # The ``name`` attribute replaces the deprecated getName().
    name = current_thread().name
    print(f"{name} doing {i} work")
    time.sleep(seconds)


def main() -> None:
    """Run 20 demo tasks on a 5-thread pool and wait for them to finish."""
    pool = ThreadPool(num_threads=5)
    for i in range(20):
        pool.submit(cpu_waster, i)
    print("All work requests sent")
    pool.wait_completion()
    print("All work completed")


if __name__ == "__main__":
    main()
name = current_thread().getName()
Thread-2 doing 0 work
Thread-5 doing 1 work
Thread-4 doing 2 work
Thread-1 doing 3 work
Thread-3 doing 4 work
Thread-2 doing 5 work
Thread-5 doing 6 work
Thread-1 doing 8 work
Thread-4 doing 7 work
Thread-3 doing 9 work
Thread-5 doing 10 work
Thread-1 doing 11 work
Thread-4 doing 12 work
Thread-2 doing 14 work
Thread-3 doing 13 work
All work requests sent
Thread-1 doing 15 work
Thread-4 doing 16 work
Thread-2 doing 17 work
Thread-3 doing 18 work
Thread-5 doing 19 work
All work completed
スレッドプールを実装してそれぞれのスレッドで並列処理を行なっているのはわかるんだが、
callbackとqueue, typingの使い方がイマイチよくわからん…