Future

from __future__ import annotations

import typing as T 
from collections import deque
from random import randint

Result = T.Any
Burger = Result
Coroutine = T.Callable[[], 'Future']

class Future:
    """Placeholder for a result that becomes available later.

    The object is its own iterator: ``next(future)`` hands back the stored
    result once ``set_result`` has been called, and raises ``StopIteration``
    while no result has been stored yet.
    """

    def __init__(self) -> None:
        # Starts unresolved and not attached to any coroutine.
        self.done, self.coroutine, self.result = False, None, None

    def set_coroutine(self, coroutine: Coroutine) -> None:
        """Remember the callable associated with this future."""
        self.coroutine = coroutine

    def set_result(self, result: Result) -> None:
        """Flag the future as finished and store ``result``."""
        self.done, self.result = True, result

    def __iter__(self) -> Future:
        """Return the future itself."""
        return self

    def __next__(self) -> Result:
        """Return the stored result; raise ``StopIteration`` if unresolved."""
        if self.done:
            return self.result
        raise StopIteration

class EventLoop:
    """FIFO scheduler for callables that return a future-like object."""

    def __init__(self) -> None:
        self.tasks: T.Deque[Coroutine] = deque()

    def add_coroutine(self, coroutine: Coroutine) -> None:
        """Put ``coroutine`` at the back of the task queue."""
        self.tasks.append(coroutine)

    def run_coroutine(self, task: T.Callable) -> None:
        """Call ``task`` and poll the future it returns once.

        The task is queued again only when polling succeeds while the future
        still reports ``done`` as false; a ``StopIteration`` raised while
        polling ends the attempt without requeueing.
        """
        future = task()
        future.set_coroutine(task)
        try:
            next(future)
            if future.done:
                return
            # Polled without error but not finished: schedule another round.
            future.set_coroutine(task)
            self.add_coroutine(task)
        except StopIteration:
            pass

    def run_forever(self) -> None:
        """Run queued tasks in order until the queue is empty."""
        queue = self.tasks
        while queue:
            upcoming = queue.popleft()
            self.run_coroutine(upcoming)

def cook(on_done: T.Callable[[Burger], None]) -> None:
    """Prepare a randomly numbered burger and pass it to ``on_done``.

    A "cooked" message is printed before the callback is invoked.
    """
    number = randint(1, 10)
    burger: str = f"Burger #{number}"
    print(f"{burger} is cooked!")
    on_done(burger)

def cashier(burger: Burger, on_done: T.Callable[[Burger], None]) -> None:
    """Announce that the burger can be picked up, then pass it to ``on_done``."""
    announcement = "Burger is ready for pick up!"
    print(announcement)
    on_done(burger)

def order_burger() -> Future:
    """Run the cook -> cashier callback chain and return the order's future.

    The last callback in the chain stores the burger as the future's result.
    """
    order = Future()

    def on_cashier_done(burger: Burger) -> None:
        # Final link of the chain: the customer receives the burger.
        print(f"{burger}? That's me! Mmmmmm!")
        order.set_result(burger)

    def on_cook_done(burger: Burger) -> None:
        # Cooking is finished; hand the burger over to the cashier.
        cashier(burger, on_cashier_done)

    cook(on_cook_done)
    return order

if __name__ == "__main__":
    # Script entry point: queue the order_burger callable, then run the loop
    # until its task queue is empty.
    event_loop = EventLoop()
    event_loop.add_coroutine(order_burger)
    event_loop.run_forever()

$ python3 future_burger.py
Burger #10 is cooked!
Burger is ready for pick up!
Burger #10? That’s me! Mmmmmm!

コルーチン

from collections import deque
import typing as T

Coroutine = T.Generator[None, None, int]

class EventLoop:
    """FIFO scheduler that steps generator-based coroutines in turn."""

    def __init__(self) -> None:
        self.tasks: T.Deque[Coroutine] = deque()

    def add_coroutine(self, task: Coroutine) -> None:
        """Append ``task`` to the back of the run queue."""
        self.tasks.append(task)

    def run_coroutine(self, task: Coroutine) -> None:
        """Advance ``task`` by one step and requeue it unless it finished."""
        try:
            task.send(None)
        except StopIteration:
            print("Task completed")
        else:
            self.add_coroutine(task)

    def run_forever(self) -> None:
        """Keep stepping queued tasks until the queue is empty."""
        queue = self.tasks
        while queue:
            print("Event loop cycle.")
            current = queue.popleft()
            self.run_coroutine(current)

