【Rust】ディレクトリのパス一覧をStringでVectorに入れる

時間かかったー

fn main() -> Result<(), Box<dyn std::error::Error>>{    

    let mut entries: Vec<String> = Vec::new();
    let dir = Path::new("./src");
    if dir.is_dir(){
        for entry in fs::read_dir(dir)? {
            let e = entry?;
            let p = e.path().file_name().unwrap().to_string_lossy().into_owned();
            entries.push(p);
        }
    }
    println!("{:?}", entries);
    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/parallel queue`
[“pipeline.rs”, “unix_domain_socket.rs”, “password_cracking.rs”, “shared_ipc.rs”, “main.rs”, “child_processes.rs”, “multithreading.rs”, “queue.rs”, “thread_pool.rs”, “password_cracking_parallel.rs”, “unixstream.rs”, “pipe.rs”]

【Rust】コマンドライン引数を取得する

use std::env;

fn main() {    
    let args: Vec<String> = env::args().collect();
    println!("{:?}", args[1]);
}

$ cargo run queue
Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/parallel queue`
“queue”

【Rust】複数スレッドを同時にループ処理で走らせる【並列処理】

見た目はかなり悪いが、、、thread::spawnで同時にスレッドを走らせることができる。
各スレッドでは全てループ処理で待ち受けておいて、目的を達成したらbreakする。

static Washer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Dryer_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Folder_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());
static Done_queue: Mutex<VecDeque<u32>> = Mutex::new(VecDeque::new());

fn assemble_laundry(n: u32) {
    for i in 1..(n+1) {
        Washer_queue.lock().unwrap().push_back(i);
    }
}

fn washer() {
    let w = Washer_queue.lock().unwrap().pop_front();
    if w != None {
        println!("washing {:?}...", w.unwrap());
        thread::sleep(Duration::from_millis(300));
        Dryer_queue.lock().unwrap().push_back(w.unwrap());
    }
}

fn dryer() {
    let d = Dryer_queue.lock().unwrap().pop_front();
    if d != None {
        println!("Drying {:?}...", d.unwrap());
        thread::sleep(Duration::from_millis(200));
        Folder_queue.lock().unwrap().push_back(d.unwrap());
    }
}

fn folder() {
    let f = Folder_queue.lock().unwrap().pop_front();
    if f != None {
        println!("Folding {:?}...", f.unwrap());
        thread::sleep(Duration::from_millis(100));
        Done_queue.lock().unwrap().push_back(f.unwrap());
    }
}

fn main() {    
    assemble_laundry(4);
    println!("{:?}", Washer_queue);
    let wash_handle = thread::spawn(|| {
        loop {
            if Washer_queue.lock().unwrap().len() == 0 {
                break;
            }
            washer();
        }
    });
    let dry_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4 {
                break;
            }
            dryer();
        }
    });
    let fold_handle = thread::spawn(|| {
        loop {
            if Done_queue.lock().unwrap().len() == 4{
                break;
            }
            folder();
        }
    });
    wash_handle.join().unwrap();
    dry_handle.join().unwrap();
    fold_handle.join().unwrap();
    println!("Washer {:?}", Washer_queue);
    println!("Dryer {:?}", Dryer_queue);
    println!("Folder {:?}", Folder_queue);
    println!("All work finished");
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
Running `target/debug/parallel`
Mutex { data: [1, 2, 3, 4], poisoned: false, .. }
washing 1…
washing 2…
Drying 1…
Folding 1…
washing 3…
Drying 2…
Folding 2…
washing 4…
Drying 3…
Folding 3…
Drying 4…
Folding 4…
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
Mutex { data: [], poisoned: false, .. }
All work finished

もうちょっとうまい書き方をしたいが、やりたいこと自体はできている。。

【Rust】rustでqueueのpush, popをやりたい

VecDequeを使うと、vectorのpush, popが抽象化されている。

use std::collections::VecDeque;

