【Rust】nonblocking

asyncでなくてもtcpstreamができて、nonblockingの設定もできるみたい。ただ、動かし方がよくわからん…
https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_nonblocking
https://doc.rust-lang.org/std/net/struct.TcpListener.html#method.set_nonblocking

TcpStreamは接続で、TcpListenerはbindって理解で合ってるかな?

use std::io::{self, Read};
use std::net::TcpStream;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:7878");
        expect("Couldn't connect to the server");
    stream.set_nonblocking(true).expect("set_nonblocking call failed");

    let mut buf = vec![];

    loop {
        match stream.read_to_end(&mut buf) {
            Ok(_) => break,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                wait_for_fd();
            }
            Err(e) => panic!("encountered IO error: {e}"),
        };
    };
    println!("bytes: {buf:?}");
}

【Rust】TcpListenerとSocket

TcpListener
https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html

tokioのTcpListenerは非同期(async)版なので、tokioのランタイム上で.awaitして使う必要がある。同期的に使いたい場合はstd::net::TcpListenerを使う。

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpListener,
};

/// Minimal echo server: accepts connections one at a time, reads one chunk
/// from each client, prints it, and writes the same bytes back.
///
/// # Errors
/// Returns an error only if binding the listening socket fails. Errors on an
/// individual connection are printed and the server keeps accepting.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let addr = "0.0.0.0:8080";
    let listener = TcpListener::bind(addr).await?;

    loop {
        match listener.accept().await {
            Ok((mut socket, _)) => {
                // Keep per-connection failures local so one misbehaving client
                // cannot terminate the whole server.
                let served: std::io::Result<()> = async {
                    let mut buf = Vec::with_capacity(4096);
                    socket.read_buf(&mut buf).await?;

                    // Clients may send arbitrary bytes; display them lossily
                    // instead of panicking on invalid UTF-8.
                    println!("{}", String::from_utf8_lossy(&buf));

                    // `write` may accept only part of the buffer; `write_all`
                    // keeps writing until every byte has been sent.
                    socket.write_all(&buf).await
                }
                .await;

                if let Err(err) = served {
                    println!("{err:?}");
                }
            }
            Err(err) => {
                println!("{err:?}");
            }
        };
    }
}

Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.74s
Running `target/debug/parallel`
GET / HTTP/1.1
Host: 192.168.33.10:8080
User-Agent: curl/7.81.0
Accept: */*

ncコマンドで接続できる
$ nc 192.168.33.10 8080
hello
hello

【Rust】2進数への変換と2進数から10進数への変換

11を2進数にすると、
11/2 = 5 余り1
5/2 = 2 余り1
2/2 = 1 余り0
1/2 = 0 余り1

余りを先頭に足していくと[1011]が答えになる。
それをそのままコードに落とし込む。こういう処理はfor文よりもwhileの方が向いている。なお、基数の値は、2進数から3、4と変えてもきちんと計算できる。

/// Prints the base-2 representation of 11.
fn main() {
    const CARDINAL: u32 = 2;
    let target = 11;
    println!("{}", to_cardinal(CARDINAL, target));
}

/// Returns `target` written in base `cardinal`, most significant digit first.
///
/// Each digit is rendered as a decimal number, so for bases above 10 a single
/// digit may occupy more than one character.
///
/// # Panics
/// Panics if `cardinal` is smaller than 2: base 1 would never terminate and
/// base 0 would divide by zero.
fn to_cardinal(cardinal: u32, mut target: u32) -> String {
    assert!(cardinal >= 2, "cardinal must be at least 2");

    // Remainders come out least significant first; collect and reverse once
    // instead of rebuilding the whole string for every digit.
    let mut digits = Vec::new();
    while target >= cardinal {
        digits.push((target % cardinal).to_string());
        target /= cardinal;
    }
    digits.push(target.to_string());
    digits.reverse();
    digits.concat()
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.23s
Running `target/debug/rust`
1011

2進数から10進数への変換

fn main() {
    to_cardinal(2, 55);

}

/// Prints the decimal value of `t`, whose decimal digits are read as a numeral
/// written in base `cardinal` (for example `from_cardinal(2, 1011)` prints 11).
///
/// Digits are not validated against the base: a digit that is not smaller than
/// `cardinal` is still weighted by its position.
fn from_cardinal(cardinal: u32, t: u32) {
    println!("{}", cardinal_value(cardinal, t));
}

/// Computes the value printed by `from_cardinal`.
fn cardinal_value(cardinal: u32, t: u32) -> u32 {
    // Horner's scheme: one pass over the digits, no per-digit `pow` and no
    // repeated `chars().nth(i)` scans.
    t.to_string()
        .chars()
        .map(|c| c.to_digit(10).expect("u32 renders as decimal digits"))
        .fold(0, |acc, digit| acc * cardinal + digit)
}

うーん、to_cardinalがなんか違うような気がする
なお、rustには基数変換のライブラリが多数ある模様

Future

from __future__ import annotations

import typing as T 
from collections import deque
from random import randint

Result = T.Any
Burger = Result
Coroutine = T.Callable[[], 'Future']

class Future:
    """Placeholder for a value that becomes available later.

    Iterating the future yields its result once `set_result` has been called;
    before that, `next()` raises `StopIteration`.
    """

    def __init__(self) -> None:
        self.done, self.coroutine, self.result = False, None, None

    def set_coroutine(self, coroutine: Coroutine) -> None:
        """Remember the callable associated with this future."""
        self.coroutine = coroutine

    def set_result(self, result: Result) -> None:
        """Mark the future as finished and store its value."""
        self.done = True
        self.result = result

    def __iter__(self) -> Future:
        return self

    def __next__(self) -> Result:
        if self.done:
            return self.result
        raise StopIteration

class EventLoop:
    """FIFO scheduler for callables that each return a future-like object."""

    def __init__(self) -> None:
        self.tasks: T.Deque[Coroutine] = deque()

    def add_coroutine(self, coroutine: Coroutine) -> None:
        """Append a task to the end of the queue."""
        self.tasks.append(coroutine)

    def run_coroutine(self, task: T.Callable) -> None:
        """Run `task` once and queue it again if its future is not done."""
        pending = task()
        pending.set_coroutine(task)
        try:
            next(pending)
            if pending.done:
                return
            # Not finished yet: schedule the same task for another round.
            pending.set_coroutine(task)
            self.add_coroutine(task)
        except StopIteration:
            pass

    def run_forever(self) -> None:
        """Run queued tasks until the queue is empty."""
        queue = self.tasks
        while queue:
            self.run_coroutine(queue.popleft())

def cook(on_done: T.Callable[[Burger], None]) -> None:
    """Create a randomly numbered burger, announce it and pass it to `on_done`."""
    number = randint(1, 10)
    burger: str = f"Burger #{number}"
    print(f"{burger} is cooked!")
    on_done(burger)

def cashier(burger: Burger, on_done: T.Callable[[Burger], None]) -> None:
    """Announce that the burger can be picked up, then hand it to `on_done`."""
    print("Burger is ready for pick up!")
    on_done(burger)
def order_burger() -> Future:
    """Run the cook -> cashier -> customer callback chain and return the order.

    The callbacks are invoked synchronously, so the returned future already
    holds the burger as its result.
    """
    pending = Future()

    def on_cashier_done(burger: Burger) -> None:
        # Last step of the chain: the customer receives the burger.
        print(f"{burger}? That's me! Mmmmmm!")
        pending.set_result(burger)

    def on_cook_done(burger: Burger) -> None:
        # Cooking finished; pass the burger on to the cashier.
        cashier(burger, on_cashier_done)

    cook(on_cook_done)
    return pending

# Script entry point: queue a single burger order and run the loop until the
# task queue is empty.
if __name__ == "__main__":
    event_loop = EventLoop()
    event_loop.add_coroutine(order_burger)
    event_loop.run_forever()

$ python3 future_burger.py
Burger #10 is cooked!
Burger is ready for pick up!
Burger #10? That’s me! Mmmmmm!

コルーチン

from collections import deque
import typing as T

Coroutine = T.Generator[None, None, int]

class EventLoop:
    """Round-robin scheduler that advances generators one step at a time."""

    def __init__(self) -> None:
        self.tasks: T.Deque[Coroutine] = deque()

    def add_coroutine(self, task: Coroutine) -> None:
        """Put a generator at the back of the queue."""
        self.tasks.append(task)

    def run_coroutine(self, task: Coroutine) -> None:
        """Resume `task` up to its next `yield`; requeue it unless it finished."""
        try:
            task.send(None)
        except StopIteration:
            print("Task completed")
        else:
            self.add_coroutine(task)

    def run_forever(self) -> None:
        """Keep resuming queued generators until none are left."""
        queue = self.tasks
        while queue:
            print("Event loop cycle.")
            self.run_coroutine(queue.popleft())

def fibonacci(n: int) -> Coroutine:
    """Generator that prints one Fibonacci number per step, yielding after each.

    After `n` steps the generator returns the last number it printed (0 when
    `n` is 0).
    """
    previous, current = 0, 1
    for index in range(n):
        previous, current = current, previous + current
        print(f"Fibonacci({index}): {previous}")
        # Hand control back to whoever is driving the generator.
        yield
    return previous

# Script entry point: schedule a five-step Fibonacci generator and run the loop
# until it completes.
if __name__ == "__main__":
    event_loop = EventLoop()
    event_loop.add_coroutine(fibonacci(5))
    event_loop.run_forever()

$ python3 coroutine.py
Event loop cycle.
Fibonacci(0): 1
Event loop cycle.
Fibonacci(1): 1
Event loop cycle.
Fibonacci(2): 2
Event loop cycle.
Fibonacci(3): 3
Event loop cycle.
Fibonacci(4): 5
Event loop cycle.
Task completed

イベント

from __future__ import annotations 

from collections import deque
from time import sleep
import typing as T

class Event:
    """Named action that can optionally be followed by another event."""

    def __init__(self, name: str, action: T.Callable[..., None],
            next_event: T.Optional[Event] = None) -> None:
        self.name = name
        self._action = action
        self._next_event = next_event

    def execute_action(self) -> None:
        """Run the action with this event, then queue the follow-up, if any.

        The follow-up is registered on the module-level `event_loop` object.
        """
        self._action(self)
        follow_up = self._next_event
        if follow_up:
            event_loop.register_event(follow_up)

class EventLoop:
    """FIFO queue of events that are executed one after another."""

    def __init__(self) -> None:
        self._events: deque[Event] = deque()

    def register_event(self, event: Event) -> None:
        """Append an event to the end of the queue."""
        self._events.append(event)

    def run_forever(self) -> None:
        """Execute queued events in order; never returns on its own.

        When the queue is empty the loop keeps polling it without pausing.
        """
        print(f"Queue running with {len(self._events)} event")

        while True:
            try:
                upcoming = self._events.popleft()
            except IndexError:
                # Nothing queued right now; poll again.
                pass
            else:
                upcoming.execute_action()

def knock(event: Event) -> None:
    """Print the event's name, then pause for one second."""
    label = event.name
    print(label)
    sleep(1)

def who(event: Event) -> None:
    """Print the event's name, then pause for one second."""
    label = event.name
    print(label)
    sleep(1)

# Script entry point. `event_loop` has to be a module-level name because
# `Event.execute_action` registers follow-up events on it.
if __name__ == "__main__":
    event_loop = EventLoop()
    replying = Event("Who's there?", who)
    knocking = Event("Knock-knock", knock, replying)
    # The same `knocking` object is queued twice; each execution queues
    # `replying` behind whatever is already waiting.
    for _ in range(2):
        event_loop.register_event(knocking)
    event_loop.run_forever()

$ python3 event_loop.py
Queue running with 2 event
Knock-knock
Knock-knock
Who’s there?
Who’s there?

import typing as T
import select
from socket import socket, create_server

Data = bytes
Action = T.Union[T.Callable[[socket], None],
        T.Tuple[T.Callable[[socket, Data], None], str]]
Mask = int

class EventLoop:
    """`select`-based loop that dispatches one-shot read and write callbacks.

    Registrations are stored per file descriptor and removed just before their
    callback runs, so a callback must register again to be called again.
    """

    def __init__(self) -> None:
        self.writers = {}
        self.readers = {}

    def register_event(self, source: socket, event: Mask,
            action: Action) -> None:
        """Store `action` for `source` under its file descriptor.

        A mask containing `POLLIN` registers a reader; otherwise a mask
        containing `POLLOUT` registers a writer. Other masks are ignored.
        """
        fd = source.fileno()
        entry = (source, event, action)
        if event & select.POLLIN:
            self.readers[fd] = entry
        elif event & select.POLLOUT:
            self.writers[fd] = entry

    def unregister_event(self, source: socket) -> None:
        """Drop any reader and writer registration held for `source`."""
        fd = source.fileno()
        for table in (self.readers, self.writers):
            if table.get(fd):
                del table[fd]

    def run_forever(self) -> None:
        """Wait for ready descriptors and run their callbacks, indefinitely."""
        while True:
            ready_to_read, ready_to_write, _ = select.select(
                self.readers, self.writers, [])
            for fd in ready_to_read:
                source, _mask, callback = self.readers.pop(fd)
                callback(source)
            for fd in ready_to_write:
                # Writer actions are stored as a (callback, message) pair.
                source, _mask, (callback, payload) = self.writers.pop(fd)
                callback(source, payload)

socketによるサーバ機能

from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Server:
    """Blocking pizza-order server that handles one client at a time."""

    def __init__(self) -> None:
        """Create the listening socket on `ADDRESS`.

        Raises:
            OSError: if the listening socket cannot be created.
        """
        print(f"Starting up at: {ADDRESS}")
        try:
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            # Creation failed, so there is no socket to close yet. Report it and
            # let the error propagate instead of leaving a half-built server.
            print("\nServer stopped.")
            raise

    def accept(self) -> socket:
        """Block until a client connects and return its connection socket."""
        conn, client_address = self.server_socket.accept()
        print(f"Connected to {client_address}")
        return conn

    def serve(self, conn: socket) -> None:
        """Answer orders from `conn` until the client disconnects.

        Each received chunk is parsed as an integer number of pizzas. The
        connection is always closed on exit.
        """
        # Looked up once so the closing message cannot fail and skip close().
        peer = None
        try:
            peer = conn.getpeername()
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {peer}")
                # sendall keeps sending until the whole response is written;
                # send may transmit only part of it.
                conn.sendall(response.encode())
        finally:
            print(f"Connection with {peer} has been closed")
            conn.close()

    def start(self) -> None:
        """Accept and serve clients one after another until interrupted."""
        print("Server listening for incoming connections")
        try:
            while True:
                conn = self.accept()
                self.serve(conn)

        finally:
            self.server_socket.close()
            print("\nServer stopped.")
            print("\nServer stopped.")

# Script entry point: create the server and serve clients until interrupted.
if __name__ == "__main__":
    server = Server()
    server.start()

$ python3 pizza_server.py
Starting up at: (‘127.0.0.1’, 12345)
Server listening for incoming connections
Connected to (‘127.0.0.1’, 55292)
Sending message to (‘127.0.0.1’, 55292)
^CConnection with (‘127.0.0.1’, 55292) has been closed

$ nc 127.0.0.1 12345
10
Thank you for ordering 10 pizzas!

### サーバの並列化

from socket import socket, create_server
from threading import Thread

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Handler(Thread):
    """Thread that serves pizza orders for a single client connection."""

    def __init__(self, conn: socket):
        super().__init__()
        self.conn = conn

    def run(self) -> None:
        """Answer orders until the client disconnects, then close the socket."""
        client = self.conn
        print(f"Connected to {client.getpeername()}")
        try:
            # An empty read means the peer closed the connection.
            while data := client.recv(BUFFER_SIZE):
                try:
                    order = int(data.decode())
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                else:
                    response = f"Thank you for ordering {order} pizzas!\n"
                print(f"Sending message to {client.getpeername()}")
                client.send(response.encode())
        finally:
            print(f"Connection with {client.getpeername()} has been closed")
            client.close()

class Server:
    """Pizza-order server that hands every connection to its own thread."""

    def __init__(self) -> None:
        """Create the listening socket on `ADDRESS`.

        Raises:
            OSError: if the listening socket cannot be created.
        """
        print(f"Starting up at: {ADDRESS}")
        try:
            self.server_socket: socket = create_server(ADDRESS)
        except OSError:
            # Creation failed, so there is no socket to close yet. Report it and
            # let the error propagate instead of leaving a half-built server.
            print("\nServer stopped.")
            raise

    def start(self) -> None:
        """Accept clients forever, starting one `Handler` thread per client."""
        print("Server listening for incoming connections")
        try:
            while True:
                conn, address = self.server_socket.accept()
                print(f"Client connection request from {address}")
                thread = Handler(conn)
                thread.start()

        finally:
            self.server_socket.close()
            print("\nServer stopped.")

# Script entry point: create the threaded server and accept clients until
# interrupted.
if __name__ == "__main__":
    server = Server()
    server.start()

### ノンブロッキングモード

import typing as T
from socket import socket, create_server

BUFFER_SIZE = 1024
ADDRESS = ("127.0.0.1", 12345)

class Server:
    """Single-threaded pizza-order server built on non-blocking sockets.

    `start` repeatedly polls the listening socket and every connected client;
    a call that would block raises `BlockingIOError`, which is treated as
    "nothing to do right now".
    """

    # Connected client sockets; shared by all instances of the class.
    clients: T.Set[socket] = set()

    def __init__(self) -> None:
        """Create the non-blocking listening socket on `ADDRESS`.

        Raises:
            OSError: if the socket cannot be created or configured.
        """
        print(f"Starting up at: {ADDRESS}")
        server_socket = None
        try:
            server_socket = create_server(ADDRESS)
            server_socket.setblocking(False)
        except OSError:
            # Close only what was actually created, then propagate the error
            # instead of leaving a half-built server behind.
            if server_socket is not None:
                server_socket.close()
            print("\nServer stopped.")
            raise
        self.server_socket: socket = server_socket

    def accept(self) -> None:
        """Register a newly connected client, if one is waiting."""
        try:
            conn, address = self.server_socket.accept()
            print(f"Connected to {address}")
            conn.setblocking(False)
            self.clients.add(conn)
        except BlockingIOError:
            # No pending connection.
            pass

    def serve(self, conn: socket) -> None:
        """Answer every order currently readable on `conn`.

        A client that has disconnected is closed and removed from `clients`
        so it is not polled again.
        """
        try:
            while True:
                data = conn.recv(BUFFER_SIZE)
                if not data:
                    # Empty read: the peer closed the connection.
                    self.clients.discard(conn)
                    conn.close()
                    break
                try:
                    order = int(data.decode())
                    response = f"Thank you for ordering {order} pizzas!\n"
                except ValueError:
                    response = "Wrong number of pizzas, please try again\n"
                print(f"Sending message to {conn.getpeername()}")
                conn.send(response.encode())
        except BlockingIOError:
            # Nothing more to read for now.
            pass

    def start(self) -> None:
        """Poll for new connections and client data until interrupted."""
        print("Server listening for incoming connections")
        try:
            while True:
                self.accept()
                # Iterate over a copy: serve() may remove closed clients.
                for conn in self.clients.copy():
                    self.serve(conn)

        finally:
            self.server_socket.close()
            print("\nServer stopped.")

# Script entry point: create the non-blocking server and poll until
# interrupted.
if __name__ == "__main__":
    server = Server()
    server.start()

【Rust】Reader Writer Lock

use std::sync::RwLock の Read, Write
https://doc.rust-lang.org/stable/std/sync/struct.RwLock.html

概念は結構複雑なんだけど、なんかすげ〜簡単に書いてるな…

use std::sync::{Arc, RwLock};
use std::thread;

/// Record that the reader and writer threads in `main` share behind an `RwLock`.
struct User {
    /// Display name; writer threads replace it with `Alice(<thread index>)`.
    name: String,
    /// Age in years; each writer thread increments it by one.
    age: u32,
}

/// Spawns ten reader threads and five writer threads over one shared `User`,
/// waits for all of them, and prints the final state.
fn main() {
    let user = Arc::new(RwLock::new(User {
        name: "Alice".to_string(),
        age: 30,
    }));

    // Readers only take the shared lock, so several may print concurrently.
    let readers = (0..10).map(|i| {
        let shared_user = Arc::clone(&user);
        thread::spawn(move || {
            let guard = shared_user.read().unwrap();
            println!("読み取りスレッド {}: {}は{}です", i, guard.name, guard.age);
        })
    });

    // Writers take the exclusive lock for the whole update.
    let writers = (0..5).map(|i| {
        let shared_user = Arc::clone(&user);
        thread::spawn(move || {
            let mut guard = shared_user.write().unwrap();
            guard.age += 1;
            guard.name = format!("Alice({})", i);
            println!("書き込みスレッド {}: カウンターを更新しました", i);
        })
    });

    // Collecting drives both lazy iterators: all readers are spawned first,
    // then all writers.
    let handles: Vec<_> = readers.chain(writers).collect();
    for handle in handles {
        handle.join().unwrap();
    }

    let final_state = user.read().unwrap();
    println!("最終状態: {} は{}歳です", final_state.name, final_state.age);
}

Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.37s
Running `target/debug/parallel`
読み取りスレッド 0: Aliceは30です
読み取りスレッド 4: Aliceは30です
読み取りスレッド 2: Aliceは30です
読み取りスレッド 5: Aliceは30です
書き込みスレッド 3: カウンターを更新しました
書き込みスレッド 4: カウンターを更新しました
書き込みスレッド 2: カウンターを更新しました
書き込みスレッド 1: カウンターを更新しました
書き込みスレッド 0: カウンターを更新しました
読み取りスレッド 1: Alice(0)は35です
読み取りスレッド 8: Alice(0)は35です
読み取りスレッド 9: Alice(0)は35です
読み取りスレッド 3: Alice(0)は35です
読み取りスレッド 7: Alice(0)は35です
読み取りスレッド 6: Alice(0)は35です
最終状態: Alice(0) は35歳です

【Rust】Producer, Consumer問題をRustで書いてみる…

スレッド自体はProducerとConsumerで同時に走ってるんだけど、なんか全然違うなぁ…

/// Maximum number of cells the shared buffer is allowed to grow to.
pub static SIZE: u32 = 5;
/// Shared buffer used by the producer and consumer threads: producers push or
/// write `1` into a cell, consumers reset a cell to `0`.
/// NOTE(review): the lowercase name triggers the `non_upper_case_globals` lint.
static slot: Mutex<Vec<i32>> = Mutex::new(Vec::new());

/// Runs ten rounds; each round starts one producer thread and one consumer
/// thread and waits for both before starting the next round.
///
/// A producer appends a filled cell (`1`) while the buffer is below `SIZE`;
/// once the buffer is full it refills every empty cell (`0`) it finds. A
/// consumer empties the first filled cell among the first two cells.
///
/// Every check-and-update happens under a single lock acquisition, so the other
/// thread cannot change the buffer between the check and the update.
///
/// # Panics
/// Panics if the mutex is poisoned or if a spawned thread panics.
fn work() {
    let capacity: usize = SIZE.try_into().expect("SIZE fits in usize");

    for i in 0..10 {
        let producer_handle = thread::spawn(move || {
            let appended = {
                let mut buffer = slot.lock().unwrap();
                if buffer.len() < capacity {
                    buffer.push(1);
                    true
                } else {
                    false
                }
            };
            if appended {
                // The guard is released before printing: `Debug` for a mutex
                // this thread still holds would print `<locked>`.
                println!("Producer {} work: {:?}", i, slot);
                thread::sleep(Duration::from_millis(500));
                return;
            }

            // Buffer is full: refill the cells a consumer has emptied.
            for j in 0..capacity {
                let refilled = {
                    let mut buffer = slot.lock().unwrap();
                    match buffer.get_mut(j) {
                        Some(cell) if *cell == 0 => {
                            *cell = 1;
                            true
                        }
                        _ => false,
                    }
                };
                if refilled {
                    println!("Producer {} work: {:?}", i, slot);
                    thread::sleep(Duration::from_millis(500));
                }
            }
        });

        let consumer_handle = thread::spawn(move || {
            let consumed = {
                let mut buffer = slot.lock().unwrap();
                // Look at cell 0 first, then cell 1; missing cells are skipped
                // instead of indexing out of bounds.
                match buffer.iter_mut().take(2).find(|cell| **cell == 1) {
                    Some(cell) => {
                        *cell = 0;
                        true
                    }
                    None => false,
                }
            };
            if consumed {
                thread::sleep(Duration::from_millis(700));
                println!("Consumer {} work: {:?}", i, slot);
            }
        });

        producer_handle.join().unwrap();
        consumer_handle.join().unwrap();
    }
}

/// Runs the producer/consumer rounds, then prints the final state of the
/// shared buffer.
fn main() {
    work();
    println!("{:?}", slot);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/parallel`
Producer 0 work: Mutex { data: [1], poisoned: false, .. }
Producer 1 work: Mutex { data: [0, 1], poisoned: false, .. }
Consumer 1 work: Mutex { data: [0, 1], poisoned: false, .. }
Producer 2 work: Mutex { data: [0, 1, 1], poisoned: false, .. }
Producer 3 work: Mutex { data: [0, 1, 1, 1], poisoned: false, .. }
Producer 4 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Producer 5 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Consumer 5 work: Mutex { data: [0, 1, 1, 1, 1], poisoned: false, .. }
Producer 6 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 7 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 7 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 8 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 8 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Producer 9 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }
Consumer 9 work: Mutex { data: [1, 1, 1, 1, 1], poisoned: false, .. }

【Rust】vending machine(自販機)のプログラム

### 前準備
コンソールからの入力の受け取り

/// Prompts for a line on standard input and echoes it back without the
/// surrounding whitespace.
fn main() {
    println!("文字を入力してください:");

    let mut line = String::new();
    // A failed read is deliberately ignored; the line then stays empty.
    let _ = std::io::stdin().read_line(&mut line);

    println!("{}", line.trim());
}

文字を入力してください:
aaa
aaa

### お札、硬貨の枚数も合わせて返却する。
HashMapにinsertするところは関数化したい

use std::collections::HashMap;
use itertools::Itertools;

/// Splits `change` into bills and coins, largest denomination first.
///
/// Returns a map from denomination to the number of pieces of that
/// denomination. Denominations that are not needed are absent, so a change of
/// zero yields an empty map.
fn cash_change(mut change: u32) -> HashMap<u32, u32> {
    // One table replaces the seven copy-pasted branches.
    const DENOMINATIONS: [u32; 7] = [5000, 1000, 500, 100, 50, 10, 1];

    let mut change_map: HashMap<u32, u32> = HashMap::new();
    for unit in DENOMINATIONS.iter().copied() {
        let count = change / unit;
        if count > 0 {
            change_map.insert(unit, count);
            change %= unit;
        }
    }
    change_map
}

/// Asks for the inserted amount and the item price, then prints the change and
/// its breakdown into bills and coins, largest denomination first.
///
/// # Panics
/// Panics if standard input cannot be read or an entry is not a non-negative
/// integer.
fn main() {
    /// Prints `prompt`, reads one line from standard input and parses it.
    fn read_amount(prompt: &str) -> u32 {
        println!("{}", prompt);
        let mut word = String::new();
        std::io::stdin()
            .read_line(&mut word)
            .expect("failed to read from stdin");
        word.trim().parse().expect("entered string was not a number")
    }

    let cash = read_amount("投入する金額を入力してください:");
    let price = read_amount("商品の金額を入力してください:");

    // `cash - price` would underflow when the price exceeds the inserted cash.
    let change = match cash.checked_sub(price) {
        Some(change) => change,
        None => {
            println!("投入金額が不足しています。");
            return;
        }
    };

    println!("お釣りは[{}]です。", change);
    let mut v: Vec<_> = cash_change(change).into_iter().collect();
    // Largest denomination first.
    v.sort_by(|x, y| y.0.cmp(&x.0));
    println!("{:?}", v);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.25s
Running `target/debug/rust`
投入する金額を入力してください:
1000
商品の金額を入力してください:
333
お釣りは[667]です。
[(500, 1), (100, 1), (50, 1), (10, 1), (1, 7)]

### もっと簡潔に

use std::collections::HashMap;
use itertools::Itertools;

/// Breaks `change` down into bills and coins, trying the largest denomination
/// first.
///
/// The result maps each denomination that is used to its piece count;
/// denominations with a count of zero are left out.
fn cash_change(mut change: u32) -> HashMap<u32, u32> {
    [5000, 1000, 500, 100, 50, 10, 1]
        .iter()
        .filter_map(|&unit| {
            let count = change / unit;
            // When `count` is zero the remainder equals `change`, so this is a
            // no-op for unused denominations.
            change %= unit;
            (count > 0).then(|| (unit, count))
        })
        .collect()
}

/// Reads the inserted amount and the item price from standard input, then
/// prints the change and how it is paid out, largest denomination first.
///
/// # Panics
/// Panics if standard input cannot be read or an entry is not a non-negative
/// integer.
fn main() {
    /// Prints `prompt`, reads one line from standard input and parses it.
    fn read_amount(prompt: &str) -> u32 {
        println!("{}", prompt);
        let mut word = String::new();
        std::io::stdin()
            .read_line(&mut word)
            .expect("failed to read from stdin");
        word.trim().parse().expect("entered string was not a number")
    }

    let cash = read_amount("投入する金額を入力してください:");
    let price = read_amount("商品の金額を入力してください:");

    // Subtracting directly would underflow when the price exceeds the cash.
    let change = match cash.checked_sub(price) {
        Some(change) => change,
        None => {
            println!("投入金額が不足しています。");
            return;
        }
    };

    println!("お釣りは[{}]です。", change);
    let mut v: Vec<_> = cash_change(change).into_iter().collect();
    // Largest denomination first.
    v.sort_by(|x, y| y.0.cmp(&x.0));
    println!("{:?}", v);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/rust`
投入する金額を入力してください:
3000
商品の金額を入力してください:
200
お釣りは[2800]です。
[(1000, 2), (500, 1), (100, 3)]

おおおおおおお、なるほど〜。同じ処理のところはイテレータにしてforループで回せば、より簡潔に書けますね。