def fibonacci(n: int) -> Coroutine:
    """Generator coroutine that prints the first ``n`` Fibonacci numbers.

    It yields (without a value) after each printed number so that a
    scheduler can interleave other work, and returns the last number
    computed (``0`` when the loop body never runs).
    """
    current, upcoming = 0, 1
    for step in range(n):
        upcoming, current = current + upcoming, upcoming
        print(f"Fibonacci({step}): {current}")
        yield
    return current

if __name__ == "__main__":
    # Script entry point: schedule one five-step fibonacci generator and run
    # the loop until the queue is empty.
    event_loop = EventLoop()
    event_loop.add_coroutine(fibonacci(5))
    event_loop.run_forever()

$ python3 coroutine.py
Event loop cycle.
Fibonacci(0): 1
Event loop cycle.
Fibonacci(1): 1
Event loop cycle.
Fibonacci(2): 2
Event loop cycle.
Fibonacci(3): 3
Event loop cycle.
Fibonacci(4): 5
Event loop cycle.
Task completed

イベント

from __future__ import annotations 

from collections import deque
from time import sleep
import typing as T

class Event:
    """A named action, optionally chained to a follow-up event."""

    def __init__(self, name: str, action: T.Callable[..., None],
            next_event: T.Optional[Event] = None) -> None:
        self.name = name
        self._action = action
        self._next_event = next_event

    def execute_action(self) -> None:
        """Run the action with this event, then queue the follow-up, if any.

        The follow-up is registered on the module-level ``event_loop``.
        """
        self._action(self)
        follow_up = self._next_event
        if not follow_up:
            return
        event_loop.register_event(follow_up)

class EventLoop:
    """FIFO queue of events that are executed one after another."""

    def __init__(self) -> None:
        self._events: deque[Event] = deque()

    def register_event(self, event: Event) -> None:
        """Add ``event`` to the back of the queue."""
        self._events.append(event)

    def run_forever(self) -> None:
        """Execute queued events in order; never returns on its own.

        While the queue is empty the loop keeps polling it.
        """
        queued = len(self._events)
        print(f"Queue running with {queued} event")

        while True:
            try:
                upcoming = self._events.popleft()
            except IndexError:
                # Nothing queued right now; poll again.
                pass
            else:
                upcoming.execute_action()

def knock(event: Event) -> None:
    """Print the event's name, then pause for one second."""
    label = event.name
    print(label)
    sleep(1)

def who(event: Event) -> None:
    """Print the event's name, then pause for one second."""
    label = event.name
    print(label)
    sleep(1)

if __name__ == "__main__":
    # `event_loop` must keep this name: Event.execute_action looks it up as a
    # module-level global when queueing follow-up events.
    event_loop = EventLoop()
    replying = Event("Who's there?", who)
    knocking = Event("Knock-knock", knock, replying)
    # The same knocking event object is queued twice.
    for _ in range(2):
        event_loop.register_event(knocking)
    event_loop.run_forever()

$ python3 event_loop.py
Queue running with 2 event
Knock-knock
Knock-knock
Who’s there?
Who’s there?

import typing as T
import select
from socket import socket, create_server

Data = bytes
Action = T.Union[T.Callable[[socket], None],
        T.Tuple[T.Callable[[socket, Data], None], str]]
Mask = int

