Fork/Joinによる並列処理

データをワーカー分にchunkするのがポイントか。

import typing as T
import random
from multiprocessing.pool import ThreadPool

Summary = T.Mapping[int, int]

def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    vote_count = len(pile)
    vpw = vote_count // worker_count

    vote_piles = [
        pile[i * vpw:(i + 1) * vpw]
        for i in range(worker_count)
    ]

    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)
    
    total_summary = {}
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            if candidate in total_summary:
                total_summary[candidate] += count
            else:
                total_summary[candidate] = count
    
    return total_summary

def process_pile(pile: T.List[int]) -> Summary:
    summary = {}
    for vote in pile:
        if vote in summary:
            summary[vote] += 1
        else:
            summary[vote] = 1
    return summary

if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")

$ python3 count_votes_sequential.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}

これは凄い… これをRustで書く

use rand::Rng;
use std::collections::HashMap;
use std::sync::Mutex;

static Summaries: Mutex<Vec<HashMap<u32, u32>>> = Mutex::new(Vec::new());

fn process_vote(pile: Vec<u32>, worker_count: u32) {
    let vote_count = pile.len();
    let vpw = vote_count / worker_count as usize;
    println!("{}", vpw);
    let mut vote_piles : Vec<Vec<u32>> = Vec::new();
    for i in 0..worker_count {
        let chunk: Vec<u32> = (&pile[i as usize*vpw ..(i+1)as usize * vpw]).to_vec();
        vote_piles.push(chunk)
    }
    println!("{:?}", vote_piles);

    let pool = rayon::ThreadPoolBuilder::new().num_threads(worker_count as usize).build().unwrap();
    for vote_pile in vote_piles {
        pool.install(move || {
            let result = process_pile(vote_pile); 
            Summaries.lock().unwrap().push(result);
        });
    }
    println!("{:?}", Summaries);
    let mut total_summary = HashMap::new();
    for summary in Summaries.lock().unwrap().clone().into_iter() {
        for (candidate, count) in summary {
            if total_summary.get(&candidate) != None {
                let n = total_summary.get(&candidate).unwrap();
                total_summary.insert(candidate, count + n);
            } else {
                total_summary.insert(candidate, count);
            }
        }
    }
    println!("{:?}", total_summary);
}

fn process_pile(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        if summary.get(&vote) != None {
            let count = summary.get(&vote).unwrap();
            summary.insert(vote, count + 1);
        } else {
            summary.insert(vote, 1);
        }
    }
    summary
}

fn main(){    
    
    let num_voters = 500;
    let num_candidate = 3;
    
    let mut pile: Vec<u32> = Vec::new();
    let mut rnd = rand::thread_rng();
    for _ in 0..num_voters {
        pile.push(rnd.gen_range(1..(num_candidate + 1)))
    }
    let summary = process_pile(pile.clone());
    println!("{:?}", summary);

    process_vote(pile, 4);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/parallel`
{2: 172, 1: 172, 3: 156}
125
Mutex { data: [{1: 49, 2: 39, 3: 37}, {2: 49, 1: 38, 3: 38}, {2: 43, 1: 39, 3: 43}, {2: 41, 1: 46, 3: 38}], poisoned: false, .. }
{2: 172, 3: 156, 1: 172}

おおおおおおおおお、なんかすげえええええええええええええ