import time
import queue
import typing as T
from threading import Thread, current_thread
Callback = T.Callable[..., None]
Task = T.Tuple[Callback, T.Any, T.Any]
TaskQueue = queue.Queue
class Worker(Thread):
def __init__(self, tasks: queue.Queue[Task]):
super().__init__()
self.tasks = tasks
def run(self) -> None:
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
print(e)
self.tasks.task_done()
class ThreadPool:
def __init__(self, num_threads: int):
self.tasks: TaskQueue = queue.Queue(num_threads)
self.num_threads = num_threads
for _ in range(self.num_threads):
worker = Worker(self.tasks)
worker.setDaemon(True)
worker.start()
def submit(self, func: Callback, *args, **kargs) -> None:
self.tasks.put((func, args, kargs))
def wait_completion(self) -> None:
self.tasks.join()
def cpu_waster(i: int) -> None:
name = current_thread().getName()
print(f"{name} doing {i} work")
time.sleep(3)
def main() -> None:
pool = ThreadPool(num_threads=5)
for i in range(20):
pool.submit(cpu_waster, i)
print("All work requests sent")
pool.wait_completion()
print("All work completed")
if __name__ == "__main__":
main()
name = current_thread().getName()
Thread-2 doing 0 work
Thread-5 doing 1 work
Thread-4 doing 2 work
Thread-1 doing 3 work
Thread-3 doing 4 work
Thread-2 doing 5 work
Thread-5 doing 6 work
Thread-1 doing 8 work
Thread-4 doing 7 work
Thread-3 doing 9 work
Thread-5 doing 10 work
Thread-1 doing 11 work
Thread-4 doing 12 work
Thread-2 doing 14 work
Thread-3 doing 13 work
All work requests sent
Thread-1 doing 15 work
Thread-4 doing 16 work
Thread-2 doing 17 work
Thread-3 doing 18 work
Thread-5 doing 19 work
All work completed
スレッドプールを実装してそれぞれのスレッドで並列処理を行なっているのはわかるんだが、
callbackとqueue, typingの使い方がイマイチよくわからん…