class EventLoop:
    """Event loop that runs callbacks when registered sockets become ready.

    Registrations are one-shot: an entry is removed as soon as its socket is
    reported ready, just before the callback runs.
    """

    def __init__(self) -> None:
        # Both maps are keyed by file descriptor and hold
        # (source, event mask, action) tuples.
        self.writers: T.Dict[int, T.Tuple[socket, Mask, Action]] = {}
        self.readers: T.Dict[int, T.Tuple[socket, Mask, Action]] = {}

    def register_event(self, source: socket, event: Mask,
            action: Action) -> None:
        """Register ``action`` for the next time ``source`` is ready.

        Args:
            source: Socket to watch; its file descriptor is the key.
            event: Bit mask. ``select.POLLIN`` registers a reader; otherwise
                ``select.POLLOUT`` registers a writer. A mask with neither
                bit set is ignored.
            action: For readers, a callable taking the socket. For writers,
                a ``(callable, message)`` pair; the callable receives the
                socket and the message.
        """
        key = source.fileno()
        if event & select.POLLIN:
            self.readers[key] = (source, event, action)
        elif event & select.POLLOUT:
            self.writers[key] = (source, event, action)

    def unregister_event(self, source: socket) -> None:
        """Forget any reader and writer registered for ``source``."""
        key = source.fileno()
        self.readers.pop(key, None)
        self.writers.pop(key, None)

    def run_forever(self) -> None:
        """Wait for ready sockets and dispatch their actions, indefinitely."""
        while True:
            readers, writers, _ = select.select(
                self.readers, self.writers, [])
            for reader in readers:
                entry = self.readers.pop(reader, None)
                if entry is None:
                    # A callback earlier in this cycle unregistered it.
                    continue
                source, _event, action = entry
                action(source)
            for writer in writers:
                entry = self.writers.pop(writer, None)
                if entry is None:
                    # Same as above: e.g. the read callback for this socket
                    # unregistered it before we got to the write side.
                    continue
                source, _event, action = entry
                callback, msg = action
                callback(source, msg)

socketによるサーバ機能

from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Server:
    """Blocking pizza-order server that handles one client at a time."""

    def __init__(self) -> None:
        """Create the listening socket bound to ``ADDRESS``.

        Raises:
            OSError: If the listening socket cannot be created.
        """
        print(f"Starting up at: {ADDRESS}")
        try:
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            # create_server failed, so no socket exists to close here.
            print("\nServer stopped.")
            raise

    def accept(self) -> socket:
        """Block until a client connects and return its connection socket."""
        conn, client_address = self.server_socket.accept()
        print(f"Connected to {client_address}")
        return conn

    def serve(self, conn: socket) -> None:
        """Answer orders on ``conn`` until the client closes the connection.

        Each received chunk is parsed as an integer number of pizzas; input
        that does not parse gets an error reply instead. The connection is
        always closed before returning.
        """
        # Look the peer up once, up front: getpeername() can fail after the
        # peer has gone, and a failure inside ``finally`` would skip close().
        try:
            peer = conn.getpeername()
        except OSError:
            peer = "unknown peer"
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {peer}")
                # sendall keeps writing until the whole reply is sent.
                conn.sendall(response.encode())
        finally:
            print(f"Connection with {peer} has been closed")
            conn.close()

    def start(self) -> None:
        """Accept and serve clients one after another, indefinitely.

        The listening socket is closed when the loop is left for any reason.
        """
        print("Server listening for incoming connections")
        try:
            while True:
                conn = self.accept()
                self.serve(conn)

        finally:
            self.server_socket.close()
            print("\nServer stopped.")

if __name__ == "__main__":
    # Script entry point: create the listening server and serve clients.
    server = Server()
    server.start()

$ python3 pizza_server.py
Starting up at: (‘127.0.0.1’, 12345)
Server listening for incoming connections
Connected to (‘127.0.0.1’, 55292)
Sending message to (‘127.0.0.1’, 55292)
^CConnection with (‘127.0.0.1’, 55292) has been closed

$ nc 127.0.0.1 12345
10
Thank you for ordering 10 pizzas!

### サーバの並列化

from socket import socket, create_server
from threading import Thread

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Handler(Thread):
    """Thread that serves a single client connection."""

    def __init__(self, conn: socket):
        super().__init__()
        self.conn = conn

    def run(self) -> None:
        """Answer orders until the client closes the connection.

        Each received chunk is parsed as an integer number of pizzas; input
        that does not parse gets an error reply instead. The connection is
        always closed before the thread ends.
        """
        # Look the peer up once, up front: getpeername() can fail after the
        # peer has gone, and a failure inside ``finally`` would skip close().
        try:
            peer = self.conn.getpeername()
        except OSError:
            peer = "unknown peer"
        print(f"Connected to {peer}")
        try:
            while True:
                data = self.conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {peer}")
                # sendall keeps writing until the whole reply is sent.
                self.conn.sendall(response.encode())
        finally:
            print(f"Connection with {peer} has been closed")
            self.conn.close()

