データをワーカー分にchunkするのがポイントか。
import typing as T
import random
from multiprocessing.pool import ThreadPool
# A vote summary maps candidate id -> number of votes received.
Summary = T.Mapping[int, int]
def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    """Tally a pile of votes by splitting it across a thread pool.

    The pile is sliced into one chunk per worker, each chunk is counted by
    ``process_pile`` on a pool thread, and the per-worker summaries are
    merged into one total.

    Args:
        pile: Candidate ids, one entry per ballot.
        worker_count: Number of pool threads (and chunks). Must be >= 1.

    Returns:
        Mapping of candidate id -> total number of votes received.
    """
    vote_count = len(pile)
    vpw = vote_count // worker_count  # votes per worker (floor division)
    # The last worker also takes the remainder, so no trailing votes are
    # dropped when vote_count is not divisible by worker_count (the original
    # sliced exactly vpw votes per worker and silently discarded up to
    # worker_count - 1 leftovers).
    vote_piles = [
        pile[i * vpw:(i + 1) * vpw] if i < worker_count - 1 else pile[i * vpw:]
        for i in range(worker_count)
    ]
    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)
    total_summary: T.Dict[int, int] = {}
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            # dict.get with a default replaces the original if/else branch.
            total_summary[candidate] = total_summary.get(candidate, 0) + count
    return total_summary
def process_pile(pile: T.List[int]) -> Summary:
    """Count how many votes each candidate id received in one pile.

    Args:
        pile: Candidate ids, one entry per ballot.

    Returns:
        Mapping of candidate id -> occurrence count within this pile.
    """
    summary: T.Dict[int, int] = {}
    for vote in pile:
        # Default of 0 for first-seen candidates replaces the if/else branch.
        summary[vote] = summary.get(vote, 0) + 1
    return summary
if __name__ == "__main__":
    # Demo: simulate an election with 3 candidates and 100 random ballots,
    # then tally them in parallel and print the grand total.
    num_candidates = 3
    num_voters = 100
    ballots = [random.randint(1, num_candidates) for _ in range(num_voters)]
    counts = process_votes(ballots)
    print(f"Total number of votes: {counts}")
$ python3 count_votes_sequential.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}
これは凄い… これをRustで書く
use rand::Rng;
use std::collections::HashMap;
use std::sync::Mutex;
// Shared accumulator: each worker pushes its per-pile tally here.
// NOTE(review): this static is never cleared, so a second call to
// `process_vote` would append to (and double-count) earlier results.
// NOTE(review): statics are conventionally SCREAMING_SNAKE_CASE
// (`SUMMARIES`); renaming requires updating every use site.
static Summaries: Mutex<Vec<HashMap<u32, u32>>> = Mutex::new(Vec::new());
/// Tallies `pile` by splitting it into one chunk per worker, counting each
/// chunk on a rayon thread pool, and merging the per-worker summaries.
///
/// Prints the chunk size, the chunks, the raw per-worker summaries, and the
/// merged totals (same debug output as before). `worker_count` must be >= 1
/// (a value of 0 panics on the division, as in the original).
fn process_vote(pile: Vec<u32>, worker_count: u32) {
    let workers = worker_count as usize;
    let vote_count = pile.len();
    let vpw = vote_count / workers;
    println!("{}", vpw);
    // The last chunk also absorbs the remainder, so no vote is dropped when
    // vote_count % worker_count != 0 (the original sliced exactly vpw votes
    // per worker and silently lost the leftovers).
    let mut vote_piles: Vec<Vec<u32>> = Vec::new();
    for i in 0..workers {
        let end = if i == workers - 1 { vote_count } else { (i + 1) * vpw };
        vote_piles.push(pile[i * vpw..end].to_vec());
    }
    println!("{:?}", vote_piles);
    // Reset the shared accumulator so repeated calls don't double-count
    // results left over from a previous invocation.
    Summaries.lock().unwrap().clear();
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(workers)
        .build()
        .unwrap();
    // `pool.install` blocks until its closure returns, so the original loop
    // actually processed the piles one after another. `scope` + `spawn`
    // runs them concurrently and waits for all of them when the scope ends.
    pool.scope(|s| {
        for vote_pile in vote_piles {
            s.spawn(move |_| {
                let result = process_pile(vote_pile);
                Summaries.lock().unwrap().push(result);
            });
        }
    });
    println!("{:?}", Summaries);
    // Merge the per-worker tallies; the entry API does one lookup per key
    // instead of the original get + unwrap + insert (and no clone of the
    // whole Vec just to iterate it).
    let mut total_summary: HashMap<u32, u32> = HashMap::new();
    for summary in Summaries.lock().unwrap().iter() {
        for (&candidate, &count) in summary {
            *total_summary.entry(candidate).or_insert(0) += count;
        }
    }
    println!("{:?}", total_summary);
}
/// Counts how many votes each candidate id received in `pile`.
///
/// Returns a map of candidate id -> vote count.
fn process_pile(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        // The entry API does a single hash lookup instead of the original
        // get != None / unwrap / insert sequence (two lookups per vote).
        *summary.entry(vote).or_insert(0) += 1;
    }
    summary
}
/// Demo: build 500 random ballots for 3 candidates, tally them once
/// sequentially as a reference, then again through the parallel path.
fn main() {
    let num_voters = 500;
    let num_candidate = 3;
    let mut rnd = rand::thread_rng();
    // One random candidate id in 1..=num_candidate per voter; same RNG call
    // sequence as the original push loop, so the ballots are identical.
    let pile: Vec<u32> = (0..num_voters)
        .map(|_| rnd.gen_range(1..(num_candidate + 1)))
        .collect();
    // Sequential reference tally.
    let summary = process_pile(pile.clone());
    println!("{:?}", summary);
    // Parallel tally with 4 workers.
    process_vote(pile, 4);
}
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/parallel`
{2: 172, 1: 172, 3: 156}
125
Mutex { data: [{1: 49, 2: 39, 3: 37}, {2: 49, 1: 38, 3: 38}, {2: 43, 1: 39, 3: 43}, {2: 41, 1: 46, 3: 38}], poisoned: false, .. }
{2: 172, 3: 156, 1: 172}
おおおおおおおおお、なんかすげえええええええええええええ