【並列処理】プロデューサー/コンシューマー問題

produce（生産）とconsume（消費）の速度が同じではない（producerは1件ごとに1秒、consumerは2秒待つ）ので、結果的に全体の処理速度は遅い方に引っ張られることになる。

import time
from threading import Thread, Semaphore, Lock

# Capacity of the ring buffer shared by producers and the consumer.
SIZE = 5
# The ring buffer itself; every slot starts out as an empty string.
BUFFER = [""] * SIZE
# Next slot a producer writes to. Shared by all producers, guarded by `mutex`.
producer_idx: int = 0

mutex = Lock()              # protects BUFFER and producer_idx
empty = Semaphore(value=SIZE)  # free slots: producers acquire, consumers release
full = Semaphore(value=0)      # filled slots: consumers acquire, producers release

class Producer(Thread):
    """Producer thread that writes ``maximum_items`` items into the ring buffer.

    ``empty`` counts free slots and ``full`` counts filled slots; ``mutex``
    guards ``BUFFER`` and the write index ``producer_idx`` that all producers
    share.
    """

    def __init__(self, name: str, maximum_items: int = 5):
        super().__init__()
        self.counter = 0
        self.name = name
        self.maximum_items = maximum_items

    def next_index(self, index: int) -> int:
        """Return the slot after ``index``, wrapping around at ``SIZE``."""
        return (index + 1) % SIZE

    def run(self) -> None:
        global producer_idx
        while self.counter < self.maximum_items:
            empty.acquire()  # wait for a free slot
            # ``with`` releases the mutex even if the body raises; a bare
            # acquire()/release() pair would leave every other thread blocked.
            with mutex:
                self.counter += 1
                BUFFER[producer_idx] = f"{self.name}-{self.counter}"
                print(f"{self.name} produced: "
                      f"'{BUFFER[producer_idx]}' into slot {producer_idx}")
                producer_idx = self.next_index(producer_idx)
            full.release()  # announce one more filled slot
            time.sleep(1)

class Consumer(Thread):
    """Consumer thread that reads ``maximum_items`` items from the ring buffer.

    NOTE(review): the read position ``idx`` is per instance, while producers
    share the global ``producer_idx``. With more than one Consumer, each would
    walk the buffer from slot 0 independently, so this is correct only for a
    single consumer.
    """

    def __init__(self, name: str, maximum_items: int = 10):
        super().__init__()
        self.name = name
        self.idx = 0
        self.counter = 0
        self.maximum_items = maximum_items

    def next_index(self) -> int:
        """Return the slot after ``self.idx``, wrapping around at ``SIZE``."""
        return (self.idx + 1) % SIZE

    def run(self) -> None:
        while self.counter < self.maximum_items:
            full.acquire()  # wait until at least one slot is filled
            # ``with`` releases the mutex even if the body raises.
            with mutex:
                item = BUFFER[self.idx]
                print(f"{self.name} consumed item: "
                      f"'{item}' from slot {self.idx}")
                self.idx = self.next_index()
                self.counter += 1
            empty.release()  # hand the slot back to the producers
            time.sleep(2)

if __name__ == "__main__":
    # Two producers (5 items each by default) feed one consumer that takes
    # 10 items by default.
    workers = [
        Producer("SpongeBob"),
        Producer("Patrick"),
        Consumer("Squidward"),
    ]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

$ python3 producer_comsumer.py
SpongeBob produced: 'SpongeBob-1' into slot 0
Patrick produced: 'Patrick-1' into slot 1
Squidward consumed item: 'SpongeBob-1' from slot 0
SpongeBob produced: 'SpongeBob-2' into slot 2
Patrick produced: 'Patrick-2' into slot 3
Squidward consumed item: 'Patrick-1' from slot 1
Patrick produced: 'Patrick-3' into slot 4
SpongeBob produced: 'SpongeBob-3' into slot 0
Patrick produced: 'Patrick-4' into slot 1
Squidward consumed item: 'SpongeBob-2' from slot 2
SpongeBob produced: 'SpongeBob-4' into slot 2
Squidward consumed item: 'Patrick-2' from slot 3
Patrick produced: 'Patrick-5' into slot 3
Squidward consumed item: 'Patrick-3' from slot 4
SpongeBob produced: 'SpongeBob-5' into slot 4
Squidward consumed item: 'SpongeBob-3' from slot 0

【並列処理】飢餓状態（starvation）

スケジューリングの優先順位に偏りがあると、スレッドの実行が平等にならない。ここでは10人の哲学者が同じ2本の箸（ロック）を奪い合うが、ロックの取得順には公平性の保証がないため、一部のスレッドだけが食べ続け、残りは1つも食べられないことがある。

import time
from threading import Thread
from lock_with_name import LockWithName
dumplings = 120


class Philosopher(Thread):
    """Philosopher thread that repeatedly takes both chopsticks and eats.

    When every philosopher is handed the same pair of chopsticks (as in this
    script's ``__main__``), nothing makes the competition fair, so the share
    each thread gets depends on scheduling: the starvation demo.
    """

    def __init__(self, name: str,
                 left_chopstick: LockWithName,
                 right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        global dumplings

        dumplings_eaten = 0
        while dumplings > 0:
            # Acquires left then right and releases right then left, exactly
            # like manual acquire/release, but also releases on exceptions.
            with self.left_chopstick, self.right_chopstick:
                # Re-check under the locks: the loop test above is unlocked,
                # so another thread may have taken the last dumpling.
                if dumplings > 0:
                    dumplings -= 1
                    dumplings_eaten += 1
                    # Near-zero sleep, presumably to let the scheduler switch.
                    time.sleep(1e-16)
        print(f"{self.name} took {dumplings_eaten} pieces")

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # Ten philosophers all contend for the very same pair of chopsticks.
    diners = [
        Philosopher(f"Philosopher #{i}", chopstick_a, chopstick_b)
        for i in range(10)
    ]

    for diner in diners:
        diner.start()
    for diner in diners:
        diner.join()

$ python3 starvation.py
Philosopher #4 took 51 pieces
Philosopher #9 took 0 pieces
Philosopher #1 took 0 pieces
Philosopher #3 took 0 pieces
Philosopher #7 took 0 pieces
Philosopher #8 took 0 pieces
Philosopher #6 took 0 pieces
Philosopher #2 took 0 pieces
Philosopher #5 took 0 pieces
Philosopher #0 took 69 pieces

【並列処理】ライブロック

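状態に応じて処理を変えている（右の箸が使用中なら、左の箸を置いて譲る）が、両方のスレッドが同じタイミングで譲り合いを繰り返すため、どちらもブロックされてはいないのに処理が一向に進まない。これをライブロックという。
スレッドが止まってしまうデッドロックと違い、スレッドは動き続けている（ログが出続ける）点が特徴で、結果としてどちらも食事にありつけないという意味ではstarvation(飢餓状態)の一種とも言える。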

import time
from threading import Thread
from lock_with_name import LockWithName
dumplings = 20


class Philosopher(Thread):
    """Philosopher that backs off when its second chopstick is busy (livelock demo).

    It holds its left chopstick, and if the right one is already locked it
    "politely concedes": it puts the left one down and tries again. Two
    philosophers doing this in lockstep never eat.
    """

    def __init__(self, name: str,
                 left_chopstick: LockWithName,
                 right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        global dumplings

        while dumplings > 0:
            with self.left_chopstick:
                print(f"{self.left_chopstick.name} chopstick "
                      f"grabbed by {self.name}")
                if self.right_chopstick.locked():
                    # Leaving the ``with`` puts the left chopstick back down.
                    print(f"{self.name} can not get the "
                          f"{self.right_chopstick.name} chopstick, "
                          f"politely concedes...")
                else:
                    with self.right_chopstick:
                        print(f"{self.right_chopstick.name} chopstick "
                              f"grabbed by {self.name}")
                        # Re-check under both locks: the loop test is unlocked,
                        # so the last dumpling may already be gone.
                        if dumplings > 0:
                            dumplings -= 1
                            print(f"{self.name} eats dumpling. Dumplings "
                                  f"left: {dumplings}")
                            time.sleep(1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # The two philosophers reach for the chopsticks in opposite orders.
    philosophers = (
        Philosopher("Philosopher #1", chopstick_a, chopstick_b),
        Philosopher("Philosopher #2", chopstick_b, chopstick_a),
    )
    for philosopher in philosophers:
        philosopher.start()

Philosopher #2 can not get the chopstick_a chopstick, politely concedes...
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes...
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes...
Philosopher #1 can not get the chopstick_b chopstick, politely concedes...
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes...
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes...
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes...
chopstick_a chopsti^C

【並列処理】デッドロックの発生

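デッドロックは、複数のタスクが互いに相手の占有しているリソースの解放を待ち続け、どのタスクも先に進めなくなる状態。
ここではPhilosopher #1はchopstick_a→chopstick_b、Philosopher #2はchopstick_b→chopstick_aの順に取得しようとするので、それぞれが1本目を持ったまま2本目を待ち続けてしまう。left_chopstickとright_chopstickを別々に処理しているのでこのようなことが起きる。2本の取得を不可分に（同時に）acquire, releaseするか、全員が同じ順序で箸を取るようにすれば、ロジック上デッドロックは起きない。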

from threading import Lock
from typing import Any, Tuple

class LockWithName:
    """A ``threading.Lock`` wrapper that carries a human-readable name.

    The name is only used by callers for log messages. Usable as a context
    manager: ``with lock as held:`` acquires on entry and releases on exit.
    """

    def __init__(self, name: str):
        self.name = name
        self._lock = Lock()

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        """Acquire the lock; same arguments and return value as ``Lock.acquire``.

        Returns:
            True if the lock was acquired, False if a non-blocking or timed
            attempt failed.
        """
        return self._lock.acquire(blocking, timeout)

    def release(self) -> None:
        """Release the lock (raises ``RuntimeError`` if it is not held)."""
        self._lock.release()

    def locked(self) -> bool:
        """Return True if some thread currently holds the lock."""
        return self._lock.locked()

    def __enter__(self) -> "LockWithName":
        self.acquire()
        # Returning self lets ``with lock as held:`` bind the lock.
        return self

    def __exit__(self, *args: Any) -> None:
        self.release()

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.name!r})"
import time
from threading import Thread

from lock_with_name import LockWithName
dumplings = 20


class Philosopher(Thread):
    """Philosopher that takes its left chopstick, then its right one, then eats.

    If two philosophers are handed the same chopsticks in opposite orders (as
    in this script's ``__main__``), each can end up holding one chopstick while
    waiting forever for the other: the deadlock this example demonstrates.
    """

    def __init__(self, name: str,
                 left_chopstick: LockWithName,
                 right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        global dumplings

        while dumplings > 0:
            with self.left_chopstick:
                print(f"{self.left_chopstick.name} grabbed by {self.name} "
                      f"now needs {self.right_chopstick.name}")
                with self.right_chopstick:
                    print(f"{self.right_chopstick.name} grabbed by {self.name}")
                    # Re-check under both locks: the loop test is unlocked, so
                    # the last dumpling may have been eaten in the meantime.
                    if dumplings > 0:
                        dumplings -= 1
                        print(f"{self.name} eats a dumpling. "
                              f"Dumplings left: {dumplings}")
                print(f"{self.right_chopstick.name} released by {self.name}")
            print(f"{self.left_chopstick.name} released by {self.name}")
            print(f"{self.name} is thinking...")
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # Opposite acquisition orders: the recipe for a circular wait.
    philosophers = [
        Philosopher("Philosopher #1", chopstick_a, chopstick_b),
        Philosopher("Philosopher #2", chopstick_b, chopstick_a),
    ]
    for philosopher in philosophers:
        philosopher.start()

$ python3 deadlock.py
chopstick_a grabbed by Philosopher #1 now needs chopstick_b
chopstick_b grabbed by Philosopher #2 now needs chopstick_a

止まってしまう

ここではwaiter（調停役）というclassを追加しているが、本質はwaiterそのものではなく、2本の箸の取得を共通のmutexで保護し、1人が2本とも取り終えるまで他の哲学者が箸を取りに行けないようにする（取得を不可分にする）ことにある。whileのループの中で2本を同時に（不可分に）acquire, releaseできれば良い。

import time
from threading import Thread, Lock
from lock_with_name import LockWithName
dumplings = 20


class Waiter:
    """Arbitrator that hands out both chopsticks as a single step.

    Only one philosopher at a time can be inside ``ask_for_chopsticks``, so
    nobody ends up holding one chopstick while someone else holds the other
    and waits for it (the circular wait behind the deadlock).
    """

    def __init__(self) -> None:
        self.mutex = Lock()

    def ask_for_chopsticks(self,
                           left_chopstick: LockWithName,
                           right_chopstick: LockWithName) -> None:
        """Acquire the left, then the right chopstick, under the waiter's mutex."""
        with self.mutex:
            for chopstick in (left_chopstick, right_chopstick):
                chopstick.acquire()
                print(f"{chopstick.name} grabbed")

    def release_chopsticks(self,
                           left_chopstick: LockWithName,
                           right_chopstick: LockWithName) -> None:
        """Release the right, then the left chopstick (no waiter mutex needed)."""
        for chopstick in (right_chopstick, left_chopstick):
            chopstick.release()
            print(f"{chopstick.name} released")


class Philosopher(Thread):
    """Philosopher that obtains both chopsticks through a shared ``Waiter``."""

    def __init__(self, name: str, waiter: Waiter,
                 left_chopstick: LockWithName,
                 right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick
        self.waiter = waiter

    def run(self) -> None:
        global dumplings

        while dumplings > 0:
            print(f"{self.name} asks waiter for chopsticks")
            self.waiter.ask_for_chopsticks(
                self.left_chopstick, self.right_chopstick)
            try:
                # Re-check while holding both chopsticks: the other
                # philosopher may have eaten the last dumpling while we
                # waited, and an unconditional decrement would go below 0.
                if dumplings > 0:
                    dumplings -= 1
                    print(f"{self.name} eats a dumpling. "
                          f"Dumpling left: {dumplings}")
            finally:
                # Always hand the chopsticks back, even if the body raised.
                print(f"{self.name} returns chopsticks to waiter")
                self.waiter.release_chopsticks(
                    self.left_chopstick, self.right_chopstick)
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # One shared waiter arbitrates, even though the philosophers list the
    # chopsticks in opposite orders.
    waiter = Waiter()
    philosophers = (
        Philosopher("Philosopher #1", waiter, chopstick_a, chopstick_b),
        Philosopher("Philosopher #2", waiter, chopstick_b, chopstick_a),
    )
    for philosopher in philosophers:
        philosopher.start()

$ python3 deadlock_arbitrator.py
Philosopher #1 asks waiter for chopsticks
chopstick_a grabbed
chopstick_b grabbed
Philosopher #1 eats a dumpling. Dumpling left: 19
Philosopher #1 returns chopsticks to waiter
chopstick_b released
chopstick_a released
...

【Rust】Mutexの使い方を理解したい

シングルスレッド
mutexのデータにアクセスするには、lockメソッドを使用してロックを獲得する

use std::sync::Mutex;

/// Single-threaded `Mutex` demo: lock it, overwrite the protected value, then
/// print the mutex after the lock has been released.
fn main() {
    let m = Mutex::new(5);

    // The temporary guard returned by `lock()` is dropped at the end of this
    // statement, so the mutex is unlocked again before `println!` runs.
    *m.lock().unwrap() = 6;

    println!("m= {:?}", m);
}

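スレッド間でセマフォ(semaphore)を共有しようとするとコンパイルエラー(E0382)になる。`move`クロージャがループの1回目で`sem`の所有権を奪ってしまうため、2回目以降は使えない。`Arc`で包み、スレッドごとに`Arc::clone`したものを渡せば共有できる。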

use std::sync::{Mutex, Arc};
use std::thread;
use rand::Rng;
use std_semaphore::Semaphore;
use std::time::Duration;

static TOTAL_SPOTS: u32 = 3;

fn main() {
    let parked_cars = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    let sem = Semaphore::new(TOTAL_SPOTS.try_into().unwrap());

    for _ in 0..10 {

        let counter = Arc::clone(&parked_cars);

        let handle = thread::spawn(move || {

            let mut rng = rand::thread_rng(); 
            // enter
            sem.acquire();
            let mut num = counter.lock().unwrap();
            *num += 1;
            thread::sleep(Duration::from_millis(rng.gen_range(1..10)));

            // exit
            sem.release();
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *parked_cars.lock().unwrap());
}

15 | for _ in 0..10 {
| -------------- inside of this loop

19 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop

23 | sem.acquire();
| --- use occurs due to use in closure

中々奥が深い..

【Rust】rustでセマフォ(semaphore)を使いたい

mainじゃなくて関数として使いたいな…

use std_semaphore::Semaphore;
use std::thread;
use std::time::Duration;

static TOTAL_SPOTS: u32 = 3;

/// Single-threaded walk-through of a parking "garage": take one of the
/// `TOTAL_SPOTS` semaphore permits, park car 1, wait, then leave and return
/// the permit.
fn main() {
    let spots = Semaphore::new(TOTAL_SPOTS.try_into().unwrap());
    let mut garage: Vec<u32> = Vec::new();
    let car: u32 = 1;

    // Arrive: take a permit, then occupy a spot.
    spots.acquire();
    garage.push(car);
    thread::sleep(Duration::from_millis(500));
    println!("{:?}", garage);

    // Leave: vacate the spot, then give the permit back.
    garage.clear();
    spots.release();

    println!("{:?}", garage);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.17s
Running `target/debug/parallel`
[1]
[]

【並列処理】セマフォ(semaphore)とmutex Lock

import typing as T
import time
import random
from threading import Thread, Semaphore, Lock

TOTAL_SPOTS = 3


class Garage:
    """Parking garage with ``TOTAL_SPOTS`` spaces shared between threads.

    A counting semaphore limits how many cars can be inside at once, and a
    separate lock protects the ``parked_cars`` list.
    """

    def __init__(self) -> None:
        self.semaphore = Semaphore(TOTAL_SPOTS)
        self.cars_lock = Lock()
        self.parked_cars: T.List[str] = []

    def count_parked_cars(self) -> int:
        """Return how many cars are currently parked."""
        return len(self.parked_cars)

    def enter(self, car_name: str) -> None:
        """Block until a spot is free, then record ``car_name`` as parked."""
        self.semaphore.acquire()
        with self.cars_lock:
            self.parked_cars.append(car_name)
            print(f"{car_name} parked")

    def exit(self, car_name: str) -> None:
        """Remove ``car_name`` from the garage and free its spot.

        Raises:
            ValueError: if ``car_name`` is not parked. The list lock is still
                released and the free-spot count is left unchanged.
        """
        with self.cars_lock:
            self.parked_cars.remove(car_name)
            print(f"{car_name} leaving")
        # Free the spot only once the list update has fully completed.
        self.semaphore.release()

def park_car(garage: Garage, car_name: str) -> None:
    """Park ``car_name`` in ``garage``, stay 1-2 seconds, then leave."""
    garage.enter(car_name)
    stay = random.uniform(1, 2)
    time.sleep(stay)
    garage.exit(car_name)

def test_garage(garage: Garage, number_of_cars: int = 10) -> None:
    """Run ``number_of_cars`` concurrent ``park_car`` threads and wait for all."""
    cars = []
    for n in range(number_of_cars):
        car = Thread(target=park_car, args=(garage, f"Car #{n}"))
        cars.append(car)
        car.start()

    for car in cars:
        car.join()

if __name__ == "__main__":
    garage = Garage()
    number_of_cars = 10
    test_garage(garage, number_of_cars)

    # Every car that entered has also left, so the garage should be empty.
    print("Number of parked cars after a busy day:")
    print(f"Actual: {garage.count_parked_cars()}\nExpected: 0")

$ python3 semaphore.py
Car #0 parked
Car #1 parked
Car #2 parked
Car #1 leaving
Car #3 parked
Car #2 leaving
Car #4 parked
Car #0 leaving
Car #5 parked
Car #3 leaving
Car #6 parked
Car #4 leaving
Car #7 parked
Car #5 leaving
Car #8 parked
Car #6 leaving
Car #9 parked
Car #7 leaving
Car #8 leaving
Car #9 leaving
Number of parked cars after a busy day:
Actual: 0
Expected: 0

Mutexによるデッドロック

from threading import Lock
from unsynced_bank_account import UnsyncedBankAccount

class SyncedBankAccount(UnsyncedBankAccount):
    """``UnsyncedBankAccount`` whose deposit/withdraw are serialized by a mutex."""

    def __init__(self, balance: float = 0):
        super().__init__(balance)
        self.mutex = Lock()

    def deposit(self, amount: float) -> None:
        # ``with`` releases the mutex even when the parent raises ValueError;
        # a bare acquire()/release() pair would leave it locked and every
        # later call would block forever.
        with self.mutex:
            super().deposit(amount)

    def withdraw(self, amount: float) -> None:
        # Same reasoning as deposit(): release the mutex on every exit path.
        with self.mutex:
            super().withdraw(amount)
import sys
import time
from threading import Thread
import typing as T

from bank_account import BankAccount
from unsynced_bank_account import UnsyncedBankAccount
from synced_bank_account import SyncedBankAccount 

# Passed to sys.setswitchinterval() in __main__ to make thread switches frequent.
THREAD_DELAY = 1e-16


class ATM(Thread):
    """Thread that performs one deposit/withdraw round trip on a shared account."""

    def __init__(self, bank_account: BankAccount):
        super().__init__()
        self.bank_account = bank_account

    def transaction(self) -> None:
        """Deposit 10, pause briefly, then withdraw the same 10."""
        account = self.bank_account
        account.deposit(10)
        time.sleep(0.001)  # pause so other ATMs' operations can interleave
        account.withdraw(10)

    def run(self) -> None:
        self.transaction()

def test_atms(account: BankAccount, atm_number: int = 1000) -> None:
    """Start ``atm_number`` ATM threads on ``account`` and wait for all of them."""
    atms: T.List[ATM] = []
    for _ in range(atm_number):
        atms.append(ATM(account))
        atms[-1].start()

    for machine in atms:
        machine.join()

if __name__ == "__main__":
    atm_number = 1000
    sys.setswitchinterval(THREAD_DELAY)

    # Run the same workload against the unsynced, then the synced account.
    runs = (
        (UnsyncedBankAccount,
         "Balance of unsynced account after concurrent transactions:"),
        (SyncedBankAccount,
         "Balance of synced account after concurrent transactions:"),
    )
    for make_account, heading in runs:
        account = make_account()
        test_atms(account, atm_number=atm_number)

        print(heading)
        print(f"Actual : {account.balance}\nExpected: 0")

アクセスの競合状態

withdrawやdepositを同時に行った場合、データの不整合が発生する可能性がある。`self.balance += amount`は「読み出し→計算→書き込み」の複数ステップからなり不可分ではないため、途中で別スレッドに切り替わると片方の更新が失われる。これをデータの競合状態（レースコンディション）という。

class BankAccount(ABC):
    """Common interface for the bank-account examples.

    Subclasses override ``deposit`` and ``withdraw``. The base versions are
    placeholders that do nothing; they are not marked abstract, so the base
    class itself can still be instantiated.
    """

    balance: float

    def __init__(self, balance: float = 0):
        self.balance = balance

    def deposit(self, amount: float) -> None:
        """Hook for subclasses to add ``amount``; does nothing here."""

    def withdraw(self, amount: float) -> None:
        """Hook for subclasses to remove ``amount``; does nothing here."""
import sys
import time
from threading import Thread
import typing as T

from bank_account import BankAccount
from unsynced_bank_account import UnsyncedBankAccount

# Passed to sys.setswitchinterval() in __main__ to make thread switches frequent.
THREAD_DELAY = 1e-16


class ATM(Thread):
    """Thread that runs a single deposit-then-withdraw on a shared account."""

    def __init__(self, bank_account: BankAccount):
        super().__init__()
        self.bank_account = bank_account

    def transaction(self) -> None:
        """Put 10 in, wait a moment, take 10 out."""
        target = self.bank_account
        target.deposit(10)
        time.sleep(0.001)  # give other ATM threads a window to interleave
        target.withdraw(10)

    def run(self) -> None:
        self.transaction()

def test_atms(account: BankAccount, atm_number: int = 1000) -> None:
    """Launch ``atm_number`` concurrent ATMs on ``account``; return when all finish."""
    atms: T.List[ATM] = []
    for _ in range(atm_number):
        machine = ATM(account)
        atms.append(machine)
        machine.start()

    for machine in atms:
        machine.join()

if __name__ == "__main__":
    # Make the interpreter switch threads as often as it can.
    sys.setswitchinterval(THREAD_DELAY)
    atm_number = 1000

    account = UnsyncedBankAccount()
    test_atms(account, atm_number=atm_number)

    print("Balance of unsynced account after concurrent transactions:")
    print(f"Actual : {account.balance}\nExpected: 0")
from bank_account import BankAccount

class UnsyncedBankAccount(BankAccount):
    """Bank account with no synchronization.

    ``balance`` is updated with an unguarded read-modify-write, so concurrent
    callers can lose updates; this is deliberate for the race-condition demo.
    """

    def deposit(self, amount: float) -> None:
        """Add ``amount`` to the balance.

        Raises:
            ValueError: if ``amount`` is not strictly positive (or is NaN).
        """
        # ``not amount > 0`` (rather than ``amount <= 0``) also rejects NaN.
        if not amount > 0:
            raise ValueError("You can't deposit a non-positive amount of money")
        self.balance += amount

    def withdraw(self, amount: float) -> None:
        """Remove ``amount`` from the balance.

        Raises:
            ValueError: if ``amount`` is not strictly positive (or is NaN), or
                if it exceeds the current balance.
        """
        if not amount > 0:
            raise ValueError("You can't withdraw a non-positive amount of money")
        if amount > self.balance:
            raise ValueError("Account does not have sufficient funds")
        self.balance -= amount

Fork/Joinによる並列処理

データをワーカー分にchunkするのがポイントか。票数がワーカー数で割り切れない場合は、余りの票が漏れないように分割する必要がある。

import typing as T
import random
from multiprocessing.pool import ThreadPool

Summary = T.Mapping[int, int]


def process_votes(pile: T.List[int], worker_count: int = 4) -> Summary:
    """Count votes fork/join style and merge the per-worker tallies.

    The pile is split into ``worker_count`` contiguous chunks whose sizes
    differ by at most one, so every vote is counted even when ``len(pile)``
    is not a multiple of ``worker_count``. Each chunk is tallied by
    ``process_pile`` on a ``ThreadPool`` (fork) and the partial tallies are
    summed (join). Each partial tally is printed.

    Args:
        pile: the ballots; each entry is a candidate id.
        worker_count: number of chunks and pool threads.

    Returns:
        Mapping from candidate id to its total number of votes.

    Raises:
        ZeroDivisionError: if ``worker_count`` is 0.
    """
    base, extra = divmod(len(pile), worker_count)
    vote_piles = []
    start = 0
    for i in range(worker_count):
        # The first ``extra`` chunks each absorb one leftover vote.
        end = start + base + (1 if i < extra else 0)
        vote_piles.append(pile[start:end])
        start = end

    with ThreadPool(worker_count) as pool:
        worker_summaries = pool.map(process_pile, vote_piles)

    total_summary: T.Dict[int, int] = {}
    for worker_summary in worker_summaries:
        print(f"Votes from staff member: {worker_summary}")
        for candidate, count in worker_summary.items():
            total_summary[candidate] = total_summary.get(candidate, 0) + count

    return total_summary


def process_pile(pile: T.List[int]) -> Summary:
    """Tally one chunk: map each candidate id to its number of votes in ``pile``."""
    summary: T.Dict[int, int] = {}
    for vote in pile:
        summary[vote] = summary.get(vote, 0) + 1
    return summary

if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    # Each ballot names a random candidate id in 1..num_candidates.
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    print(f"Total number of votes: {process_votes(pile)}")

$ python3 count_votes_sequential.py
Votes from staff member: {3: 9, 2: 13, 1: 3}
Votes from staff member: {1: 5, 3: 8, 2: 12}
Votes from staff member: {3: 10, 2: 5, 1: 10}
Votes from staff member: {1: 10, 3: 8, 2: 7}
Total number of votes: {3: 35, 2: 37, 1: 28}

これは凄い… これをRustで書く

use rand::Rng;
use std::collections::HashMap;
use std::sync::Mutex;

static Summaries: Mutex<Vec<HashMap<u32, u32>>> = Mutex::new(Vec::new());

fn process_vote(pile: Vec<u32>, worker_count: u32) {
    let vote_count = pile.len();
    let vpw = vote_count / worker_count as usize;
    println!("{}", vpw);
    let mut vote_piles : Vec<Vec<u32>> = Vec::new();
    for i in 0..worker_count {
        let chunk: Vec<u32> = (&pile[i as usize*vpw ..(i+1)as usize * vpw]).to_vec();
        vote_piles.push(chunk)
    }
    println!("{:?}", vote_piles);

    let pool = rayon::ThreadPoolBuilder::new().num_threads(worker_count as usize).build().unwrap();
    for vote_pile in vote_piles {
        pool.install(move || {
            let result = process_pile(vote_pile); 
            Summaries.lock().unwrap().push(result);
        });
    }
    println!("{:?}", Summaries);
    let mut total_summary = HashMap::new();
    for summary in Summaries.lock().unwrap().clone().into_iter() {
        for (candidate, count) in summary {
            if total_summary.get(&candidate) != None {
                let n = total_summary.get(&candidate).unwrap();
                total_summary.insert(candidate, count + n);
            } else {
                total_summary.insert(candidate, count);
            }
        }
    }
    println!("{:?}", total_summary);
}

/// Tally one chunk of votes: returns a map from candidate id to vote count.
fn process_pile(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        // One `entry` lookup replaces the get / get+unwrap / insert sequence.
        *summary.entry(vote).or_insert(0) += 1;
    }
    summary
}

/// Build a random pile of `num_voters` ballots, count it sequentially with
/// `process_pile`, then count it again fork/join style with `process_vote`
/// using four workers, so the two results can be compared.
fn main() {
    let num_voters = 500;
    let num_candidate = 3;

    let mut rnd = rand::thread_rng();
    let pile: Vec<u32> = (0..num_voters)
        .map(|_| rnd.gen_range(1..(num_candidate + 1)))
        .collect();

    // Sequential reference tally.
    let summary = process_pile(pile.clone());
    println!("{:?}", summary);

    // Parallel tally of the same ballots.
    process_vote(pile, 4);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
Running `target/debug/parallel`
{2: 172, 1: 172, 3: 156}
125
Mutex { data: [{1: 49, 2: 39, 3: 37}, {2: 49, 1: 38, 3: 38}, {2: 43, 1: 39, 3: 43}, {2: 41, 1: 46, 3: 38}], poisoned: false, .. }
{2: 172, 3: 156, 1: 172}

おおおおおおおおお、なんかすげえええええええええええええ