class Server:
    """Pizza-order server that hands each connection to a ``Handler`` thread."""

    def __init__(self) -> None:
        """Create the listening socket bound to ``ADDRESS``.

        Raises:
            OSError: If the listening socket cannot be created.
        """
        print(f"Starting up at: {ADDRESS}")
        try:
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            # create_server failed, so no socket exists to close here.
            print("\nServer stopped.")
            raise

    def start(self) -> None:
        """Accept clients indefinitely, starting one thread per connection.

        The listening socket is closed when the loop is left for any reason.
        """
        print("Server listening for incoming connections")
        try:
            while True:
                conn, address = self.server_socket.accept()
                print(f"Client connection request from {address}")
                thread = Handler(conn)
                thread.start()

        finally:
            self.server_socket.close()
            print("\nServer stopped.")

if __name__ == "__main__":
    # Script entry point: create the listening server and accept clients.
    server = Server()
    server.start()

### ノンブロッキングモード

import typing as T
from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Server:
    """Non-blocking pizza-order server that polls every client in a loop."""

    # Kept at class level for backward compatibility; every instance gets
    # its own set in __init__ so servers do not share client lists.
    clients: T.Set[socket] = set()

    def __init__(self) -> None:
        """Create a non-blocking listening socket bound to ``ADDRESS``.

        Raises:
            OSError: If the listening socket cannot be created or switched
                to non-blocking mode.
        """
        self.clients = set()
        print(f"Starting up at: {ADDRESS}")
        try:
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            # create_server failed, so no socket exists to close here.
            print("\nServer stopped.")
            raise
        try:
            self.server_socket.setblocking(False)
        except OSError:
            self.server_socket.close()
            print("\nServer stopped.")
            raise

    def accept(self) -> None:
        """Accept one pending connection, if any, and start tracking it."""
        try:
            conn, address = self.server_socket.accept()
            print(f"Connected to {address}")
            conn.setblocking(False)
            self.clients.add(conn)
        except BlockingIOError:
            # Nobody is waiting to connect right now.
            pass

    def _drop(self, conn: socket) -> None:
        """Stop tracking ``conn`` and close it."""
        self.clients.discard(conn)
        conn.close()

    def serve(self, conn: socket) -> None:
        """Answer every order currently readable on ``conn``.

        Returns as soon as no more data is available. A client that has
        closed or reset its connection is removed from ``clients`` and its
        socket is closed.
        """
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    # Empty read: the peer closed the connection. Without
                    # this the dead socket would be polled forever.
                    self._drop(conn)
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        except BlockingIOError:
            # No more data for now; try again on the next pass.
            pass
        except ConnectionError:
            # The peer went away abruptly; do not let it stop the server.
            self._drop(conn)

    def start(self) -> None:
        """Alternate between accepting clients and polling them, indefinitely.

        The listening socket is closed when the loop is left for any reason.
        """
        print("Server listening for incoming connections")
        try:
            while True:
                self.accept()
                # Iterate over a copy: serve() may remove clients.
                for conn in self.clients.copy():
                    self.serve(conn)

        finally:
            self.server_socket.close()
            print("\nServer stopped.")

if __name__ == "__main__":
    # Script entry point: create the non-blocking server and start polling.
    server = Server()
    server.start()

【Rust】Reader Writer Lock

use std::sync::RwLock の Read, Write
https://doc.rust-lang.org/stable/std/sync/struct.RwLock.html

概念は結構複雑なんだけど、なんかすげ〜簡単に書いてるな…

use std::sync::{Arc, RwLock};
use std::thread;

/// Record shared between the reader and writer threads through the lock.
struct User {
    /// Display name; rewritten by the writer threads.
    name: String,
    /// Age in years; incremented by the writer threads.
    age: u32,
}

