Access race conditions

If withdraw and deposit run on the same account at the same time, the data can end up inconsistent: two threads read the same balance, each computes its own new value, and one update overwrites the other (a lost update). This is what is meant by a data race.

# bank_account.py
from abc import ABC, abstractmethod

class BankAccount(ABC):

    balance: float

    def __init__(self, balance: float = 0):
        self.balance: float = balance

    # @abstractmethod
    def deposit(self, amount: float) -> None:
        ...

    # @abstractmethod
    def withdraw(self, amount: float) -> None:
        ...
# main script (a separate file)
import sys
import time
from threading import Thread
import typing as T

from bank_account import BankAccount
from unsynced_bank_account import UnsyncedBankAccount

THREAD_DELAY = 1e-16  # extremely small thread switch interval, to make the race easy to reproduce

class ATM(Thread):
    def __init__(self, bank_account: BankAccount):
        super().__init__()
        self.bank_account = bank_account

    def transaction(self) -> None:
        self.bank_account.deposit(10)
        time.sleep(0.001)
        self.bank_account.withdraw(10)

    def run(self) -> None:
        self.transaction()

def test_atms(account: BankAccount, atm_number: int = 1000) -> None:
    atms: T.List[ATM] = []
    for _ in range(atm_number):
        atm = ATM(account)
        atms.append(atm)
        atm.start()

    for atm in atms:
        atm.join()

if __name__ == "__main__":
    atm_number = 1000
    sys.setswitchinterval(THREAD_DELAY)

    account = UnsyncedBankAccount()
    test_atms(account, atm_number=atm_number)

    print("Balance of unsynced account after concurrent transactions:")
    print(f"Actual : {account.balance}\nExpected: 0")
# unsynced_bank_account.py
from bank_account import BankAccount

class UnsyncedBankAccount(BankAccount):
    def deposit(self, amount: float) -> None:
        if amount > 0:
            self.balance += amount
        else:
            raise ValueError("You can't deposit a negative amount of money")

    def withdraw(self, amount: float) -> None:
        if 0 < amount <= self.balance:
            self.balance -= amount
        else:
            raise ValueError("Account does not have sufficient funds")

Fork/Join parallel processing

The key point seems to be chunking the data into one pile per worker.

import typing as T
import random
from multiprocessing.pool import ThreadPool

Summary = T.Mapping[int, int]

def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    vote_count = len(pile)
    vpw = vote_count // worker_count  # votes per worker; any remainder after integer division is dropped by the slicing below

    vote_piles = [
        pile[i * vpw:(i + 1) * vpw]
        for i in range(worker_count)
    ]

    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)
    
    total_summary = {}
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            if candidate in total_summary:
                total_summary[candidate] += count
            else:
                total_summary[candidate] = count
    
    return total_summary

def process_pile(pile: T.List[int]) -> Summary:
    summary = {}
    for vote in pile:
        if vote in summary:
            summary[vote] += 1
        else:
            summary[vote] = 1
    return summary

if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")

$ python3 count_votes_sequential.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}

This is impressive… Now to write it in Rust.

use rand::Rng;
use std::collections::HashMap;
use std::sync::Mutex;

static Summaries: Mutex<Vec<HashMap<u32, u32>>> = Mutex::new(Vec::new());

fn process_vote(pile: Vec<u32>, worker_count: u32) {
    let vote_count = pile.len();
    let vpw = vote_count / worker_count as usize;
    println!("{}", vpw);
    let mut vote_piles : Vec<Vec<u32>> = Vec::new();
    for i in 0..worker_count {
        let chunk: Vec<u32> = (&pile[i as usize*vpw ..(i+1)as usize * vpw]).to_vec();
        vote_piles.push(chunk)
    }
    println!("{:?}", vote_piles);

    let pool = rayon::ThreadPoolBuilder::new().num_threads(worker_count as usize).build().unwrap();
    for vote_pile in vote_piles {
        // NOTE: pool.install() blocks until its closure returns, so the piles
        // are still processed one at a time here, just on the pool's threads.
        pool.install(move || {
            let result = process_pile(vote_pile);
            Summaries.lock().unwrap().push(result);
        });
    }
    println!("{:?}", Summaries);
    let mut total_summary = HashMap::new();
    for summary in Summaries.lock().unwrap().clone().into_iter() {
        for (candidate, count) in summary {
            if total_summary.get(&candidate) != None {
                let n = total_summary.get(&candidate).unwrap();
                total_summary.insert(candidate, count + n);
            } else {
                total_summary.insert(candidate, count);
            }
        }
    }
    println!("{:?}", total_summary);
}

fn process_pile(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        if summary.get(&vote) != None {
            let count = summary.get(&vote).unwrap();
            summary.insert(vote, count + 1);
        } else {
            summary.insert(vote, 1);
        }
    }
    summary
}

fn main(){    
    
    let num_voters = 500;
    let num_candidate = 3;
    
    let mut pile: Vec<u32> = Vec::new();
    let mut rnd = rand::thread_rng();
    for _ in 0..num_voters {
        pile.push(rnd.gen_range(1..(num_candidate + 1)))
    }
    let summary = process_pile(pile.clone());
    println!("{:?}", summary);

    process_vote(pile, 4);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/parallel`
{2: 172, 1: 172, 3: 156}
125
Mutex { data: [{1: 49, 2: 39, 3: 37}, {2: 49, 1: 38, 3: 38}, {2: 43, 1: 39, 3: 43}, {2: 41, 1: 46, 3: 38}], poisoned: false, .. }
{2: 172, 3: 156, 1: 172}

Whoa, this is seriously cool.
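
That said, because of the pool.install() issue noted in the code above, the piles are not actually counted in parallel. A hedged sketch of a variant (my own, reusing process_pile() from the snippet above and assuming rayon is already a dependency) that really runs the piles concurrently, using rayon::scope and a channel instead of the global Mutex:

use std::collections::HashMap;
use std::sync::mpsc;

fn process_votes_parallel(pile: &[u32], worker_count: usize) -> HashMap<u32, u32> {
    let vpw = (pile.len() / worker_count).max(1); // votes per worker
    let (tx, rx) = mpsc::channel();

    rayon::scope(move |s| {
        // chunks() also yields the remainder, so no votes are silently dropped
        for chunk in pile.chunks(vpw) {
            let tx = tx.clone();
            s.spawn(move |_| {
                // reuses process_pile() defined above
                tx.send(process_pile(chunk.to_vec())).unwrap();
            });
        }
    }); // scope blocks here until every spawned task has finished

    // merge the per-worker summaries
    let mut total: HashMap<u32, u32> = HashMap::new();
    for summary in rx {
        for (candidate, count) in summary {
            *total.entry(candidate).or_insert(0) += count;
        }
    }
    total
}

Called from main as process_votes_parallel(&pile, 4), it should produce the same totals as the version above.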

Tallying random data into an associative array

This models counting the votes in an election.

import typing as T
import random

Summary = T.Mapping[int, int]

def process_votes(pile: T.List[int]) -> Summary:
    summary = {}
    for vote in pile:
        if vote in summary:
            summary[vote] += 1
        else:
            summary[vote] = 1
    return summary

if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    print(pile)
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")

$ python3 count_votes_sequential.py
[1, 1, 2, 1, 2, 3, 2, 2, 1, 2, 2, 3, 3, 2, 3, 1, 2, 2, 2, 3, 1, 1, 2, 3, 1, 2, 3, 3, 2, 2, 3, 2, 1, 3, 2, 2, 3, 3, 2, 3, 3, 3, 2, 3, 2, 1, 1, 3, 2, 3, 3, 2, 2, 1, 1, 1, 1, 2, 3, 3, 1, 2, 2, 3, 2, 1, 2, 2, 1, 3, 1, 1, 2, 1, 1, 2, 3, 3, 1, 2, 1, 2, 3, 3, 3, 1, 2, 2, 2, 1, 1, 3, 2, 1, 2, 2, 3, 2, 3, 1]
Total number of votes: {1: 29, 2: 40, 3: 31}

Rewriting it in Rust.
For a random number within a range, use rnd.gen_range.

use rand::Rng;
use std::collections::HashMap;

fn process_votes(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        if summary.get(&vote) != None {
            let count = summary.get(&vote).unwrap();
            summary.insert(vote, count + 1);
        } else {
            summary.insert(vote, 1);
        }
    }
    summary
}


fn main(){    
    
    let num_voters = 100;
    let num_candidate = 3;
    
    let mut pile: Vec<u32> = Vec::new();
    let mut rnd = rand::thread_rng();
    for _ in 0..num_voters {
        pile.push(rnd.gen_range(1..(num_candidate + 1)))
    }
    let summary = process_votes(pile);
    println!("{:?}", summary);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
Running `target/debug/parallel`
{3: 42, 2: 33, 1: 25}

So far, so good.
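
One small polish (my own suggestion, not from the original): HashMap's entry API does the "insert or increment" in a single lookup, which reads better than the get()/insert() pair above.

use std::collections::HashMap;

fn process_votes(pile: &[u32]) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for &vote in pile {
        // or_insert(0) creates the slot on first sight, then we bump the count
        *summary.entry(vote).or_insert(0) += 1;
    }
    summary
}

Called as process_votes(&pile), it borrows the data instead of consuming it.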

【Rust】String search with a ThreadPool

It's not fast at all. Am I using the pool wrong?

use std::fs;
use std::path::Path;
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::time::Instant;

fn search_file(filename: String, search_string: String) -> Result<bool, Box<dyn std::error::Error>> {
    for line in BufReader::new(File::open(filename.clone())?).lines() {
        if line.unwrap().contains(&search_string) {
            return Ok(true)
        }
    }
    Ok(false)
}

fn get_files(dirname: &str) -> io::Result<Vec<String>> {
    let mut entries: Vec<String> = Vec::new();
    let dir = Path::new(dirname);
    if dir.is_dir(){
        for entry in fs::read_dir(dirname)? {
            let e = entry?;
            let p = e.path().file_name().unwrap().to_string_lossy().into_owned();
            entries.push(p);
        }
    }
    Ok(entries)
}

fn search_files_sequentially(file_locations: String, search_string: String) {
    let entries: Vec<String> = get_files(&file_locations).unwrap();
    for entry in entries {
        let pass = format!("{}{}", file_locations, entry);
        if search_file(pass, search_string.clone()).unwrap() {
            println!("Found word in file: {}", entry);
        }
    }
}

fn search_files_concurrent(file_locations: String, search_string: String) {
    let entries: Vec<String> = get_files(&file_locations).unwrap();
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    for entry in entries {
        let pass = format!("{}{}", file_locations, entry);
        let mut value = search_string.clone();
        pool.install(move || {
            if search_file(pass, value).unwrap() {
                println!("Found word in file: {}", entry);
            }
        });
    }
}

fn main(){    
    let now = Instant::now();
    search_files_sequentially("./src/".to_string(), "queue".to_string());
    println!("{:?}", now.elapsed());

    let now = Instant::now();
    search_files_concurrent("./src/".to_string(), "queue".to_string());
    println!("{:?}", now.elapsed());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
Running `target/debug/parallel queue`
Found word in file: pipeline.rs
Found word in file: main.rs
422.486µs
Found word in file: pipeline.rs
Found word in file: main.rs
701.462µs
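
The likely answer to the question above: pool.install() blocks until its closure returns, so calling it once per file still searches the files one after another, and with files this small the pool setup costs more than the reading. A hedged sketch (my own, reusing get_files() and search_file() from the snippet above) that hands the whole loop to rayon as a parallel iterator, so the pool genuinely works on several files at once:

use rayon::prelude::*;

fn search_files_parallel(file_locations: String, search_string: String) {
    let entries: Vec<String> = get_files(&file_locations).unwrap();
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    // install() runs the closure on the pool; par_iter() then fans the
    // per-file work out across the pool's worker threads.
    pool.install(|| {
        entries.par_iter().for_each(|entry| {
            let path = format!("{}{}", file_locations, entry);
            if search_file(path, search_string.clone()).unwrap() {
                println!("Found word in file: {}", entry);
            }
        });
    });
}

Even so, for a handful of tiny source files the sequential version will probably stay faster; the parallel version only pays off once the per-file work is large enough to hide the scheduling overhead.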

【Parallel processing】Running a loop across threads

import os
import time
import glob
import typing as T
from multiprocessing.pool import ThreadPool

def search_file(file_location: str, search_string: str) -> bool:
    with open(file_location, "r", encoding="utf8") as file:
        return search_string in file.read()

def search_files_concurrently(file_locations: T.List[str], search_string: str) -> None:
    with ThreadPool() as pool:
        results = pool.starmap(search_file,
            ((file_location, search_string) for file_location in file_locations))
        for result, file_name in zip(results, file_locations):
            if result:
                print(f"Found string in file: `{file_name}`")

def search_files_sequentially(file_locations: T.List[str], search_string: str) -> None:
    for file_name in file_locations:
        result = search_file(file_name, search_string)
        if result:
            print(f"Found word in file: `{file_name}`")

if __name__ == "__main__":
    file_locations = list(
        glob.glob(f"{os.path.abspath(os.getcwd())}/*.py"))
    search_string = input("what word are you trying to find?: ")

    start_time = time.perf_counter()
    search_files_concurrently(file_locations, search_string)
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}")

vagrant@vagrant:~/dev/rust/parallel/python$ python3 find_files_sequential.py
what word are you trying to find?: queue
Found word in file: `/home/vagrant/dev/rust/parallel/python/thread_pool.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/pipeline.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/message_queue.py`
PROCESS TIME: 0.004676494048908353
vagrant@vagrant:~/dev/rust/parallel/python$ python3 find_files_concurrent.py
what word are you trying to find?: queue
Found string in file: `/home/vagrant/dev/rust/parallel/python/thread_pool.py`
Found string in file: `/home/vagrant/dev/rust/parallel/python/pipeline.py`
Found string in file: `/home/vagrant/dev/rust/parallel/python/message_queue.py`
PROCESS TIME: 0.011579621117562056

Speed-wise the threaded version actually seems slower. Most likely the files are so small that starting the thread pool and dispatching the tasks costs more than simply reading them; with CPython's GIL, threads only pay off when the work is genuinely I/O-bound and big enough to hide that overhead.

【Rust】Searching a directory for a given string

use std::fs;
use std::path::Path;
use std::fs::File;
use std::io::{self, BufRead, BufReader};

fn search_file(filename: String, search_string: String) -> Result<bool, Box<dyn std::error::Error>> {
    for line in BufReader::new(File::open(filename.clone())?).lines() {
        if line.unwrap().contains(&search_string) {
            return Ok(true)
        }
    }
    Ok(false)
}

fn get_files(dirname: &str) -> io::Result<Vec<String>> {
    let mut entries: Vec<String> = Vec::new();
    let dir = Path::new(dirname);
    if dir.is_dir(){
        for entry in fs::read_dir(dirname)? {
            let e = entry?;
            let p = e.path().file_name().unwrap().to_string_lossy().into_owned();
            entries.push(p);
        }
    }
    Ok(entries)
}

fn search_files_sequentially(file_locations: String, search_string: String) {
    let entries: Vec<String> = get_files(&file_locations).unwrap();
    for entry in entries {
        let pass = format!("{}{}", file_locations, entry);
        if search_file(pass, search_string.clone()).unwrap() {
            println!("Found word in file: {}", entry);
        }
    }
}

fn main(){    
    search_files_sequentially("./src/".to_string(), "password".to_string());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.22s
Running `target/debug/parallel queue`
Found word in file: password_cracking.rs
Found word in file: main.rs
Found word in file: password_cracking_parallel.rs

【Rust】Collecting a directory's file names into a Vec of Strings

This one took a while.

use std::fs;
use std::path::Path;

fn main() -> Result<(), Box<dyn std::error::Error>>{

    let mut entries: Vec<String> = Vec::new();
    let dir = Path::new("./src");
    if dir.is_dir(){
        for entry in fs::read_dir(dir)? {
            let e = entry?;
            let p = e.path().file_name().unwrap().to_string_lossy().into_owned();
            entries.push(p);
        }
    }
    println!("{:?}", entries);
    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/parallel queue`
["pipeline.rs", "unix_domain_socket.rs", "password_cracking.rs", "shared_ipc.rs", "main.rs", "child_processes.rs", "multithreading.rs", "queue.rs", "thread_pool.rs", "password_cracking_parallel.rs", "unixstream.rs", "pipe.rs"]
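
A more compact take on the same thing (my own sketch, not from the original): let iterator adapters do the unwrapping and collect straight into the Vec.

use std::fs;

fn list_file_names(dir: &str) -> std::io::Result<Vec<String>> {
    // skip unreadable entries and any file name that is not valid UTF-8
    let entries = fs::read_dir(dir)?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| entry.file_name().into_string().ok())
        .collect();
    Ok(entries)
}

fn main() -> std::io::Result<()> {
    println!("{:?}", list_file_names("./src")?);
    Ok(())
}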

【Rust】Getting command-line arguments

use std::env;

fn main() {    
    let args: Vec<String> = env::args().collect();
    println!("{:?}", args[1]);
}

$ cargo run queue
Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/parallel queue`
"queue"
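
One caveat worth noting: args[0] is the program path, so indexing [1] panics when no argument is given. A small defensive variant (my own sketch):

use std::env;

fn main() {
    // nth(1) returns an Option instead of panicking on a missing argument
    match env::args().nth(1) {
        Some(word) => println!("{:?}", word),
        None => eprintln!("usage: parallel <search-word>"),
    }
}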

【Python】Searching files for a string

It's a short piece of code, but it's doing something fairly sophisticated…

import os
import time
import glob
import typing as T

def search_file(file_location: str, search_string: str) -> bool:
    with open(file_location, "r", encoding="utf8") as file:
        return search_string in file.read()
    
def search_files_sequentially(file_locations: T.List[str], search_string: str) -> None:
    for file_name in file_locations:
        result = search_file(file_name, search_string)
        if result:
            print(f"Found word in file: `{file_name}`")

if __name__ == "__main__":
    file_locations = list(
        glob.glob(f"{os.path.abspath(os.getcwd())}/*.py"))
    search_string = input("what word are you trying to find?: ")

    start_time = time.perf_counter()
    search_files_sequentially(file_locations, search_string)
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}")

$ python3 find_files_sequential.py
what word are you trying to find?: queue
Found word in file: `/home/vagrant/dev/rust/parallel/python/thread_pool.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/pipeline.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/message_queue.py`
PROCESS TIME: 0.004676494048908353

OK, I want to write this in Rust…

【Rust】Running several threads at once over loops【Parallel processing】

It looks pretty rough, but thread::spawn lets the threads run at the same time.
Each thread just busy-waits in a loop, polling its queue, and breaks once its goal is reached.

use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

static Washer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Dryer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Folder_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Done_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());

fn assemble_laundry(n: u32) {
    for i in 1..(n+1) {
        Washer_queue.lock().unwrap().push_back(i);
    }
}

fn washer() {
    let w = Washer_queue.lock().unwrap().pop_front();
    if w != None {
        println!("washing {:?}...", w.unwrap());
        thread::sleep(Duration::from_millis(300));
        Dryer_queue.lock().unwrap().push_back(w.unwrap());
    }
}

fn dryer() {
    let d = Dryer_queue.lock().unwrap().pop_front();
    if d != None {
        println!("Drying {:?}...", d.unwrap());
        thread::sleep(Duration::from_millis(200));
        Folder_queue.lock().unwrap().push_back(d.unwrap());
    }
}

fn folder() {
    let f = Folder_queue.lock().unwrap().pop_front();
    if f != None {
        println!("Folding {:?}...", f.unwrap());
        thread::sleep(Duration::from_millis(100));
        Done_queue.lock().unwrap().push_back(f.unwrap());
    }
}

fn main() {    
    assemble_laundry(4);
    println!("{:?}", Washer_queue);
    let wash_handle = thread::spawn(|| {
        loop {
            if Washer_queue.lock().unwrap().len() == 0 {
                break;
            }
            washer();
        }
    });
    let dry_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4 {
                break;
            }
            dryer();
        }
    });
    let fold_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4{
                break;
            }
            folder();
        }
    });
    wash_handle.join().unwrap();
    dry_handle.join().unwrap();
    fold_handle.join().unwrap();
    println!("Washer {:?}", Washer_queue);
    println!("Dryer {:?}", Dryer_queue);
    println!("Folder {:?}", Folder_queue);
    println!("All work finished");
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
Running `target/debug/parallel`
Mutex { data: [1, 2, 3, 4], poisoned: false, .. }
washing 1...
washing 2...
Drying 1...
Folding 1...
washing 3...
Drying 2...
Folding 2...
washing 4...
Drying 3...
Folding 3...
Drying 4...
Folding 4...
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
All work finished

I'd like to write this a bit more elegantly, but it does do what I wanted. A channel-based sketch follows below.
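
A hedged rewrite (my own sketch, not from the original): give each stage its own mpsc channel and let the stages iterate over their receivers. Closing the first sender shuts the whole pipeline down, so the global Mutex queues and the busy-wait loops disappear.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (wash_tx, wash_rx) = mpsc::channel::<u32>();
    let (dry_tx, dry_rx) = mpsc::channel::<u32>();
    let (fold_tx, fold_rx) = mpsc::channel::<u32>();

    let washer = thread::spawn(move || {
        // the loop ends when wash_tx is dropped; dropping dry_tx afterwards
        // cascades the shutdown to the next stage
        for item in wash_rx {
            println!("washing {:?}...", item);
            thread::sleep(Duration::from_millis(300));
            dry_tx.send(item).unwrap();
        }
    });
    let dryer = thread::spawn(move || {
        for item in dry_rx {
            println!("Drying {:?}...", item);
            thread::sleep(Duration::from_millis(200));
            fold_tx.send(item).unwrap();
        }
    });
    let folder = thread::spawn(move || {
        for item in fold_rx {
            println!("Folding {:?}...", item);
            thread::sleep(Duration::from_millis(100));
        }
    });

    for i in 1..=4 {
        wash_tx.send(i).unwrap();
    }
    drop(wash_tx); // close the first channel so every stage can drain and exit

    washer.join().unwrap();
    dryer.join().unwrap();
    folder.join().unwrap();
    println!("All work finished");
}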