fn main() {

    let mut washload: VecDeque<u32> = VecDeque::new();
    washload.push_back(1);
    washload.push_back(2);
    washload.push_back(3);
    println!("{:?}", washload);

    let n = washload.pop_front();
    println!("{:?}", n.unwrap());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.17s
Running `target/debug/parallel`
[1, 2, 3]
1

なるほど、確かに使いやすい。

【Rust】rustでthread Poolによるパスワードクラッキング【並列処理】

use sha2::{Digest, Sha256};
use std::time;
use std::collections::HashMap;
use threadpool::ThreadPool;

fn main() {

    let hash = "2e9352c704043c75fa1c2a424fce7bef0569ec08af453e841101596d911d26e3".to_string();
    let length = 4;
    crack_password_parallel(hash, length);
}

fn crack_password_parallel(crypto_hash: String, length: u32) {
    let num_cores = num_cpus::get() as u32;
    let chunks = get_chunks(num_cores, length);
    let pool = ThreadPool::new(num_cores as usize);
    println!("{:?}", chunks);
    
    for (chunk_start, chunk_end) in chunks {
        let hash = crypto_hash.clone();
        pool.execute(move|| {
            println!("{}:{}", chunk_start, chunk_end);
            let combinations = get_chunk_combinations(length, chunk_start, chunk_end);
            for combination in combinations {
                if check_password(&hash, combination.clone()) {
                    println!("PASSWORD CRACKED:{}", combination);
                    break;
                }
            }
        });
    }
    pool.join();

}

fn get_chunk_combinations(length: u32, min_number: u32, max_number: u32) -> Vec<String> {
    let mut combinations: Vec<String> = Vec::new();
    for i in min_number..max_number {
        let str_num: String = i.to_string();
        let zeros: String = "0".repeat((length - str_num.chars().count() as u32).try_into().unwrap());
        combinations.push(format!("{}{}", zeros, str_num));
    }
    return combinations;
}

fn get_chunks(num_ranges: u32, length: u32) -> HashMap<u32, u32>{
    let max_number = 10_i32.pow(length) as u32;

    let mut chunk_starts = Vec::new();
    for i in 0..num_ranges {
        chunk_starts.push(max_number / num_ranges * i )
    }

    let mut chunk_ends = Vec::new();
    for i in &chunk_starts[1..] {
        chunk_ends.push(i - 1);
    }
    chunk_ends.push(max_number);

    let mut chunks:HashMap<u32, u32> = HashMap::new();
    for i in 0..chunk_starts.len() {
        chunks.insert(chunk_starts[i], chunk_ends[i]);
    }
    return chunks
}

fn get_combinations(length: u32) -> Vec<String> {
    let mut combinations: Vec<String> = Vec::new();
    let min_number = 0;
    let max_number = 10_i32.pow(length);

    for i in min_number..max_number {
        let str_num: String = i.to_string();
        let zeros: String = "0".repeat((length - str_num.chars().count() as u32).try_into().unwrap());
        combinations.push(format!("{}{}", zeros, str_num));
    }
    return combinations;
}

fn get_crypto_hash(password: String) -> String {
    let sha = Sha256::digest(password);
    hex::encode(sha).to_string()
}

fn check_password(expected_crypto_hash: &String, possible_password: String) -> bool {
    let actual_crypto_hash = get_crypto_hash(possible_password);
    return *expected_crypto_hash == actual_crypto_hash
}

fn crack_password(crypto_hash: String, length: u32) {
    println!("Processing number combinations sequentially");
    let start_time = time::Instant::now();
    let combinations: Vec<String> = get_combinations(length);
    for combination in combinations {
        if check_password(&crypto_hash.clone(), combination.clone()) {
            println!("PASSWORD CRACKED:{}", combination);
            break;
        }
    }
    println!("PROCESS TIME: {:?}", start_time.elapsed());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.32s
Running `target/debug/parallel`
{5000: 10000, 0: 4999}
5000:10000
0:4999
PASSWORD CRACKED:5231

これは中々素晴らしいね^^

【Rust】rustのthread Pool【並列処理】

use threadpool::ThreadPool;
use std::sync::mpsc;

pub struct Data { n: i32 }

impl Data {
    fn incr(&mut self) { self.n += 1; }
}

fn main() {
    
    let n_workers = 4;
    let pool = ThreadPool::new(n_workers);
    let (tx, rx) = mpsc::channel();
    let v = vec![Data{n:0}, Data{n:1}, Data{n:2}];
    let n_jobs = v.len();
    for mut data in v {
        let tx = tx.clone();
        pool.execute(move || {
            data.incr();
            tx.send(data).expect("channel will be there waiting for the pool");
        });
    }
    let sum: i32 = rx.iter().take(n_jobs).map(|data| data.n).sum();
    println!("sum= {}", sum);
}

sum= 6

thread poolはデフォルトの選択肢?

use threadpool::ThreadPool;
use std::sync::mpsc;
use std::thread;


fn main() {
    
    let n_workers = 4;
    let pool = ThreadPool::new(n_workers);
    let (tx, rx) = mpsc::channel();
    for i in 0..20 {
        let tx = tx.clone();
        pool.execute(move || {
            // println!("{}", thread::current().name().unwrap());
            tx.send(i).expect("channel will be there waiting for the pool");        
        });
    }
    for i in 0..20 {
        let j = rx.recv().unwrap();
        println!("{}", j);
    }
}

【Rust】rustでパイプを使ったデータの受け渡し【並列処理】

let (mut tx, mut rx) = channel();でチャンネルを開設したとき、tx, rxの型が何かに苦戦しました。結局、pipe_channel::Sender、pipe_channel::Receiverになるのね。相互にやり取りするのではなく、一方的にデータを送るときなどに有効活用できそうです。

use std::thread;
use pipe_channel::*;

fn writer(mut tx: pipe_channel::Sender<String>) {
    let name = "Writer".to_string();
    let handle = thread::spawn(move || {
        println!("{}: Sending rubber duck...", name);
        tx.send("Rubber duck".to_string()).unwrap();
    });
    handle.join().unwrap();
}

fn reader(mut rx: pipe_channel::Receiver<String>) {
    let name = "Reader".to_string();
    let handle = thread::spawn(move || {
        println!("{}: Reading...", name);
        println!("{}: received ({})", name, rx.recv().unwrap());
    });
    handle.join().unwrap();
}

fn main() {
    let (mut tx, mut rx) = channel();
    writer(tx);
    reader(rx);
}

fn print_typename<T>(_: T) {
    println!("{}", std::any::type_name::<T>());
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
Running `target/debug/parallel`
Writer: Sending rubber duck…
Reader: Reading…
Reader: received (Rubber duck)

【Rust】rustでもglobal vectorによるIPC処理【並列処理】

forループが終わってからスレッドが並列で動いているので、rustとpythonだと、スレッドの立ち方が微妙に違いますね。

static SIZE: u32 = 5;
static Shared_Memory: Mutex<Vec<i32>> = Mutex::new(Vec::new());

fn main() {
    for _i in 0..SIZE {
        Shared_Memory.lock().unwrap().push(-1);
    }
    consumer();
    producer();
    
    println!("{:?}", Shared_Memory);
}

fn producer() {
    let name = "Producer".to_string();
    let mut memory = Shared_Memory.lock().unwrap();
    for i in 0..SIZE {   
        println!("{}: Writing {}", name, i);
        memory[i as usize] = i as i32;
    }
}

fn consumer() {
    let name = "Consumer".to_string();
    
    let handle = thread::spawn(move || {
        let mut memory = Shared_Memory.lock().unwrap();
        for i in 0..SIZE {
            while true {
                let line = memory[i as usize];
                if line == -1{
                    println!("{}: Data not available, sleeping fro 1 second for retrying.", name);
                    thread::sleep(Duration::from_millis(500));
                }
                println!("{}: Read: ({})", name, line);
                break;
            }
        }
    });
    handle.join().unwrap();
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.33s
Running `target/debug/parallel`
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Producer: Writing 0
Producer: Writing 1
Producer: Writing 2
Producer: Writing 3
Producer: Writing 4
Mutex { data: [0, 1, 2, 3, 4], poisoned: false, .. }

【Python】共有メモリによるIPC【並列処理】

shared_memory = [-1] * SIZE は、[-1, -1, -1, -1, -1] になり、producerで[0, 1, 2, 3, 4]に変わる。ここがややこしい。

import time
from threading import Thread, current_thread

SIZE = 5
shared_memory = [-1] * SIZE

class Producer(Thread):
    def run(self) -> None:
        self.name = "Producer"
        global shared_memory
        for i in range(SIZE):
            print(f"{current_thread().name}: Writing {int(i)}")
            shared_memory[i - 1] = i

class Consumer(Thread):
    def run(self) -> None:
        self.name = "Consumer"
        global shared_memory
        for i in range(SIZE):
            while True:
                line = shared_memory[i]
                if line == -1:
                    print(f"{current_thread().name}: Data not available\n"
                            f"Sleeping for 1 second before retrying")
                    time.sleep(1)
                    continue
                print(f"{current_thread().name}: Read: {int(line)}")
                break

def main() -> None:
    threads = [
        Consumer(),
        Producer(),
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()
    

【Rust】Rustで新規のthreadを生成する【並列処理】

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.75s
Running `target/debug/parallel`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 3 from the main thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!

joinを呼び出すことで、実行されたことを保証する

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..5 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

所有権の移転

fn main() {
    let v = vec![1, 2, 3]; 
    let handle = thread::spawn(move|| {
        for i in 0..3 {
            println!("hi number {} from the spawned thread!", v[i]);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..3 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

なんか違うような気がするが…

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;
use rayon::current_num_threads;

fn cpu_waster(i: u32) {
    let name = gettid::gettid().to_string();
    println!("{} doing {} work", name, i);
    thread::sleep(Duration::from_millis(5));
}

fn display_threads() {
    let str: String = "-".to_string();
    println!("{}", str.repeat(10));
    println!("Current process PID: {}", getpid());
    println!("Thread Count: {}", current_num_threads());
}

fn main() {
    let num_threads = 5;
    display_threads();

    println!("Starting {} CPU waters...", num_threads);
    let handle = thread::spawn(move|| {
        for i in 0..num_threads {
            cpu_waster(i);
        }
    });
    display_threads();

    handle.join().unwrap();
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.23s
Running `target/debug/parallel`
———-
Current process PID: 582557
Thread Count: 2
Starting 5 CPU waters…
———-
Current process PID: 582557
Thread Count: 2
582600 doing 0 work
582600 doing 1 work
582600 doing 2 work
582600 doing 3 work
582600 doing 4 work