fn main() {
    // One User value shared by every thread behind a reader-writer lock.
    let user = Arc::new(RwLock::new(User {
        name: String::from("Alice"),
        age: 30,
    }));

    // Ten reader threads: each takes a read guard and prints the record.
    let mut handles: Vec<thread::JoinHandle<()>> = (0..10)
        .map(|i| {
            let data_clone = Arc::clone(&user);
            thread::spawn(move || {
                let shared = data_clone.read().unwrap();
                println!("読み取りスレッド {}: {}は{}です", i, shared.name, shared.age);
            })
        })
        .collect();

    // Five writer threads: each takes the write guard and updates the record.
    handles.extend((0..5).map(|i| {
        let data_clone = Arc::clone(&user);
        thread::spawn(move || {
            let mut shared = data_clone.write().unwrap();
            shared.age += 1;
            shared.name = format!("Alice({})", i);
            println!("書き込みスレッド {}: カウンターを更新しました", i);
        })
    }));

    // Wait for every thread before reading the final state.
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

    let final_state = user.read().unwrap();
    println!("最終状態: {} は{}歳です", final_state.name, final_state.age);
}

Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.37s
Running `target/debug/parallel`
読み取りスレッド 0: Aliceは30です
読み取りスレッド 4: Aliceは30です
読み取りスレッド 2: Aliceは30です
読み取りスレッド 5: Aliceは30です
書き込みスレッド 3: カウンターを更新しました
書き込みスレッド 4: カウンターを更新しました
書き込みスレッド 2: カウンターを更新しました
書き込みスレッド 1: カウンターを更新しました
書き込みスレッド 0: カウンターを更新しました
読み取りスレッド 1: Alice(0)は35です
読み取りスレッド 8: Alice(0)は35です
読み取りスレッド 9: Alice(0)は35です
読み取りスレッド 3: Alice(0)は35です
読み取りスレッド 7: Alice(0)は35です
読み取りスレッド 6: Alice(0)は35です
最終状態: Alice(0) は35歳です

【並列処理】リーダー(Reader)とライター(Writer)の問題

import time
import random
from threading import Thread
from rwlock import RWLock

counter = 0
lock = RWLock()

class User(Thread):
    """Reader thread that repeatedly prints the shared ``counter``."""

    def __init__(self, idx: int):
        super().__init__()
        self.idx = idx

    def run(self) -> None:
        """Loop forever: read under the shared read lock, then pause."""
        while True:
            lock.acquire_read()
            try:
                print(f"User {self.idx} reading: {counter}")
                time.sleep(random.randrange(1, 3))
            finally:
                # Always give the read lock back, even if the body raises,
                # so the writer is not blocked by a dead reader.
                lock.release_read()
            time.sleep(0.5)

class Librarian(Thread):
    """Writer thread that repeatedly increments the shared ``counter``."""

    def run(self) -> None:
        """Loop forever: increment ``counter`` under the shared write lock."""
        global counter
        while True:
            lock.acquire_write()
            try:
                print("Librarian writing...")
                counter += 1
                print(f"New Value: {counter}")
                time.sleep(random.randrange(1, 3))
            finally:
                # Always give the write lock back, even if the body raises.
                lock.release_write()
            # NOTE(review): the writer asks for the lock again immediately,
            # with no pause; depending on how RWLock arbitrates, readers may
            # rarely get a turn. Confirm against the rwlock implementation.

if __name__ == "__main__":
    # Two reader threads and one writer thread share the module-level lock.
    threads = [
        User(0),
        User(1),
        Librarian()
    ]

    for thread in threads:
        thread.start()

    # The run loops never end, so these joins block until interrupted.
    for thread in threads:
        thread.join()

$ python3 reader_writer.py
User 0 reading: 0
User 1 reading: 0
User 0 reading: 0
Librarian writing…
New Value: 1
Librarian writing…
New Value: 2
Librarian writing…
New Value: 3
Librarian writing…
New Value: 4
Librarian writing…

ちょっと期待している挙動と異なるな…

【並列処理】プロデューサー/コンシューマー問題

produce(生産)とconsume(消費)の速度が同じではないので、結果的に、全体の処理速度は遅い方に引っ張られることになる。

import time
from threading import Thread, Semaphore, Lock

SIZE = 5
BUFFER = ["" for i in range(SIZE)]
producer_idx: int = 0

mutex = Lock()
empty = Semaphore(SIZE)
full = Semaphore(0)

