【Rust】Producer, Consumer問題をRustで書いてみる…

スレッド自体はProducerとConsumerで同時に走ってるんだけど、なんか全然違うなぁ…

// Capacity of the shared buffer: `work` only pushes while the buffer is shorter than this.
pub static SIZE: u32 = 5;
// Shared buffer behind a mutex. `work` writes 1 into a slot when producing and 0 when consuming.
static slot: Mutex<Vec<i32>> = Mutex::new(Vec::new());

/// Runs ten rounds of the producer/consumer demo on the shared `slot` buffer.
///
/// Each round spawns one producer thread and one consumer thread and joins both
/// before the next round starts.
///
/// * Producer: pushes a `1` while the buffer holds fewer than `SIZE` entries;
///   once the buffer is full it walks the slots and refills every slot that holds `0`.
/// * Consumer: clears (sets to `0`) the first of slots 0 and 1 that holds a `1`.
///
/// Every check-and-write pair runs under a single lock acquisition, so the value
/// that was checked is the value that gets overwritten.
///
/// # Panics
///
/// Panics if the mutex is poisoned or if a spawned thread panics.
fn work() {
    // Convert the capacity once instead of on every round.
    let capacity: usize = SIZE.try_into().unwrap();

    for i in 0..10 {
        let producer_handle = thread::spawn(move || {
            // Length check and push happen under the same guard.
            let pushed = {
                let mut buf = slot.lock().unwrap();
                if buf.len() < capacity {
                    buf.push(1);
                    true
                } else {
                    false
                }
            };

            if pushed {
                // The guard is already dropped here, so the Debug output shows the data.
                println!("Producer {} work: {:?}", i, slot);
                thread::sleep(Duration::from_millis(500));
            } else {
                // Buffer is full: refill every slot the consumer has emptied.
                for j in 0..capacity {
                    let refilled = {
                        let mut buf = slot.lock().unwrap();
                        let mut changed = false;
                        if let Some(cell) = buf.get_mut(j) {
                            if *cell == 0 {
                                *cell = 1;
                                changed = true;
                            }
                        }
                        changed
                    };

                    if refilled {
                        println!("Producer {} work: {:?}", i, slot);
                        thread::sleep(Duration::from_millis(500));
                    }
                }
            }
        });

        let consumer_handle = thread::spawn(move || {
            // Take from slot 0 if it holds an item, otherwise from slot 1.
            // The original second branch re-tested slot 0 and was therefore unreachable;
            // `take(2)` also keeps this safe when the buffer has fewer than two entries.
            let consumed = {
                let mut buf = slot.lock().unwrap();
                let mut taken = false;
                if let Some(cell) = buf.iter_mut().take(2).find(|cell| **cell == 1) {
                    *cell = 0;
                    taken = true;
                }
                taken
            };

            if consumed {
                thread::sleep(Duration::from_millis(700));
                println!("Consumer {} work: {:?}", i, slot);
            }
        });

        producer_handle.join().unwrap();
        consumer_handle.join().unwrap();
    }
}

