【Rust】複数スレッドを同時にループ処理で走らせる【並列処理】

見た目はかなり悪いが、、、thread::spawnで同時にスレッドを走らせることができる。
各スレッドでは全てループ処理で待ち受けておいて、目的を達成したらbreakする。

static Washer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Dryer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Folder_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Done_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());

fn assemble_laundry(n: u32) {
    for i in 1..(n+1) {
        Washer_queue.lock().unwrap().push_back(i);
    }
}

fn washer() {
    let w = Washer_queue.lock().unwrap().pop_front();
    if w != None {
        println!("washing {:?}...", w.unwrap());
        thread::sleep(Duration::from_millis(300));
        Dryer_queue.lock().unwrap().push_back(w.unwrap());
    }
}

fn dryer() {
    let d = Dryer_queue.lock().unwrap().pop_front();
    if d != None {
        println!("Drying {:?}...", d.unwrap());
        thread::sleep(Duration::from_millis(200));
        Folder_queue.lock().unwrap().push_back(d.unwrap());
    }
}

fn folder() {
    let f = Folder_queue.lock().unwrap().pop_front();
    if f != None {
        println!("Folding {:?}...", f.unwrap());
        thread::sleep(Duration::from_millis(100));
        Done_queue.lock().unwrap().push_back(f.unwrap());
    }
}

fn main() {    
    assemble_laundry(4);
    println!("{:?}", Washer_queue);
    let wash_handle = thread::spawn(|| {
        loop {
            if Washer_queue.lock().unwrap().len() == 0 {
                break;
            }
            washer();
        }
    });
    let dry_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4 {
                break;
            }
            dryer();
        }
    });
    let fold_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4{
                break;
            }
            folder();
        }
    });
    wash_handle.join().unwrap();
    dry_handle.join().unwrap();
    fold_handle.join().unwrap();
    println!("Washer {:?}", Washer_queue);
    println!("Dryer {:?}", Dryer_queue);
    println!("Folder {:?}", Folder_queue);
    println!("All work finished");
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
Running `target/debug/parallel`
Mutex { data: [1, 2, 3, 4], poisoned: false, .. }
washing 1…
washing 2…
Drying 1…
Folding 1…
washing 3…
Drying 2…
Folding 2…
washing 4…
Drying 3…
Folding 3…
Drying 4…
Folding 4…
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
All work finished

もうちょっとうまい書き方をしたいが、やりたいこと自体はできている。。

【Rust】rustでqueueのpush, popをやりたい

VecDequeを使うと、vectorのpush, popが抽象化されている。

use std::collections::VecDeque;