class Producer(Thread):
    """Thread that writes a fixed number of items into the shared ``BUFFER``."""

    def __init__(self, name: str, maximum_items: int = 5):
        super().__init__()
        self.counter = 0
        self.name = name
        self.maximum_items = maximum_items

    def next_index(self, index: int) -> int:
        """Return the slot after ``index``, wrapping around at ``SIZE``."""
        return (index + 1) % SIZE

    def run(self) -> None:
        """Produce ``maximum_items`` items, pausing one second after each."""
        global producer_idx
        while self.counter < self.maximum_items:
            empty.acquire()  # wait for a free slot
            # ``with`` releases the mutex even if the body raises.
            with mutex:
                self.counter += 1
                BUFFER[producer_idx] = f"{self.name}-{self.counter}"
                print(f"{self.name} produced: "
                    f"'{BUFFER[producer_idx]}' into slot {producer_idx}")
                producer_idx = self.next_index(producer_idx)
            full.release()  # announce one more filled slot
            time.sleep(1)

class Consumer(Thread):
    """Thread that reads a fixed number of items from the shared ``BUFFER``."""

    def __init__(self, name: str, maximum_items: int = 10):
        super().__init__()
        self.name = name
        self.idx = 0
        self.counter = 0
        self.maximum_items = maximum_items

    def next_index(self) -> int:
        """Return the slot after the current one, wrapping around at ``SIZE``."""
        return (self.idx + 1) % SIZE

    def run(self) -> None:
        """Consume ``maximum_items`` items, pausing two seconds after each."""
        while self.counter < self.maximum_items:
            full.acquire()  # wait until at least one slot is filled
            # ``with`` releases the mutex even if the body raises.
            with mutex:
                item = BUFFER[self.idx]
                print(f"{self.name} consumed item: "
                    f"'{item}' from slot {self.idx}")
                self.idx = self.next_index()
                self.counter += 1
            empty.release()  # hand the slot back to the producers
            time.sleep(2)

