データをワーカー分にchunkするのがポイントか。
"""Count votes by splitting the pile across a pool of worker threads."""

import typing as T
import random
from collections import Counter
from multiprocessing.pool import ThreadPool

# Maps a candidate id to the number of votes that candidate received.
Summary = T.Mapping[int, int]


def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    """Tally the votes in *pile* using *worker_count* worker threads.

    The pile is cut into exactly ``worker_count`` contiguous sub-piles.
    Every vote lands in exactly one sub-pile, so no vote is lost when
    ``len(pile)`` is not a multiple of ``worker_count``. Each sub-pile is
    counted by ``process_pile`` on a ``ThreadPool`` and the per-worker
    summaries are merged into one total.

    Args:
        pile: Votes, each one a candidate id.
        worker_count: Number of sub-piles / pool threads; must be >= 1.

    Returns:
        A dict mapping candidate id to its total number of votes.

    Raises:
        ValueError: If ``worker_count`` is less than 1.
    """
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")

    # The first `extra` piles take one additional vote so the remainder of
    # the division is distributed instead of being dropped.
    base, extra = divmod(len(pile), worker_count)
    vote_piles = []
    start = 0
    for index in range(worker_count):
        end = start + base + (1 if index < extra else 0)
        vote_piles.append(pile[start:end])
        start = end

    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)

    total_summary = Counter()
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        total_summary.update(worker_summary)
    # Return a plain dict so callers (and printed output) see `{...}`
    # rather than `Counter({...})`.
    return dict(total_summary)


def process_pile(pile: T.List[int]) -> Summary:
    """Count how many times each candidate id occurs in *pile*.

    Args:
        pile: Votes, each one a candidate id.

    Returns:
        A dict mapping candidate id to its number of occurrences, with keys
        in order of first appearance in *pile*.
    """
    return dict(Counter(pile))


if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    # Simulate an election: every voter picks a candidate uniformly at random.
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")
$ python3 count_votes_concurrent.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}
これは凄い…