fn main() {

    let mut washload: VecDeque<u32> = VecDeque::new();
    washload.push_back(1);
    washload.push_back(2);
    washload.push_back(3);
    println!("{:?}", washload);

    let n = washload.pop_front();
    println!("{:?}", n.unwrap());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.17s
Running `target/debug/parallel`
[1, 2, 3]
1

なるほど、確かに使いやすい。

【並列処理】各スレッドでQueueを使ったパイプライン処理

whileのループでqueueから待ち受けるという処理を3つのスレッドで同時に実行している。うむ、中々複雑になってきた。

import time
from queue import Queue
from threading import Thread

Washload = str

class Washer(Thread):
    def __init__(self, in_queue: Queue[Washload],
        out_queue: Queue[Washload]):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Washer: washing{washload}...")
            time.sleep(4)
            self.out_queue.put(f'{washload}')
            self.in_queue.task_done()

class Dryer(Thread):
    def __init__(self, in_queue: Queue[Washload],
        out_queue: Queue[Washload]):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Dryer: dying {washload} ...")
            time.sleep(2)
            self.out_queue.put(f'{washload}')
            self.in_queue.task_done()

class Folder(Thread):
    def __init__(self, in_queue: Queue[Washload]):
        super().__init__()
        self.in_queue = in_queue

    def run(self) -> None:
        while True:
            washload = self.in_queue.get()
            print(f"Folder: folding {washload}...")
            time.sleep(1)
            print(f"Folder: {washload} done!")
            self.in_queue.task_done()

class Pipeline:
    def assemble_laundry_for_washing(self) -> Queue[Washload]:
        washload_count = 4
        washloads_in: Queue[Washload] = Queue(washload_count)
        for washload_num in range(washload_count):
            washloads_in.put(f'Washload #{washload_num}')
        return washloads_in

    def run_concurrently(self) -> None:
        to_be_washed = self.assemble_laundry_for_washing()
        to_be_dried: Queue[Washload] = Queue()
        to_be_folded: Queue[Washload] = Queue()

        Washer(to_be_washed, to_be_dried).start()
        Dryer(to_be_dried, to_be_folded).start()
        Folder(to_be_folded).start()

        to_be_washed.join()
        to_be_dried.join()
        to_be_folded.join()

        print("All done!")

if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run_concurrently()

$ python3 pipeline.py
Washer: washingWashload #0…
Washer: washingWashload #1…
Dryer: dying Washload #0 …
Folder: folding Washload #0…
Folder: Washload #0 done!
Washer: washingWashload #2…
Dryer: dying Washload #1 …
Folder: folding Washload #1…
Folder: Washload #1 done!
Washer: washingWashload #3…
Dryer: dying Washload #2 …
Folder: folding Washload #2…
Folder: Washload #2 done!
Dryer: dying Washload #3 …
Folder: folding Washload #3…
Folder: Washload #3 done!
All done!

【並列処理】プリエンプティブマルチタスク

タイムシェアリング方式により、1つのコアでもスレッドが同時に実行されているような印象を与える。
現在のOSは殆どがプリエンプティブマルチタスクの機能がある。

import typing as T
from threading import Thread, Event, Timer, Event

from pacman import get_user_input, compute_game_world, render_next_screen

processor_free = Event()
processor_free_set()
TIME_SLICE = 0.5

class Task(Thread):
    def __init__(self, func: T.Callable[..., None]):
        super().__init__()
        self.func = func

    def run(self) -> None:
        while True:
            processor_free.wait()
            processor_free.clear()
            self.func()

class InterruptService(Timer):
    def __init__(self):
        super().__init__(TIME_SLICE, lambda: None)

    def run(self):
        while not self.finished.wait(self.interval):
            print("Tick!")
            processor_free.set()

def arcade_machine() -> None:
    get_user_input_task = Task(get_user_input)
    compute_game_world_task = Task(compute_game_world)
    render_next_screen_task = Task(render_next_screen)

    InterruptService().start()
    get_user_input_task.start()
    compute_game_world_task.start()
    render_next_screen_task.start()

if __name__ == "__main__":
    arcade_machine()

【並列処理】マルチタスクを学ぶ

アプリケーションの処理は主にCPUバウンド(CPUの計算処理)とI/Oバウンド(ディスクからの読み込み、入出力取得等)の2種類に分類される。
CPUバウンドの負荷は並列化によりパフォーマンスが改善される可能性がある。

スレッドごとに別々のタスクをループで実行

import typing as T
from threading import Thread, Event

from pacman import get_user_input, compute_game_world, render_next_screen

processor_free = Event()
#processor_free_set()

class Task(Thread):
    def __init__(self, func: T.Callable[..., None]):
        super().__init__()
        self.func = func

    def run(self) -> None:
        while True:
            processor_free.wait()
            processor_free.clear()
            self.func()

def arcade_machine() -> None:
    get_user_input_task = Task(get_user_input)
    compute_game_world_task = Task(compute_game_world)
    render_next_screen_task = Task(render_next_screen)

    get_user_input_task.start()
    compute_game_world_task.start()
    render_next_screen_task.start()

if __name__ == "__main__":
    arcade_machine()

ソースコードで見ると、構造がわかりやすい。

【Rust】rustでthread Poolによるパスワードクラッキング【並列処理】

use sha2::{Digest, Sha256};
use std::time;
use std::collections::HashMap;
use threadpool::ThreadPool;

fn main() {

    let hash = "2e9352c704043c75fa1c2a424fce7bef0569ec08af453e841101596d911d26e3".to_string();
    let length = 4;
    crack_password_parallel(hash, length);
}

fn crack_password_parallel(crypto_hash: String, length: u32) {
    let num_cores = num_cpus::get() as u32;
    let chunks = get_chunks(num_cores, length);
    let pool = ThreadPool::new(num_cores as usize);
    println!("{:?}", chunks);
    
    for (chunk_start, chunk_end) in chunks {
        let hash = crypto_hash.clone();
        pool.execute(move|| {
            println!("{}:{}", chunk_start, chunk_end);
            let combinations = get_chunk_combinations(length, chunk_start, chunk_end);
            for combination in combinations {
                if check_password(&hash, combination.clone()) {
                    println!("PASSWORD CRACKED:{}", combination);
                    break;
                }
            }
        });
    }
    pool.join();

}

fn get_chunk_combinations(length: u32, min_number: u32, max_number: u32) -> Vec<String> {
    let mut combinations: Vec<String> = Vec::new();
    for i in min_number..max_number {
        let str_num: String = i.to_string();
        let zeros: String = "0".repeat((length - str_num.chars().count() as u32).try_into().unwrap());
        combinations.push(format!("{}{}", zeros, str_num));
    }
    return combinations;
}

fn get_chunks(num_ranges: u32, length: u32) -> HashMap<u32, u32>{
    let max_number = 10_i32.pow(length) as u32;

    let mut chunk_starts = Vec::new();
    for i in 0..num_ranges {
        chunk_starts.push(max_number / num_ranges * i )
    }

    let mut chunk_ends = Vec::new();
    for i in &chunk_starts[1..] {
        chunk_ends.push(i - 1);
    }
    chunk_ends.push(max_number);

    let mut chunks:HashMap<u32, u32> = HashMap::new();
    for i in 0..chunk_starts.len() {
        chunks.insert(chunk_starts[i], chunk_ends[i]);
    }
    return chunks
}

fn get_combinations(length: u32) -> Vec<String> {
    let mut combinations: Vec<String> = Vec::new();
    let min_number = 0;
    let max_number = 10_i32.pow(length);

    for i in min_number..max_number {
        let str_num: String = i.to_string();
        let zeros: String = "0".repeat((length - str_num.chars().count() as u32).try_into().unwrap());
        combinations.push(format!("{}{}", zeros, str_num));
    }
    return combinations;
}

fn get_crypto_hash(password: String) -> String {
    let sha = Sha256::digest(password);
    hex::encode(sha).to_string()
}

fn check_password(expected_crypto_hash: &String, possible_password: String) -> bool {
    let actual_crypto_hash = get_crypto_hash(possible_password);
    return *expected_crypto_hash == actual_crypto_hash
}

fn crack_password(crypto_hash: String, length: u32) {
    println!("Processing number combinations sequentially");
    let start_time = time::Instant::now();
    let combinations: Vec<String> = get_combinations(length);
    for combination in combinations {
        if check_password(&crypto_hash.clone(), combination.clone()) {
            println!("PASSWORD CRACKED:{}", combination);
            break;
        }
    }
    println!("PROCESS TIME: {:?}", start_time.elapsed());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.32s
Running `target/debug/parallel`
{5000: 10000, 0: 4999}
5000:10000
0:4999
PASSWORD CRACKED:5231

これは中々素晴らしいね^^

【並列処理】パスワードクラッキング

これをRustのthread poolで実装したい。。

import time
import math
import hashlib
import typing as T
import os
from multiprocessing import Pool

ChunkRange = T.Tuple[int, int]

def crack_chunk(crypto_hash: str, length: int, chunk_start: int, chunk_end: int)-> T.Union[str, None]:
    print(f"Processing {chunk_start} to {chunk_end}")
    combinations = get_combinations(length=length, min_number=chunk_start, max_number=chunk_end)
    for combination in combinations:
        if check_password(crypto_hash, combination):
            return combination
    return


def get_chunks(num_ranges: int, length: int) -> T.Iterator[ChunkRange]:
    max_number = int(math.pow(10, length) - 1)
    chunk_starts = [int(max_number / num_ranges * i) for i in range(num_ranges)]
    chunk_ends = [start_point - 1 for start_point in chunk_starts[1:]] + [max_number]
    return zip(chunk_starts, chunk_ends)

def crack_password_parallel(crypto_hash: str, legnth: int) -> None:
    num_cores = os.cpu_count()
    print("Processing number combinations correctly")
    start_time = time.perf_counter()

    with Pool() as pool:
        arguments = ((crypto_hash, length, chunk_start, chunk_end)
            for chunk_start, chunk_end in get_chunks(num_cores, length))
        results = pool.starmap(crack_chunk, arguments)
        print("waiting for chunks to finish")
        pool.close()
        pool.join()
    
    result = [res for res in results if res]
    print(f"PASSWORD CRACKED: {result[0]}")
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}") 

def get_combinations(*, length: int,
    min_number: int = 0,
    max_number: T.Optional[int] = None) -> T.List[str]:

    combinations = []
    if not max_number:
        max_number = int(math.pow(10, length) - 1)
    for i in range(min_number, max_number + 1):
        str_num = str(i)
        zeros = "0" * (length - len(str_num))
        combinations.append("".join((zeros, str_num)))
    return combinations

def get_crypto_hash(password: str) -> str:
    return hashlib.sha256(password.encode()).hexdigest()

def check_password(expected_crypto_hash: str,
    possible_password: str) -> bool:
    actual_crypto_hash = get_crypto_hash(possible_password)
    return expected_crypto_hash == actual_crypto_hash

def crack_password(crypto_hash: str, length: int) -> None:
    print("Processing number combinations sequentially")
    start_time = time.perf_counter()
    combinations = get_combinations(length=length)
    for combination in combinations:
        if check_password(crypto_hash, combination):
            print(f"PASSWORD CRACKED: {combination}")
            break

    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}" )

