データをワーカー分にchunkするのがポイントか。
import typing as T
import random
from multiprocessing.pool import ThreadPool

Summary = T.Mapping[int, int]


def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    """Tally votes in parallel.

    Splits ``pile`` into ``worker_count`` chunks, counts each chunk on a
    thread-pool worker, then merges the per-worker summaries.

    Args:
        pile: List of votes; each element is a candidate id.
        worker_count: Number of pool workers (and chunks).

    Returns:
        Mapping of candidate id -> total vote count.
    """
    vote_count = len(pile)
    vpw = vote_count // worker_count
    # BUGFIX: the last worker takes the remainder slice, so no votes are
    # silently dropped when vote_count is not divisible by worker_count.
    vote_piles = [
        pile[i * vpw:(i + 1) * vpw] if i < worker_count - 1 else pile[i * vpw:]
        for i in range(worker_count)
    ]
    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)
    total_summary: T.Dict[int, int] = {}
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            # dict.get with a default replaces the if/else insert dance.
            total_summary[candidate] = total_summary.get(candidate, 0) + count
    return total_summary


def process_pile(pile: T.List[int]) -> Summary:
    """Count the votes in a single pile (candidate id -> count)."""
    summary: T.Dict[int, int] = {}
    for vote in pile:
        summary[vote] = summary.get(vote, 0) + 1
    return summary


if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")
$ python3 count_votes_sequential.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}
これは凄い… これをRustで書く
use rand::Rng; use std::collections::HashMap; use std::sync::Mutex; static Summaries: Mutex<Vec<HashMap<u32, u32>>> = Mutex::new(Vec::new()); fn process_vote(pile: Vec<u32>, worker_count: u32) { let vote_count = pile.len(); let vpw = vote_count / worker_count as usize; println!("{}", vpw); let mut vote_piles : Vec<Vec<u32>> = Vec::new(); for i in 0..worker_count { let chunk: Vec<u32> = (&pile[i as usize*vpw ..(i+1)as usize * vpw]).to_vec(); vote_piles.push(chunk) } println!("{:?}", vote_piles); let pool = rayon::ThreadPoolBuilder::new().num_threads(worker_count as usize).build().unwrap(); for vote_pile in vote_piles { pool.install(move || { let result = process_pile(vote_pile); Summaries.lock().unwrap().push(result); }); } println!("{:?}", Summaries); let mut total_summary = HashMap::new(); for summary in Summaries.lock().unwrap().clone().into_iter() { for (candidate, count) in summary { if total_summary.get(&candidate) != None { let n = total_summary.get(&candidate).unwrap(); total_summary.insert(candidate, count + n); } else { total_summary.insert(candidate, count); } } } println!("{:?}", total_summary); } fn process_pile(pile: Vec<u32>) -> HashMap<u32, u32> { let mut summary = HashMap::new(); for vote in pile { if summary.get(&vote) != None { let count = summary.get(&vote).unwrap(); summary.insert(vote, count + 1); } else { summary.insert(vote, 1); } } summary } fn main(){ let num_voters = 500; let num_candidate = 3; let mut pile: Vec<u32> = Vec::new(); let mut rnd = rand::thread_rng(); for _ in 0..num_voters { pile.push(rnd.gen_range(1..(num_candidate + 1))) } let summary = process_pile(pile.clone()); println!("{:?}", summary); process_vote(pile, 4); }
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/parallel`
{2: 172, 1: 172, 3: 156}
125
Mutex { data: [{1: 49, 2: 39, 3: 37}, {2: 49, 1: 38, 3: 38}, {2: 43, 1: 39, 3: 43}, {2: 41, 1: 46, 3: 38}], poisoned: false, .. }
{2: 172, 3: 156, 1: 172}
おおおおおおおおお、なんかすげえええええええええええええ