【Python】共有メモリによるIPC【並列処理】

shared_memory = [-1] * SIZE は、[-1, -1, -1, -1, -1] になり、producerで[0, 1, 2, 3, 4]に変わる。ここがややこしい。

import time
from threading import Thread, current_thread

# Number of slots in the shared buffer.
SIZE = 5
# Shared buffer; -1 marks a slot that has not been written yet.
shared_memory = [-1] * SIZE

class Producer(Thread):
    """Thread that writes each index ``i`` into slot ``i`` of ``shared_memory``."""

    def run(self) -> None:
        """Fill the shared buffer so it ends up as ``[0, 1, ..., SIZE - 1]``."""
        self.name = "Producer"
        global shared_memory
        for i in range(SIZE):
            print(f"{current_thread().name}: Writing {int(i)}")
            # Write to slot i itself. Using i - 1 would hit index -1 (the last
            # slot) on the first iteration and leave the buffer rotated.
            shared_memory[i] = i

class Consumer(Thread):
    """Thread that reads every slot of ``shared_memory`` in index order."""

    def run(self) -> None:
        """Poll each slot until it no longer holds -1, then print its value."""
        self.name = "Consumer"
        global shared_memory
        for slot in range(SIZE):
            value = shared_memory[slot]
            # -1 means the slot has not been filled yet: wait and re-read.
            while value == -1:
                print(f"{current_thread().name}: Data not available\n"
                        f"Sleeping for 1 second before retrying")
                time.sleep(1)
                value = shared_memory[slot]
            print(f"{current_thread().name}: Read: {int(value)}")

def main() -> None:
    """Start the consumer and the producer threads and wait for both to end."""
    # The consumer is started first, so it may poll before any data exists.
    workers = [Consumer(), Producer()]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
    

【Rust】Rustで新規のthreadを生成する【並列処理】

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;

/// Spawns a thread that prints nine messages while the main thread prints four.
///
/// The spawned thread is never joined, so its remaining output is lost once
/// `main` returns.
fn main() {
    let pause = Duration::from_millis(1);

    // Detached worker: nothing waits for it to finish.
    thread::spawn(move || {
        (1..10).for_each(|n| {
            println!("hi number {} from the spawned thread!", n);
            thread::sleep(pause);
        });
    });

    (1..5).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(pause);
    });
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.75s
Running `target/debug/parallel`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 3 from the main thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!

joinを呼び出すことで、実行されたことを保証する

/// Runs a spawned thread and the main thread side by side, then joins the
/// spawned thread so all of its output is printed before the program exits.
fn main() {
    let pause = Duration::from_millis(1);

    let worker = thread::spawn(move || {
        (1..5).for_each(|n| {
            println!("hi number {} from the spawned thread!", n);
            thread::sleep(pause);
        });
    });

    (1..5).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(pause);
    });

    // Block until the worker has finished; panics if the worker panicked.
    worker.join().unwrap();
}

所有権の移転

/// Moves a vector into a spawned thread, which prints its three elements
/// while the main thread prints its own counter.
fn main() {
    let numbers = vec![1, 2, 3];
    let pause = Duration::from_millis(1);

    // `move` transfers ownership of `numbers` to the worker thread.
    let worker = thread::spawn(move || {
        for value in &numbers {
            println!("hi number {} from the spawned thread!", value);
            thread::sleep(pause);
        }
    });

    (1..3).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(pause);
    });

    worker.join().unwrap();
}

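なんか違うような気がするが…(rayon の current_num_threads() は rayon のスレッドプールのサイズを返す関数であり、プロセス内で実際に動いているスレッド数ではない。そのため thread::spawn の前後で Thread Count の値が変わらない。)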

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;
use rayon::current_num_threads;

/// Prints the calling thread's id together with the work item number `i`,
/// then sleeps for 5 ms to simulate work.
fn cpu_waster(i: u32) {
    println!("{} doing {} work", gettid::gettid(), i);
    thread::sleep(Duration::from_millis(5));
}

/// Prints a separator line, the current process id and a thread count.
fn display_threads() {
    println!("{}", "-".repeat(10));
    println!("Current process PID: {}", getpid());
    // NOTE(review): `current_num_threads()` comes from rayon and appears to
    // report the size of rayon's thread pool rather than the number of live
    // threads in this process — confirm against the rayon documentation.
    println!("Thread Count: {}", current_num_threads());
}

fn main() {
    let num_threads = 5;
    display_threads();

    println!("Starting {} CPU waters...", num_threads);
    let handle = thread::spawn(move|| {
        for i in 0..num_threads {
            cpu_waster(i);
        }
    });
    display_threads();

    handle.join().unwrap();
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.23s
Running `target/debug/parallel`
----------
Current process PID: 582557
Thread Count: 2
Starting 5 CPU waters...
----------
Current process PID: 582557
Thread Count: 2
582600 doing 0 work
582600 doing 1 work
582600 doing 2 work
582600 doing 3 work
582600 doing 4 work

【Rust】rustで子プロセスを生成したい【並列処理】

nixを使います。

nix = "0.16.1"

use nix::unistd::{getpid};

/// Prints the id of the current process.
fn main() {
    let pid = getpid();
    println!("My id is {}", pid);
}

My id is 577891

### 子プロセスの生成

use nix::unistd::{fork, getpid, getppid, ForkResult};
use nix::sys::wait::waitpid;
use std::process::exit;

/// Entry point: has the parent process fork two children.
fn main() {
    start_parent(2);
}

/// Announces the parent process and calls `run_child` `num_children` times,
/// one after another.
fn start_parent(num_children: u32) {
    println!("Parent: I am the parent process");
    println!("Parent PID is {}", getpid());
    (0..num_children).for_each(|_| run_child());
}

/// Forks one child process and waits for it to exit.
///
/// The child prints its own and its parent's pid and exits with status 0;
/// the parent prints the child's pid and then waits on it.
///
/// # Panics
///
/// Panics with "fork failed" when `fork()` returns an error.
fn run_child() {
    let outcome = fork().unwrap_or_else(|_| panic!("fork failed"));

    let child_pid = match outcome {
        ForkResult::Child => {
            println!("Child({}) PPID is ({})", getpid(), getppid());
            // The child never returns into the parent's loop.
            exit(0)
        }
        ForkResult::Parent { child, .. } => {
            println!("Main({}) forked child ({})", getpid(), child);
            child
        }
    };

    // An empty line separates the output of successive children.
    if waitpid(child_pid, None).is_ok() {
        println!();
    } else {
        println!("waitpid() failed");
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
Running `target/debug/parallel`
Parent: I am the parent process
Parent PID is 579363
Main(579363) forked child (579390)
Child(579390) PPID is (579363)

Main(579363) forked child (579391)
Child(579391) PPID is (579363)

おおおおおおおおお、中々素晴らしい!

【Rust】処理を分割するための関数を作ろう【並列処理】

/// Splits the numbers below `10^length` into `num_ranges` chunks and prints
/// them as a map from chunk start to chunk end.
///
/// Every chunk ends one before the next chunk's start; the last chunk ends at
/// `10^length` itself. With `num_ranges == 0` an empty map is printed instead
/// of panicking on an out-of-range slice. The map is a `HashMap`, so the
/// printed order of the chunks is unspecified.
fn get_chunks(num_ranges: u32, length: u32) {
    let max_number = 10_u32.pow(length);

    // The closure never runs when `num_ranges` is 0, so no division by zero.
    let chunk_starts: Vec<u32> = (0..num_ranges)
        .map(|i| max_number / num_ranges * i)
        .collect();

    // `skip(1)` is safe on an empty vector, unlike slicing with `[1..]`.
    let chunk_ends = chunk_starts
        .iter()
        .skip(1)
        .map(|&next_start| next_start - 1)
        .chain(std::iter::once(max_number));

    // `zip` stops at the shorter side, so zero starts yield zero chunks.
    let chunks: HashMap<u32, u32> = chunk_starts.iter().copied().zip(chunk_ends).collect();
    println!("{:?}", chunks);
}

get_chunks(5, 2);
{40: 59, 60: 79, 20: 39, 80: 100, 0: 19}

なるほどね

【Rust】総当たり計算によるパスワード解読【並列処理】

存在しうる全ての値を計算する。
桁数が増えれば増えるほど、処理時間がかかる。6桁で約1秒(1000ms)くらいかかる。

use sha2::{Digest, Sha256};
use std::time;

/// Hashes a fixed numeric password and then brute-forces it back from the hash.
fn main() {
    let password = String::from("13225");
    // The cracker needs the digit count to enumerate the candidates.
    let length = password.chars().count() as u32;

    crack_password(get_crypto_hash(password), length);
}

/// Returns every decimal string of exactly `length` digits, zero-padded, in
/// ascending order ("000" through "999" for `length == 3`).
///
/// For `length == 0` the only string of that length is the empty string, so a
/// single empty string is returned; the manual padding arithmetic used to
/// underflow on that input.
///
/// # Panics
///
/// Panics in debug builds when `10^length` does not fit in a `u32`
/// (`length >= 10`).
fn get_combinations(length: u32) -> Vec<String> {
    if length == 0 {
        return vec![String::new()];
    }

    // u32 -> usize does not truncate on 32- and 64-bit targets.
    let width = length as usize;
    let max_number = 10_u32.pow(length);

    (0..max_number)
        .map(|number| format!("{:0width$}", number, width = width))
        .collect()
}

/// Returns the SHA-256 digest of `password` as a hex string.
fn get_crypto_hash(password: String) -> String {
    let digest = Sha256::digest(password);
    hex::encode(digest)
}

/// Returns `true` when the hash of `possible_password` equals
/// `expected_crypto_hash`.
fn check_password(expected_crypto_hash: String, possible_password: String) -> bool {
    get_crypto_hash(possible_password) == expected_crypto_hash
}

/// Tries every `length`-digit combination in order until one hashes to
/// `crypto_hash`, prints the match (if any) and the elapsed time.
fn crack_password(crypto_hash: String, length: u32) {
    println!("Processing number combinations sequentially");
    let started = time::Instant::now();

    // `find` stops at the first candidate whose hash matches.
    let cracked = get_combinations(length)
        .into_iter()
        .find(|candidate| check_password(crypto_hash.clone(), candidate.clone()));
    if let Some(password) = cracked {
        println!("PASSWORD CRACKED:{}", password);
    }

    println!("PROCESS TIME: {:?}", started.elapsed());
}

Compiling parallel v0.1.0 (/home/vagrant/dev/rust/parallel)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.16s
Running `target/debug/parallel`
Processing number combinations sequentially
PASSWORD CRACKED:13225
PROCESS TIME: 97.760443ms

【Rust】rustで冪乗の計算をする

println!("{}", power(3));

//

/// Returns 10 raised to the power `x`.
fn power(x: u32) -> i32 {
    let base: i32 = 10;
    base.pow(x)
}

Compiling sample v0.1.0 (/home/vagrant/dev/rust/sample)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.19s
Running `target/debug/sample`
1000

【Rust】axumでjsonデータを受け取ってjsonで返す

値を整形せずに、そのまま返しています。

/// Logs the JSON request body and returns it unchanged as the JSON response.
async fn handle_service(body: extract::Json<Service>) -> Json<Service> {
    let extract::Json(service) = body;
    println!("{:?}", service);
    Json(service)
}

curl -X POST -H "Content-Type: application/json" -d "{\"version\":\"1\",\"nonce\":\"1482680747\",\"connections\":21}" 127.0.0.1:3000/service
{"version":"1","nonce":"1482680747","connections":21}

なるほどー

【Rust】自分のIPアドレスを取得する

IPを返却するサイトからIPアドレスを取得します。
httpbinにreqwest::getすると、String型で返却されるので、serde_jsonを使ってIpの構造体に変換しています。

use reqwest::Client;
use serde::{Serialize, Deserialize};

/// JSON body parsed from the response of `https://httpbin.org/ip`.
#[derive(Serialize, Deserialize, Debug)]
struct Ip {
    /// Value of the `origin` key in the response; printed as the IP address.
    origin: String,
}

/// Looks up and prints this machine's IP address, reporting any failure on
/// stderr instead of discarding it silently.
#[tokio::main]
async fn main() {
    if let Err(err) = get_ip().await {
        eprintln!("failed to get ip: {}", err);
    }
}

/// Fetches `https://httpbin.org/ip`, parses the body into an [`Ip`] and prints
/// its `origin` field.
///
/// # Errors
///
/// Returns an error when the request fails, the body cannot be read, or the
/// body is not valid JSON for `Ip`.
async fn get_ip() -> Result<(), Box<dyn std::error::Error>> {
    let response = reqwest::get("https://httpbin.org/ip").await?;
    // The body arrives as text and is converted with serde_json.
    let body = response.text().await?;
    let ip: Ip = serde_json::from_str(&body)?;

    println!("ip = {:?}", ip.origin);
    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.08s
Running `target/debug/sample`
ip = "***.***.*.***"

うーむ、なんだかな〜

【Rust】トランザクションにnftデータを追加する

UnsignedTransaction, SignedTransactionの構造体に、nft_dataと、nft_originを追加する。
nft_dataは、nftとして保存するデータ。
nft_originはnftを発行したトランザクションのhashデータ。
NFTを譲渡する場合は、nft_dataを空にして、nft_originに値を入れる。

/// A transaction before it is signed, extended with NFT fields.
///
/// Issuing an NFT sets `nft_data` and leaves `nft_origin` empty; transferring
/// an NFT leaves `nft_data` empty and sets `nft_origin`.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct UnsignedTransaction {
    /// Creation time as a string (the sample fills it with `Utc::now().to_string()`).
    time: String,
    /// Identifier of the sending party.
    sender: String,
    /// Identifier of the receiving party.
    receiver: String,
    /// Amount transferred; the NFT calculation only looks at transactions with amount 0.
    amount: i32,
    /// Data stored as the NFT; empty when the transaction does not issue one.
    nft_data: String,
    /// Hash of the transaction that issued the NFT being transferred; empty otherwise.
    nft_origin: String,
}

nft_holderというhashmapを作成して、そこにnftを発行したトランザクションのhashデータと、最後にNFTを譲渡されたreceiverの連想配列を入れていく。
誰が何を持っているかは、トランザクションのhashデータをデコードして、nft_dataの値を取り出す。

/// Builds four sample transactions (two NFT issues, one NFT transfer, one
/// plain payment), signs them, computes the current NFT holders and prints
/// each holder together with the NFT data decoded from the issuing hash.
#[tokio::main]
async fn main(){
    let unsigned = [
        // Issues the NFT "hello world".
        UnsignedTransaction {
            time: Utc::now().to_string(),
            sender: "047683c00f6z".to_string(),
            receiver: "DyH5SHvezz".to_string(),
            amount: 0,
            nft_data: "hello world".to_string(),
            nft_origin: "".to_string(),
        },
        // Transfer: no data, only a reference to an issuing transaction.
        UnsignedTransaction {
            time: Utc::now().to_string(),
            sender: "DyH5SHvezz".to_string(),
            receiver: "655EFC80ss".to_string(),
            amount: 0,
            nft_data: "".to_string(),
            nft_origin: "eyJ0aW1lIjoiMjAyNS0wMS0xNyAwODoyOTo0Ni42NzgzNDU3OTQgVVRDIiwic2VuZGVyIjoiMDQ3NjgzYzAwZjZ6IiwicmVjZWl2ZXIiOiJEeUg1U0h2ZXp6IiwiYW1vdW50IjowLCJuZnRfZGF0YSI6ImhlbGxvIHdvcmxkIiwibmZ0X29yaWdpbiI6IiJ9".to_string(),
        },
        // Issues the NFT "Milk Cafe".
        UnsignedTransaction {
            time: Utc::now().to_string(),
            sender: "047683c00f6z".to_string(),
            receiver: "DyH5SHvezz".to_string(),
            amount: 0,
            nft_data: "Milk Cafe".to_string(),
            nft_origin: "".to_string(),
        },
        // Ordinary payment without any NFT fields.
        UnsignedTransaction {
            time: Utc::now().to_string(),
            sender: "047683c00f6z".to_string(),
            receiver: "DyH5SHvezz".to_string(),
            amount: 1000,
            nft_data: "".to_string(),
            nft_origin: "".to_string(),
        },
    ];

    let transaction_pool: Vec<SignedTransaction> = unsigned
        .iter()
        .map(|transaction| sign_transaction(transaction))
        .collect();

    for (origin_hash, holder) in nft_calc(transaction_pool) {
        // The map key is the encoded issuing transaction; decode it to read the NFT data.
        let decoded = base64_decode(&origin_hash);
        let origin: UnsignedTransaction = serde_json::from_str(&decoded).unwrap();
        println!("保有者:{}, NFT:{}", holder, origin.nft_data);
    }
}

/// Replays the transaction pool in order and returns, for every issued NFT,
/// the hash of its issuing transaction mapped to the current holder.
///
/// Only transactions with `amount == 0` are considered:
/// - an issue (`nft_data` set, `nft_origin` empty) registers the receiver
///   under the base64-encoded JSON of the transaction, unless that hash is
///   already registered;
/// - a transfer (`nft_data` empty) moves the NFT named by `nft_origin` to the
///   receiver, but only when the sender is its current holder.
///
/// # Panics
///
/// Panics if a transaction cannot be serialized to JSON.
fn nft_calc(transaction_pool: Vec<SignedTransaction>) -> HashMap<String, String> {
    let mut nft_holder: HashMap<String, String> = HashMap::new();
    for transaction in transaction_pool {
        if transaction.amount != 0 {
            continue;
        }

        if transaction.nft_origin.is_empty() && !transaction.nft_data.is_empty() {
            let transaction_hash: String =
                BASE64_STANDARD.encode(serde_json::to_vec(&transaction).unwrap());
            if !nft_holder.contains_key(&transaction_hash) {
                nft_holder.insert(transaction_hash, transaction.receiver);
            }
        } else if transaction.nft_data.is_empty()
            // Ownership is tracked under the issuing hash, so look up
            // `nft_origin`; the transfer's own hash is never a key in the map.
            && nft_holder.get(&transaction.nft_origin) == Some(&transaction.sender)
        {
            nft_holder.insert(transaction.nft_origin, transaction.receiver);
        }
    }
    nft_holder
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.13s
Running `target/debug/sample`
保有者:DyH5SHvezz, NFT:hello world
保有者:DyH5SHvezz, NFT:Milk Cafe

これは中々凄いな…