// Runs the producer/consumer rounds, then prints the final state of the shared buffer.
fn main() {
    work();
    // `slot` is the Mutex itself, so this prints its Debug form (data plus poison flag).
    println!("{:?}", slot);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/parallel`
Producer 0 work: Mutex { data: [1], poisoned: false, .. }
Producer 1 work: Mutex { data: [0, 1], poisoned: false, .. }
Consumer 1 work: Mutex { data: [0, 1], poisoned: false, .. }
Producer 2 work: Mutex { data: [0, 1, 1], poisoned: false, .. }
Producer 3 work: Mutex { data: [0, 1, 1, 1], poisoned: false, .. }
Producer 4 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Producer 5 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Consumer 5 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Producer 6 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 7 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 7 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 8 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 8 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 9 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 9 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }

【Rust】vending machine(自販機)のプログラム

### 前準備
コンソールからの入力の受け取り

/// Prompts for one line on standard input and echoes it back without the
/// surrounding whitespace.
///
/// # Panics
///
/// Panics if standard input cannot be read.
fn main() {
    println!("文字を入力してください:");

    let mut word = String::new();
    // Report a failed read instead of silently echoing an empty line.
    std::io::stdin()
        .read_line(&mut word)
        .expect("failed to read from stdin");
    // `trim` returns a borrowed slice; printing it needs no extra allocation.
    let answer = word.trim();

    println!("{}", answer);
}

文字を入力してください:
aaa
aaa

### お札、硬貨の枚数も合わせて返却する。
Hashmapにinsertするところは関数化したい

use std::collections::HashMap;
use itertools::Itertools;

/// Splits `change` into bills and coins, largest denomination first.
///
/// The returned map goes from denomination (5000, 1000, 500, 100, 50, 10, 1)
/// to the number of pieces handed back. Denominations that are not needed are
/// left out, so a `change` of 0 gives an empty map.
fn cash_change(mut change: u32) -> HashMap<u32, u32> {
    let mut change_map: HashMap<u32, u32> = HashMap::new();

    if change >= 5000 {
        change_map.insert(5000, change / 5000);
        change %= 5000;
    }

    if change >= 1000 {
        change_map.insert(1000, change / 1000);
        change %= 1000;
    }

    if change >= 500 {
        change_map.insert(500, change / 500);
        change %= 500;
    }

    if change >= 100 {
        change_map.insert(100, change / 100);
        change %= 100;
    }

    if change >= 50 {
        change_map.insert(50, change / 50);
        change %= 50;
    }

    if change >= 10 {
        change_map.insert(10, change / 10);
        change %= 10;
    }

    // Whatever remains is below 10 and is paid out in pieces of 1.
    if change != 0 {
        change_map.insert(1, change);
    }

    change_map
}

/// Reads the inserted amount and the product price from standard input, then
/// prints the change and its breakdown into bills and coins, largest first.
///
/// If the price is higher than the inserted amount, a message is printed and no
/// change is computed.
///
/// # Panics
///
/// Panics if either input line is not a non-negative integer that fits in `u32`.
fn main() {
    println!("投入する金額を入力してください:");

    let mut word = String::new();
    std::io::stdin().read_line(&mut word).ok();
    let cash: u32 = word.trim().parse().expect("entered string was not a number");


    println!("商品の金額を入力してください:");
    let mut word = String::new();
    std::io::stdin().read_line(&mut word).ok();
    let price: u32 = word.trim().parse().expect("entered string was not a number");

    // `cash - price` on u32 would overflow (panic in debug builds, wrap in release
    // builds) when the price exceeds the inserted amount.
    let change = match cash.checked_sub(price) {
        Some(change) => change,
        None => {
            println!("投入金額が不足しています。");
            return;
        }
    };

    println!("お釣りは[{}]です。", change);
    let change_map = cash_change(change);
    // HashMap iteration order is unspecified, so sort by denomination, descending.
    let mut v: Vec<_> = change_map.into_iter().collect();
    v.sort_by(|x, y| y.0.cmp(&x.0));
    println!("{:?}", v);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.25s
Running `target/debug/rust`
投入する金額を入力してください:
1000
商品の金額を入力してください:
333
お釣りは[667]です。
[(500, 1), (100, 1), (50, 1), (10, 1), (1, 7)]

### もっと簡潔に

use std::collections::HashMap;
use itertools::Itertools;

/// Splits `change` into bills and coins by walking a table of denominations
/// from largest to smallest.
///
/// Returns a map from denomination to the number of pieces; denominations with
/// a count of zero are omitted.
fn cash_change(mut change: u32) -> HashMap<u32, u32> {
    const DENOMINATIONS: [u32; 7] = [5000, 1000, 500, 100, 50, 10, 1];

    let mut change_map: HashMap<u32, u32> = HashMap::new();
    for &denomination in DENOMINATIONS.iter() {
        let pieces = change / denomination;
        if pieces == 0 {
            continue;
        }
        change_map.insert(denomination, pieces);
        // Same as `change % denomination`: remove what was just paid out.
        change -= pieces * denomination;
    }
    change_map
}

/// Reads the inserted amount and the product price from standard input, then
/// prints the change and its breakdown into bills and coins, largest first.
///
/// If the price is higher than the inserted amount, a message is printed and no
/// change is computed.
///
/// # Panics
///
/// Panics if either input line is not a non-negative integer that fits in `u32`.
fn main() {
    println!("投入する金額を入力してください:");

    let mut word = String::new();
    std::io::stdin().read_line(&mut word).ok();
    let cash: u32 = word.trim().parse().expect("entered string was not a number");


    println!("商品の金額を入力してください:");
    let mut word = String::new();
    std::io::stdin().read_line(&mut word).ok();
    let price: u32 = word.trim().parse().expect("entered string was not a number");

    // Guard against u32 underflow when the price exceeds the inserted amount.
    let change = match cash.checked_sub(price) {
        Some(change) => change,
        None => {
            println!("投入金額が不足しています。");
            return;
        }
    };

    println!("お釣りは[{}]です。", change);
    let change_map = cash_change(change);
    // HashMap iteration order is unspecified, so sort by denomination, descending.
    let mut v: Vec<_> = change_map.into_iter().collect();
    v.sort_by(|x, y| y.0.cmp(&x.0));
    println!("{:?}", v);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/rust`
投入する金額を入力してください:
3000
商品の金額を入力してください:
200
お釣りは[2800]です。
[(1000, 2), (500, 1), (100, 3)]

おおおおおおお、なるほど〜。同じ処理のところはイテレータにしてforループで回せば、より簡潔に書けますね。

【並列処理】リーダー(Reader)とライター(Writer)の問題

import time
import random
from threading import Thread
from rwlock import RWLock

# Shared value: incremented by Librarian, printed by the User threads.
counter = 0
# Reader-writer lock that both thread classes take around their access to `counter`.
lock = RWLock()

class User(Thread):
    """Reader thread: repeatedly prints the shared `counter` under the read lock."""
    def __init__(self, idx: int):
        super().__init__()
        # Number used to tell the readers apart in the printed output.
        self.idx = idx

    def run(self) -> None:
        """Loop forever: read under the read lock, then pause before the next read."""
        while True:
            lock.acquire_read()
            print(f"User {self.idx} reading: {counter}")
            # The read lock stays held for the whole simulated read (1 or 2 seconds).
            time.sleep(random.randrange(1, 3))
            lock.release_read()
            # Pause outside the lock before asking for it again.
            time.sleep(0.5)

class Librarian(Thread):
    """Writer thread: repeatedly increments the shared `counter` under the write lock."""
    def run(self) -> None:
        """Loop forever: take the write lock, bump `counter`, hold the lock 1 or 2 seconds."""
        global counter
        while True:
            lock.acquire_write()
            print("Librarian writing...")
            counter += 1
            print(f"New Value: {counter}")
            time.sleep(random.randrange(1, 3))
            lock.release_write()
            # NOTE(review): unlike User.run there is no pause after the release, so the
            # loop asks for the write lock again immediately. This may be why readers
            # rarely get a turn; confirm against the fairness policy of RWLock.

if __name__ == "__main__":
    # Two readers and one writer share the same lock and counter.
    threads = [
        User(0),
        User(1),
        Librarian()
    ]

    for thread in threads:
        thread.start()

    # Every run() loops forever, so these joins block until the process is interrupted.
    for thread in threads:
        thread.join()

$ python3 reader_writer.py
User 0 reading: 0
User 1 reading: 0
User 0 reading: 0
Librarian writing…
New Value: 1
Librarian writing…
New Value: 2
Librarian writing…
New Value: 3
Librarian writing…
New Value: 4
Librarian writing…

ちょっと期待している挙動と異なるな…

【並列処理】プロデューサー/コンシューマー問題

produceとconsumeの速度が同じではないので、結果的に、速度が遅い方に引っ張られることになる。

import time
from threading import Thread, Semaphore, Lock

# Number of slots in the shared ring buffer.
SIZE = 5
# The ring buffer itself; every slot starts as an empty string.
BUFFER = ["" for i in range(SIZE)]
# Next slot to write; shared by all producers.
producer_idx: int = 0

# Guards BUFFER and producer_idx.
mutex = Lock()
# Counts free slots; starts at SIZE because the buffer begins empty.
empty = Semaphore(SIZE)
# Counts filled slots; starts at 0.
full = Semaphore(0)

class Producer(Thread):
    """Thread that writes `maximum_items` labelled items into the shared BUFFER."""
    def __init__(self, name: str, maximum_items: int = 5):
        super().__init__()
        # Number of items this producer has written so far.
        self.counter = 0
        self.name = name
        self.maximum_items = maximum_items

    def next_index(self, index: int) -> int:
        """Return the slot after `index`, wrapping around at SIZE."""
        return (index + 1) % SIZE

    def run(self) -> None:
        """Produce one item per second until `maximum_items` have been written."""
        global producer_idx
        while self.counter < self.maximum_items:
            # Wait for a free slot first, then take exclusive access to the buffer.
            empty.acquire()
            mutex.acquire()
            self.counter += 1
            BUFFER[producer_idx] = f"{self.name}-{self.counter}"
            print(f"{self.name} produced: "
                f"'{BUFFER[producer_idx]}' into slot {producer_idx}")
            producer_idx = self.next_index(producer_idx)
            mutex.release()
            # Signal that one more filled slot is available to consumers.
            full.release()
            time.sleep(1)

class Consumer(Thread):
    """Thread that reads `maximum_items` items out of the shared BUFFER in slot order."""
    def __init__(self, name: str, maximum_items: int = 10):
        super().__init__()
        self.name = name
        # Next slot this consumer will read; private to the instance.
        self.idx = 0
        # Number of items consumed so far.
        self.counter = 0
        self.maximum_items = maximum_items
    
    def next_index(self) -> int:
        """Return the slot after `self.idx`, wrapping around at SIZE."""
        return (self.idx + 1) % SIZE

    def run(self) -> None:
        """Consume one item every two seconds until `maximum_items` have been read."""
        while self.counter < self.maximum_items:
            # Wait for a filled slot first, then take exclusive access to the buffer.
            full.acquire()
            mutex.acquire()
            item = BUFFER[self.idx]
            print(f"{self.name} consumed item: "
                f"'{item}' from slot {self.idx}")
            self.idx = self.next_index()
            self.counter += 1
            mutex.release()
            # Signal that one more slot is free for producers.
            empty.release()
            time.sleep(2)

if __name__ == "__main__":
    # Two producers (5 items each by default) feed one consumer (10 items by default).
    threads = [
        Producer("SpongeBob"),
        Producer("Patrick"),
        Consumer("Squidward")
    ]

    for thread in threads:
        thread.start()

    # Wait until every producer and the consumer have finished.
    for thread in threads:
        thread.join()

$ python3 producer_comsumer.py
SpongeBob produced: ‘SpongeBob-1’ into slot 0
Patrick produced: ‘Patrick-1’ into slot 1
Squidward consumed item: ‘SpongeBob-1’ from slot 0
SpongeBob produced: ‘SpongeBob-2’ into slot 2
Patrick produced: ‘Patrick-2’ into slot 3
Squidward consumed item: ‘Patrick-1’ from slot 1
Patrick produced: ‘Patrick-3’ into slot 4
SpongeBob produced: ‘SpongeBob-3’ into slot 0
Patrick produced: ‘Patrick-4’ into slot 1
Squidward consumed item: ‘SpongeBob-2’ from slot 2
SpongeBob produced: ‘SpongeBob-4’ into slot 2
Squidward consumed item: ‘Patrick-2’ from slot 3
Patrick produced: ‘Patrick-5’ into slot 3
Squidward consumed item: ‘Patrick-3’ from slot 4
SpongeBob produced: ‘SpongeBob-5’ into slot 4
Squidward consumed item: ‘SpongeBob-3’ from slot 0

【並列処理】飢餓状態

スケジューリングの優先順位に偏りがあると、スレッドの実行が平等にならない。

import time
from threading import Thread
from lock_with_name import LockWithName
# Shared number of dumplings left; every Philosopher thread decrements it.
dumplings = 120

class Philosopher(Thread):
    """Thread that eats from the shared `dumplings` while holding both chopsticks."""
    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Eat until no dumplings remain, then print how many this thread took."""
        global dumplings

        dumplings_eaten = 0
        while dumplings > 0:
            self.left_chopstick.acquire()
            self.right_chopstick.acquire()
            # Re-check under the locks: the count may have reached zero while waiting.
            if dumplings > 0:
                dumplings -= 1
                dumplings_eaten += 1
                time.sleep(1e-16)
            self.right_chopstick.release()
            self.left_chopstick.release()
        print(f"{self.name} took {dumplings_eaten} pieces")

if __name__ == "__main__":
    # One pair of chopsticks is shared by all ten philosophers, taken in the same order.
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    threads = [
        Philosopher(f"Philosopher #{i}", chopstick_a, chopstick_b)
        for i in range(10)
    ]

    # Start everyone first, then wait for all of them to finish.
    for philosopher in threads:
        philosopher.start()
    for philosopher in threads:
        philosopher.join()

$ python3 starvation.py
Philosopher #4 took 51 pieces
Philosopher #9 took 0 pieces
Philosopher #1 took 0 pieces
Philosopher #3 took 0 pieces
Philosopher #7 took 0 pieces
Philosopher #8 took 0 pieces
Philosopher #6 took 0 pieces
Philosopher #2 took 0 pieces
Philosopher #5 took 0 pieces
Philosopher #0 took 69 pieces

【並列処理】ライブロック

状態に応じて処理を変えているが、これも結局、両方のスレッドが互いに譲り合い続けて待ち状態になってしまい、処理が先に進まなくなる(ライブロック)。
一般的に、このように処理が進められない状態をstarvation(飢餓状態)という。

import time
from threading import Thread
from lock_with_name import LockWithName
# Shared number of dumplings left; decremented by the Philosopher threads.
dumplings = 20

class Philosopher(Thread):
    """Polite philosopher used to demonstrate a livelock.

    Each philosopher grabs the left chopstick, and if the right one is already
    taken it gives the left one back instead of waiting for the right one.
    """

    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Keep trying to eat while the shared `dumplings` count is positive."""
        global dumplings

        while dumplings > 0:
            self.left_chopstick.acquire()
            print(f"{self.left_chopstick.name} chopstick "
                f"grabbed by {self.name}")
            if self.right_chopstick.locked():
                # The other philosopher holds it: give up this round and retry.
                print(f"{self.name} can not get the "
                    f"{self.right_chopstick.name} chopstick, "
                    f"politely concedes...")
            else:
                # Fixed typo: this was `acuire()`, which raised AttributeError
                # the first time the right chopstick was actually free.
                self.right_chopstick.acquire()
                print(f"{self.right_chopstick.name} chopstick "
                f"grabbed by {self.name}")
                dumplings -= 1
                print(f"{self.name} eats dumpling. Dumplings "
                    f"left: {dumplings}")
                time.sleep(1)
                self.right_chopstick.release()
            self.left_chopstick.release()

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # The two philosophers receive the chopsticks in opposite left/right order.
    philosopher_1 = Philosopher("Philosopher #1", chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopsti^C

【並列処理】デッドロックの発生

デッドロックは他のタスクによって占有されて実行できない状態
left_chopstickとright_chopstickを別々に処理しているのでこのようなことが起きる。left_chopstickとright_chopstickを同時にacquire, releaseすればロジック上、ロックされない。

from threading import Lock
from typing import Any, Tuple

class LockWithName:
    """A `threading.Lock` paired with a human-readable name for log output.

    Works as a context manager: ``with lock:`` acquires on entry and releases
    on exit, and ``with lock as held:`` binds the lock itself to ``held``.
    """

    def __init__(self, name: str):
        self.name = name
        self._lock = Lock()

    def acquire(self) -> None:
        """Block until the underlying lock is acquired."""
        self._lock.acquire()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()

    def locked(self) -> bool:
        """Return True if the underlying lock is currently held."""
        return self._lock.locked()

    def __enter__(self) -> "LockWithName":
        self.acquire()
        # Return the lock so `with lock as held:` yields the object, not None.
        return self

    def __exit__(self, *args: Any) -> None:
        # Exception details are ignored; the lock is always released.
        self.release()
import time
from threading import Thread

from lock_with_name import LockWithName
# Shared number of dumplings left; decremented by the Philosopher threads.
dumplings = 20

class Philosopher(Thread):
    """Philosopher that takes the left chopstick, then waits for the right one."""
    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Eat while dumplings remain, taking and releasing the chopsticks one by one."""
        global dumplings

        while dumplings > 0:
            self.left_chopstick.acquire()
            print(f"{self.left_chopstick.name} grabbed by {self.name} "
                f"now needs {self.right_chopstick.name}")
            # Blocks while already holding the left chopstick. If the other
            # philosopher holds this one and is waiting for ours, neither proceeds.
            self.right_chopstick.acquire()
            print(f"{self.right_chopstick.name} grabbed by {self.name}")
            dumplings -= 1
            print(f"{self.name} eats a dumpling. "
                f"Dumplings left: {dumplings}")
            self.right_chopstick.release()
            print(f"{self.right_chopstick.name} released by {self.name}")
            self.left_chopstick.release()
            print(f"{self.left_chopstick.name} released by {self.name}")
            print(f"{self.name} is thinking...")
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # Opposite left/right order: each philosopher's first lock is the other's second.
    philosopher_1 = Philosopher("Philosopher #1", chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

$ python3 deadlock.py
chopstick_a grabbed by Philosopher #1 now needs chopstick_b
chopstick_b grabbed by Philosopher #2 now needs chopstick_a

止まってしまう

ここではwaiterというclassを追加しているが、別にwaiterではなく、whileのループの中で同時にacquire, releaseできれば良い。

import time
from threading import Thread, Lock
from lock_with_name import LockWithName
# Shared number of dumplings left; decremented by the Philosopher threads.
dumplings = 20

class Waiter:
    """Arbitrator that hands out chopsticks to one philosopher at a time."""

    def __init__(self) -> None:
        # Held while a philosopher is picking up its pair of chopsticks.
        self.mutex = Lock()

    def ask_for_chopsticks(self,
                left_chopstick: LockWithName,
                right_chopstick: LockWithName) -> None:
        """Acquire the left and then the right chopstick while holding the waiter's mutex."""
        self.mutex.acquire()
        try:
            for chopstick in (left_chopstick, right_chopstick):
                chopstick.acquire()
                print(f"{chopstick.name} grabbed")
        finally:
            self.mutex.release()

    def release_chopsticks(self,
                left_chopstick: LockWithName,
                right_chopstick: LockWithName) -> None:
        """Release the right and then the left chopstick; the mutex is not taken here."""
        for chopstick in (right_chopstick, left_chopstick):
            chopstick.release()
            print(f"{chopstick.name} released")


class Philosopher(Thread):
    """Philosopher that obtains and returns both chopsticks through a Waiter."""
    def __init__(self, name: str, waiter: Waiter,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick
        self.waiter = waiter

    def run(self) -> None:
        """Eat while dumplings remain, asking the waiter for the chopsticks each round."""
        global dumplings

        while dumplings > 0:
            print(f"{self.name} asks waiter for chopsticks")
            # Returns once this philosopher holds both chopsticks.
            self.waiter.ask_for_chopsticks (
                self.left_chopstick, self.right_chopstick)
            
            dumplings -= 1
            print(f"{self.name} eats a dumpling. "
                f"Dumpling left: {dumplings}")
            print(f"{self.name} returns chopsticks to waiter")
            self.waiter.release_chopsticks (
                self.left_chopstick, self.right_chopstick)
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # Both philosophers share one waiter but still receive the chopsticks in opposite order.
    waiter = Waiter()
    philosopher_1 = Philosopher("Philosopher #1", waiter, chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", waiter, chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

$ python3 deadlock_arbitrator.py
Philosopher #1 asks waiter for chopsticks
chopstick_a grabbed
chopstick_b grabbed
Philosopher #1 eats a dumpling. Dumpling left: 19
Philosopher #1 returns chopsticks to waiter
chopstick_b released
chopstick_a released
….

【Rust】Mutexの使い方を理解したい

シングルスレッド
mutexのデータにアクセスするには、lockメソッドを使用してロックを獲得する

use std::sync::Mutex;

// Single-threaded Mutex example: lock, overwrite the value, unlock, then print the mutex.
fn main() {
    let m = Mutex::new(5);

    {
        // `lock()` returns a guard; writing through it changes the protected value.
        let mut num = m.lock().unwrap();
        *num = 6;
        // The guard is dropped at the end of this block, which releases the lock.
    }

    println!("m= {:?}", m);
}

スレッド間でsemaphoreを利用しようとするとエラーになる。

use std::sync::{Mutex, Arc};
use std::thread;
use rand::Rng;
use std_semaphore::Semaphore;
use std::time::Duration;

// Initial count passed to the semaphore in `main`.
static TOTAL_SPOTS: u32 = 3;

// Spawns ten threads that each acquire the semaphore, increment the shared counter and
// sleep briefly before releasing. As written this does not compile; see the note on `sem`.
fn main() {
    let parked_cars = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    // NOTE(review): `sem` is a plain value, so the `move` closure below takes ownership of
    // it in the first loop iteration and later iterations have nothing left to move (this
    // is the compiler error quoted after the listing). `parked_cars` avoids the problem by
    // being cloned through `Arc` per thread; sharing `sem` the same way is the likely fix.
    let sem = Semaphore::new(TOTAL_SPOTS.try_into().unwrap());

    for _ in 0..10 {

        // Each thread gets its own handle to the shared counter.
        let counter = Arc::clone(&parked_cars);

        let handle = thread::spawn(move || {

            let mut rng = rand::thread_rng(); 
            // enter
            sem.acquire();
            // The guard `num` lives until the end of the closure, so the mutex stays
            // locked during the sleep and the `sem.release()` below.
            let mut num = counter.lock().unwrap();
            *num += 1;
            thread::sleep(Duration::from_millis(rng.gen_range(1..10)));

            // exit
            sem.release();
        });
        handles.push(handle);
    }

    // Wait for every spawned thread before reading the final count.
    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *parked_cars.lock().unwrap());
}

15 | for _ in 0..10 {
| ————– inside of this loop

19 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop

23 | sem.acquire();
| — use occurs due to use in closure

中々奥が深い..

【Rust】rustでセマフォ(semaphore)を使いたい

mainじゃなくて関数として使いたいな…

use std_semaphore::Semaphore;
use std::thread;
use std::time::Duration;

// Initial count passed to the semaphore in `main`.
static TOTAL_SPOTS: u32 = 3;

// Single-threaded walk-through: acquire the semaphore, park one car, print the list,
// then clear the list, release the semaphore and print the list again.
fn main() {
    let sem = Semaphore::new(TOTAL_SPOTS.try_into().unwrap());

    let mut parked_cars: Vec<u32> = Vec::new();

    let car: u32 = 1;

    // enter
    sem.acquire();
    parked_cars.push(car);
    thread::sleep(Duration::from_millis(500));
    println!("{:?}", parked_cars);

    // exit
    // Note: `clear` empties the whole list rather than removing only `car`.
    parked_cars.clear();
    sem.release();

    println!("{:?}", parked_cars);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.17s
Running `target/debug/parallel`
[1]
[]

【並列処理】セマフォ(semaphore)とmutex Lock

import typing as T
import time
import random
from threading import Thread, Semaphore, Lock

# Initial value of the Garage semaphore, i.e. how many cars may be parked at once.
TOTAL_SPOTS = 3

class Garage:
    """Parking garage with TOTAL_SPOTS spaces.

    A semaphore limits how many cars may be parked at once, and a separate lock
    protects the list of parked cars.
    """

    def __init__(self) -> None:
        self.semaphore = Semaphore(TOTAL_SPOTS)
        self.cars_lock = Lock()
        # Was annotated `T.LIST[str]`, which is not a name the typing module defines.
        self.parked_cars: T.List[str] = []

    def count_parked_cars(self) -> int:
        """Return the number of cars currently parked."""
        return len(self.parked_cars)

    def enter(self, car_name: str) -> None:
        """Wait for a free spot, then record `car_name` as parked."""
        self.semaphore.acquire()
        with self.cars_lock:
            self.parked_cars.append(car_name)
            print(f"{car_name} parked")

    def exit(self, car_name: str) -> None:
        """Remove `car_name` from the garage and free its spot.

        Raises:
            ValueError: if `car_name` is not parked. The list lock is released
                and no spot is freed in that case.
        """
        # `with` releases the lock even when `remove` raises; the manual
        # acquire/release pair left it held forever on that path.
        with self.cars_lock:
            self.parked_cars.remove(car_name)
            print(f"{car_name} leaving")
            self.semaphore.release()

def park_car(garage: Garage, car_name: str) -> None:
    """Park `car_name` in `garage`, stay between 1 and 2 seconds, then leave."""
    garage.enter(car_name)
    time.sleep(random.uniform(1, 2))
    garage.exit(car_name)

def test_garage(garage: Garage, number_of_cars: int = 10) -> None:
    """Run `number_of_cars` threads that each park in `garage`, and wait for all of them."""

    def launch(car_num: int) -> Thread:
        # Create and immediately start the thread for one car.
        worker = Thread(target=park_car,
            args=(garage, f"Car #{car_num}"))
        worker.start()
        return worker

    workers = [launch(car_num) for car_num in range(number_of_cars)]

    for worker in workers:
        worker.join()

if __name__ == "__main__":
    number_of_cars = 10
    garage = Garage()
    # Returns only after every car thread has entered and left the garage.
    test_garage(garage, number_of_cars)

    print("Number of parked cars after a busy day:")
    print(f"Actual: {garage.count_parked_cars()}\nExpected: 0")

$ python3 semaphore.py
Car #0 parked
Car #1 parked
Car #2 parked
Car #1 leaving
Car #3 parked
Car #2 leaving
Car #4 parked
Car #0 leaving
Car #5 parked
Car #3 leaving
Car #6 parked
Car #4 leaving
Car #7 parked
Car #5 leaving
Car #8 parked
Car #6 leaving
Car #9 parked
Car #7 leaving
Car #8 leaving
Car #9 leaving
Number of parked cars after a busy day:
Actual: 0
Expected: 0