if __name__ == "__main__":
    crypto_hash = "8bb0cf6eb9b17d0f7d22b456f121257dc1254e1f01665370476383ea776df414"
    length = 7
    crack_password_parallel(crypto_hash, length)

$ python3 password_cracking_parallel.py
Processing number combinations correctly
Processing 4999999 to 9999999
Processing 0 to 4999998
waiting for chunks to finish
PASSWORD CRACKED: 1234567
PROCESS TIME: 3.9475781959481537

【Rust】rustのthread Pool【並列処理】

use threadpool::ThreadPool;
use std::sync::mpsc;

pub struct Data { n: i32 }

impl Data {
    fn incr(&mut self) { self.n += 1; }
}

fn main() {
    
    let n_workers = 4;
    let pool = ThreadPool::new(n_workers);
    let (tx, rx) = mpsc::channel();
    let v = vec![Data{n:0}, Data{n:1}, Data{n:2}];
    let n_jobs = v.len();
    for mut data in v {
        let tx = tx.clone();
        pool.execute(move || {
            data.incr();
            tx.send(data).expect("channel will be there waiting for the pool");
        });
    }
    let sum: i32 = rx.iter().take(n_jobs).map(|data| data.n).sum();
    println!("sum= {}", sum);
}

sum= 6

thread poolはデフォルトの選択肢?