if __name__ == "__main__":
    # Two producers (default 5 items each) feed one consumer (default 10).
    threads = [
        Producer("SpongeBob"),
        Producer("Patrick"),
        Consumer("Squidward")
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

$ python3 producer_comsumer.py
SpongeBob produced: ‘SpongeBob-1’ into slot 0
Patrick produced: ‘Patrick-1’ into slot 1
Squidward consumed item: ‘SpongeBob-1’ from slot 0
SpongeBob produced: ‘SpongeBob-2’ into slot 2
Patrick produced: ‘Patrick-2’ into slot 3
Squidward consumed item: ‘Patrick-1’ from slot 1
Patrick produced: ‘Patrick-3’ into slot 4
SpongeBob produced: ‘SpongeBob-3’ into slot 0
Patrick produced: ‘Patrick-4’ into slot 1
Squidward consumed item: ‘SpongeBob-2’ from slot 2
SpongeBob produced: ‘SpongeBob-4’ into slot 2
Squidward consumed item: ‘Patrick-2’ from slot 3
Patrick produced: ‘Patrick-5’ into slot 3
Squidward consumed item: ‘Patrick-3’ from slot 4
SpongeBob produced: ‘SpongeBob-5’ into slot 4
Squidward consumed item: ‘SpongeBob-3’ from slot 0

【並列処理】飢餓状態(starvation)

スケジューリングの優先順位に偏りがあると、スレッドの実行が平等にならない。

import time
from threading import Thread
from lock_with_name import LockWithName
dumplings = 120

class Philosopher(Thread):
    """Thread that eats from the shared ``dumplings`` counter and keeps a tally."""

    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Eat until no dumplings remain, then print how many were taken."""
        global dumplings

        eaten = 0
        first, second = self.left_chopstick, self.right_chopstick
        while dumplings > 0:
            first.acquire()
            second.acquire()
            # Check again while holding both chopsticks: the count may have
            # reached zero while this thread was waiting for them.
            if dumplings > 0:
                dumplings -= 1
                eaten += 1
                time.sleep(1e-16)
            second.release()
            first.release()
        print(f"{self.name} took {eaten} pieces")

if __name__ == "__main__":
    # All ten philosophers compete for the same pair of chopsticks.
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    threads = [
        Philosopher(f"Philosopher #{i}", chopstick_a, chopstick_b)
        for i in range(10)
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

$ python3 starvation.py
Philosopher #4 took 51 pieces
Philosopher #9 took 0 pieces
Philosopher #1 took 0 pieces
Philosopher #3 took 0 pieces
Philosopher #7 took 0 pieces
Philosopher #8 took 0 pieces
Philosopher #6 took 0 pieces
Philosopher #2 took 0 pieces
Philosopher #5 took 0 pieces
Philosopher #0 took 69 pieces

【並列処理】ライブロック

状態に応じて処理を変えている(相手が箸を持っていれば自分の箸を譲る)が、これも結局、両方のスレッドが譲り合いを繰り返すだけで、処理が先に進まなくなってしまう。
スレッドはブロックされておらず動き続けているのに進展しないこの状態をライブロックといい、一般的にはstarvation(飢餓状態)の一種である。

import time
from threading import Thread
from lock_with_name import LockWithName
dumplings = 20

class Philosopher(Thread):
    """Philosopher who puts the first chopstick back when the second is taken."""

    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Try to eat until the shared ``dumplings`` counter reaches zero.

        Each round takes the left chopstick, then either concedes (when the
        right one is already held) or takes the right one and eats a single
        dumpling. The left chopstick is released at the end of every round.
        """
        global dumplings

        while dumplings > 0:
            self.left_chopstick.acquire()
            print(f"{self.left_chopstick.name} chopstick "
                f"grabbed by {self.name}")
            if self.right_chopstick.locked():
                print(f"{self.name} can not get the "
                    f"{self.right_chopstick.name} chopstick, "
                    f"politely concedes...")
            else:
                # Was misspelled ``acuire``, which raised AttributeError the
                # first time this branch was reached.
                self.right_chopstick.acquire()
                print(f"{self.right_chopstick.name} chopstick "
                    f"grabbed by {self.name}")
                dumplings -= 1
                print(f"{self.name} eats dumpling. Dumplings "
                    f"left: {dumplings}")
                time.sleep(1)
                self.right_chopstick.release()
            self.left_chopstick.release()

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # The two philosophers receive the chopsticks in opposite order.
    philosopher_1 = Philosopher("Philosopher #1", chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
chopstick_b chopstick grabbed by Philosopher #2
Philosopher #2 can not get the chopstick_a chopstick, politely concedes…
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopstick grabbed by Philosopher #1
Philosopher #1 can not get the chopstick_b chopstick, politely concedes…
chopstick_a chopsti^C

【並列処理】デッドロックの発生

デッドロックは、必要な資源が他のタスクによって占有されており、互いに相手の解放を待ち続けて実行できなくなる状態。
left_chopstickとright_chopstickを別々に処理しているのでこのようなことが起きる。left_chopstickとright_chopstickを同時にacquire, releaseすればロジック上、デッドロックは起きない。

from threading import Lock
from typing import Any, Tuple

class LockWithName:
    """A ``threading.Lock`` wrapper that carries a human-readable name."""

    def __init__(self, name: str):
        self.name = name
        self._lock = Lock()

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        """Acquire the lock.

        Args:
            blocking: Wait for the lock when true (the default); otherwise
                return immediately.
            timeout: Maximum seconds to wait; ``-1`` waits without limit.

        Returns:
            ``True`` if the lock was acquired, ``False`` otherwise.
        """
        return self._lock.acquire(blocking, timeout)

    def release(self) -> None:
        """Release the lock."""
        self._lock.release()

    def locked(self) -> bool:
        """Return whether the lock is currently held."""
        return self._lock.locked()

    def __enter__(self) -> "LockWithName":
        """Acquire the lock and return this object for ``with ... as``."""
        self.acquire()
        return self

    def __exit__(self, *args: Any) -> None:
        """Release the lock; exceptions from the ``with`` body propagate."""
        self.release()
import time
from threading import Thread

from lock_with_name import LockWithName
dumplings = 20

class Philosopher(Thread):
    """Philosopher who takes the left chopstick and then waits for the right."""

    def __init__(self, name: str,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick

    def run(self) -> None:
        """Eat one dumpling per round while the shared counter is positive.

        Both chopsticks are acquired one after the other (left first) and
        released in reverse order, followed by a short pause.
        """
        global dumplings

        me = self.name
        first, second = self.left_chopstick, self.right_chopstick
        while dumplings > 0:
            first.acquire()
            print(f"{first.name} grabbed by {me} "
                f"now needs {second.name}")
            second.acquire()
            print(f"{second.name} grabbed by {me}")
            dumplings -= 1
            print(f"{me} eats a dumpling. "
                f"Dumplings left: {dumplings}")
            second.release()
            print(f"{second.name} released by {me}")
            first.release()
            print(f"{first.name} released by {me}")
            print(f"{me} is thinking...")
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # The two philosophers receive the chopsticks in opposite order, so each
    # one's "left" is the other's "right".
    philosopher_1 = Philosopher("Philosopher #1", chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

$ python3 deadlock.py
chopstick_a grabbed by Philosopher #1 now needs chopstick_b
chopstick_b grabbed by Philosopher #2 now needs chopstick_a

止まってしまう

ここではwaiterというclassを追加しているが、別にwaiterではなく、whileのループの中で同時にacquire, releaseできれば良い。

import time
from threading import Thread, Lock
from lock_with_name import LockWithName
dumplings = 20

class Waiter:
    """Arbiter that lets only one caller at a time pick up chopsticks."""

    def __init__(self) -> None:
        self.mutex = Lock()

    def ask_for_chopsticks(self,
                left_chopstick: LockWithName,
                right_chopstick: LockWithName) -> None:
        """Acquire the left, then the right chopstick while holding the mutex."""
        with self.mutex:
            for chopstick in (left_chopstick, right_chopstick):
                chopstick.acquire()
                print(f"{chopstick.name} grabbed")

    def release_chopsticks(self,
                left_chopstick: LockWithName,
                right_chopstick: LockWithName) -> None:
        """Release the right, then the left chopstick, without taking the mutex."""
        for chopstick in (right_chopstick, left_chopstick):
            chopstick.release()
            print(f"{chopstick.name} released")


class Philosopher(Thread):
    """Philosopher who gets both chopsticks through a ``Waiter``."""

    def __init__(self, name: str, waiter: Waiter,
            left_chopstick: LockWithName,
            right_chopstick: LockWithName):
        super().__init__()
        self.name = name
        self.left_chopstick = left_chopstick
        self.right_chopstick = right_chopstick
        self.waiter = waiter

    def run(self) -> None:
        """Eat one dumpling per round while the shared counter is positive.

        Each round asks the waiter for both chopsticks, eats, and hands the
        chopsticks back. The chopsticks are always returned, even when
        nothing was eaten or the round failed.
        """
        global dumplings

        while dumplings > 0:
            print(f"{self.name} asks waiter for chopsticks")
            self.waiter.ask_for_chopsticks (
                self.left_chopstick, self.right_chopstick)
            try:
                # Check again now that we hold the chopsticks: another
                # philosopher may have eaten the last dumpling while we
                # waited, and the counter must not go negative.
                if dumplings <= 0:
                    break
                dumplings -= 1
                print(f"{self.name} eats a dumpling. "
                    f"Dumpling left: {dumplings}")
            finally:
                print(f"{self.name} returns chopsticks to waiter")
                self.waiter.release_chopsticks (
                    self.left_chopstick, self.right_chopstick)
            time.sleep(0.1)

if __name__ == "__main__":
    chopstick_a = LockWithName("chopstick_a")
    chopstick_b = LockWithName("chopstick_b")

    # Both philosophers share one waiter and receive the chopsticks in
    # opposite order.
    waiter = Waiter()
    philosopher_1 = Philosopher("Philosopher #1", waiter, chopstick_a, chopstick_b)
    philosopher_2 = Philosopher("Philosopher #2", waiter, chopstick_b, chopstick_a)

    philosopher_1.start()
    philosopher_2.start()

$ python3 deadlock_arbitrator.py
Philosopher #1 asks waiter for chopsticks
chopstick_a grabbed
chopstick_b grabbed
Philosopher #1 eats a dumpling. Dumpling left: 19
Philosopher #1 returns chopsticks to waiter
chopstick_b released
chopstick_a released
….