use threadpool::ThreadPool;
use std::sync::mpsc;
use std::thread;


fn main() {
    
    let n_workers = 4;
    let pool = ThreadPool::new(n_workers);
    let (tx, rx) = mpsc::channel();
    for i in 0..20 {
        let tx = tx.clone();
        pool.execute(move || {
            // println!("{}", thread::current().name().unwrap());
            tx.send(i).expect("channel will be there waiting for the pool");        
        });
    }
    for i in 0..20 {
        let j = rx.recv().unwrap();
        println!("{}", j);
    }
}

【Python】thread Pool【並列処理】

import time
import queue
import typing as T
from threading import Thread, current_thread

Callback = T.Callable[..., None]
Task = T.Tuple[Callback, T.Any, T.Any]
TaskQueue = queue.Queue

class Worker(Thread):
    def __init__(self, tasks: queue.Queue[Task]):
        super().__init__()
        self.tasks = tasks

    def run(self) -> None:
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads: int):
        self.tasks: TaskQueue = queue.Queue(num_threads)
        self.num_threads = num_threads

        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            worker.setDaemon(True)
            worker.start()

    def submit(self, func: Callback, *args, **kargs) -> None:
        self.tasks.put((func, args, kargs))

    def wait_completion(self) -> None:
        self.tasks.join()

def cpu_waster(i: int) -> None:
    name = current_thread().getName()
    print(f"{name} doing {i} work")
    time.sleep(3)

def main() -> None:
    pool = ThreadPool(num_threads=5)
    for i in range(20):
        pool.submit(cpu_waster, i)

    print("All work requests sent")
    pool.wait_completion()
    print("All work completed")

if __name__ == "__main__":
    main()

name = current_thread().getName()
Thread-2 doing 0 work
Thread-5 doing 1 work
Thread-4 doing 2 work
Thread-1 doing 3 work
Thread-3 doing 4 work
Thread-2 doing 5 work
Thread-5 doing 6 work
Thread-1 doing 8 work
Thread-4 doing 7 work
Thread-3 doing 9 work
Thread-5 doing 10 work
Thread-1 doing 11 work
Thread-4 doing 12 work
Thread-2 doing 14 work
Thread-3 doing 13 work
All work requests sent
Thread-1 doing 15 work
Thread-4 doing 16 work
Thread-2 doing 17 work
Thread-3 doing 18 work
Thread-5 doing 19 work
All work completed

スレッドプールを実装してそれぞれのスレッドで並列処理を行なっているのはわかるんだが、
callbackとqueue, typingの使い方がイマイチよくわからん…

【Rust】rustでunix domain socket【並列処理】

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf)?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

fn main()-> Result<(), Box<dyn std::error::Error>> {
    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        fs::remove_file(&sockfile)?;
    }

    let listner = UnixListener::bind(sockfile)?;
    for stream in listner.incoming() {
        let stream = stream?;
        thread::spawn(move || handle_client(stream).unwrap());
    }

    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/parallel`

これは基本的に、socketのincomingの待ち受け

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::fs;
use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

use futures::StreamExt;
use tokio;
use tokio::io::AsyncReadExt;
use tokio::net::unix::{UnixListener, UnixStream};

async fn handle_client(mut stream: UnixStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf).await?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // let mut stm = UnixStream::connect("/tmp/uds.sock");
    // stm?.write_all(b"hello world")?;

    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        fs::remove_file(&sockfile)?;
    }

    let listner = UnixListener::bind(sockfile)?;
    let mut incoming = listner.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        tokio::spawn(async move {
            handle_client(stream).await.unwrap();
        });
    }

    Ok(())
}

動かない….

やりたいことはこれなんだが、これも上手くいかん…

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

pub static SOCKET_PATH: &'static str = "rst.sock";

fn sender() {
    let name = "Sender".to_string();
    let mut stream = UnixStream::connect(SOCKET_PATH).unwrap();
    let messages = vec!["Hello", " ", "world!"];
    for message in messages {
        stream.write_all(b"hello world").unwrap();
        let mut response = String::new();
        stream.read_to_string(&mut response).unwrap();
        println!("{response}");
    }
}

fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf)?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

fn receiver() {
    let handle = thread::spawn(move || {
        let listner = UnixListener::bind(SOCKET_PATH).unwrap();
        for stream in listner.incoming() {
            let stream = stream.unwrap();
            thread::spawn(move || handle_client(stream).unwrap());
        }
    });
    handle.join().unwrap();
}

fn main() {
    receiver();
    sender();
}