【Python】ブルームフィルタ

bloom filterの全体像。ハッシュ化した値をインデックス番号にして、bloom_filterの値を更新する。
bloom filterにハッシュ値が入っているかどうか確認することで、bloom filterに登録されているかを判定する。

bloom_filter = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

def hash_a(val):
    return hash_val

def hash_b(val):
    return hash_val


val = "hello world"

a_hashed = hash_a(val) # 1
b_hashed = hash_b(val) # 4

bloom_filter[a_hashed] = 1
bloom_filter[b_hashed] = 1

[0, 1, 0, 0, 4, 0, 0, 0, 0, 0]

bloom_filter[a_hashed]
bloom_filter[b_hashed]
import functools

class BloomFilter:
    def __init__(self, filter_size):
        self.filter_size = filter_size
        self.bloom_filter = [0 for _ in range(filter_size)]

    def set_v(self, val):
        indexes = self.n_hash(val)
        for index in indexes:
            self.bloom_filter[index] = 1

    def n_hash(self, val):
        hashed = abs(hash(val))
        d_lst = [int(n) for n in str(hashed)]
        return [
            self._hash_common(lambda acc, d: acc + d, d_lst),
            self._hash_common(lambda acc, d: acc + 3 * d, d_lst),
        ]

    def _hash_common(self, func, d_lst):
        execed = abs(functools.reduce(func, d_lst, 0))
        while execed >= self.filter_size:
            execed = execed / self.filter_size
        return int(execed)

    def exist_v(self, val):
        indexes = self.n_hash(val)
        for index in indexes:
            if self.bloom_filter[index] == 0:
                return False
            return True

bf = BloomFilter(10)
print(bf.bloom_filter)
bf.set_v(3)
print(bf.bloom_filter)
print(bf.exist_v(3))
print(bf.exist_v(10))

$ python3 bloom_filter.py
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 1, 0, 0, 0, 0, 0, 1]
True
False

BTCではSPVがScriptPubKeyをbloomfilterにセットして、リモートノードがマッチングアルゴリズムで判定する。
-> リモートノード側では、トランザクションごとに、ScriptPubKey, Outpointがマッチするかを判定して、マッチする場合はブルームフィルタを更新している。

なるほど、直接pubkeyのデータをやり取りしない分、安全性が向上するということね。これをRustでやりたい。

文字列を数値に変換するには、文字列を数値として持っておいて、それを変換でしょうか。

static ASCII_LOWER: [char; 26] = [
    'a', 'b', 'c', 'd', 'e', 
    'f', 'g', 'h', 'i', 'j', 
    'k', 'l', 'm', 'n', 'o',
    'p', 'q', 'r', 's', 't', 
    'u', 'v', 'w', 'x', 'y', 
    'z',
];

【Python】HD walletの親鍵/子鍵とchaincodeの概要

子の秘密鍵は、親のchaincdeと[親の公開鍵+index]をhmac_sha512でハッシュ化して作成している。

### マスター秘密鍵、公開鍵

import os
import binascii
import hmac
import hashlib
import ecdsa

seed = os.urandom(32)
root_key = b"Bitcoin seed"

def hmac_sha512(data, key_message):
    hash = hmac.new(data, key_message, hashlib.sha512).digest()
    return hash

def create_pubkey(private_key):
    publickey = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.SECP256k1).verifying_key.to_string()
    return publickey

master = hmac_sha512(seed, root_key)

master_secretkey = master[:32]
master_chaincode = master[32:]

master_publickey = create_pubkey(master_secretkey)
master_publickey_integer = int.from_bytes(master_publickey[32:], byteorder="big")

if master_publickey_integer %2 == 0:
    master_publickey_x = b"\x02" + master_publickey[:32]
else:
    master_publickey_x = b"\x03" + master_publickey[:32]

print(binascii.hexlify(master_secretkey))
print(binascii.hexlify(master_chaincode))
print(binascii.hexlify(master_publickey_x))

$ python3 master_key.py
b’8a6dbaaff700682778dcbae2bc8718452fe5ed80fc9026a9b564420f8d5b0d80′
b’4ce8b10cc0c0874467d8f438c412fdbf21fba51517e668dbc4bd105af6861dec’
b’03cb15210804ca8f0d45b620832be935e2f90c3830f13f04c4bd6e8b4648f27817′
(secretkey, chaincode, pubkey)

### 子秘密鍵、子公開鍵

index = 0
index_bytes = index.to_bytes(8, "big")

data = master_publickey_x + index_bytes

result_hmac512 = hmac_sha512(data, master_chaincode)
sum_integer = int.from_bytes(master_secretkey,"big") + int.from_bytes(result_hmac512[:32],"big")

p = 2 ** 256 - 2**32 - 2**9 - 2**8 - 2**7 - 2**6 - 2**4 - 1
child_secretkey = (sum_integer % p).to_bytes(32,"big")

child_chaincode = result_hmac512[32:]

child_publickey = create_pubkey(child_secretkey)
child_publickey_integer = int.from_bytes(child_publickey[32:], byteorder="big")

if child_publickey_integer %2 == 0:
    child_publickey_x = b"\x02" + child_publickey[:32]
else:
    child_publickey_x = b"\x03" + child_publickey[:32]

print(binascii.hexlify(child_secretkey))
print(binascii.hexlify(child_chaincode))
print(binascii.hexlify(child_publickey_x))

b’5ff011a3e9cd672aaf0dc9fd52cb3172ac2815cb270f919135e6b0f0e6e03d54′
b’15f4d148b2d7730076d5e670249649ea8f0fd8572dad3818680e347196149dda’
b’03480a1dbb4a87d867bee3d364b608e21d685af271876707b9f9d5b75c6df6fde7′

b’23faf6fad81cd93e12c003c944ba3ef215dae714c638756386b6b9404da5aac9′
b’e3563dc6891e238cd0d8ebf99e65ebfc67cecf42364de9756b89859bbd049b62′
b’02039253af3e828bfbf1e560fe0e923a144fc4496ded3b6bbfa0d568cf7177d1c3′

なるほど、一見複雑そうに見えるが、なかなか面白いね

【Python】DH-Walletのライブラリを利用してみる

$ pip3 install hdwallet

#!/usr/bin/env python3

from hdwallet import HDWallet
from hdwallet.entropies import (
    BIP39Entropy, BIP39_ENTROPY_STRENGTHS
)
from hdwallet.mnemonics import BIP39_MNEMONIC_LANGUAGES
from hdwallet.cryptocurrencies import Bitcoin as Cryptocurrency
from hdwallet.hds import BIP32HD
from hdwallet.derivations import CustomDerivation
from hdwallet.const import PUBLIC_KEY_TYPES

import json

# Initialize Bitcoin HDWallet
hdwallet: HDWallet = HDWallet(
    cryptocurrency=Cryptocurrency,
    hd=BIP32HD,
    network=Cryptocurrency.NETWORKS.MAINNET,
    language=BIP39_MNEMONIC_LANGUAGES.KOREAN,
    public_key_type=PUBLIC_KEY_TYPES.COMPRESSED,
    passphrase="talonlab"
).from_entropy(  # Get Bitcoin HDWallet from entropy
    entropy=BIP39Entropy(
        entropy=BIP39Entropy.generate(
            strength=BIP39_ENTROPY_STRENGTHS.ONE_HUNDRED_SIXTY
        )
    )
).from_derivation(  # Drive from Custom derivation
    derivation=CustomDerivation(
        path="m/0'/0/0"
    )
)

# Print all Bitcoin HDWallet information's
print(json.dumps(hdwallet.dump(exclude={"indexes"}), indent=4, ensure_ascii=False)) 

$ python3 hd_wallet.py
{
“cryptocurrency”: “Bitcoin”,
“symbol”: “BTC”,
“network”: “mainnet”,
“coin_type”: 0,
“entropy”: “f90afa34cc3653845d06c179ba917569c60e1179”,
“strength”: 160,
“mnemonic”: “호흡 분리 여론 온종일 생신 조용히 숙소 추석 시월 청춘 사계절 철도 살림 건축 팩스”,
“passphrase”: “talonlab”,
“language”: “Korean”,
“seed”: “d79ac6b15a3782eefd52ae4b09c961ddd7263f6de4a852534497139516bc04549cb62dd2063f21090b13b58c442a35610f630034ca9ad85877ee8a544f8643b4”,
“ecc”: “SLIP10-Secp256k1”,
“hd”: “BIP32”,
“semantic”: “p2pkh”,
“root_xprivate_key”: “xprv9s21ZrQH143K35Wb8sjvrSbVYqX2xGJvX64QxwC1spNkdnakC1Mtwv2CQE1trAMGA83TGEZSjWtdh1s9pFT2P6z2PKepwjvhbkeU69UsGaE”,
“root_xpublic_key”: “xpub661MyMwAqRbcFZb4EuGwDaYE6sMXMj2mtJz1mKbdS9ujWautjYg9ViLgFWsQNQ3PjEnrdPhNhS9JvuH6WLG5pgJaTBezMoy7ecEKLK3BAvH”,
“root_private_key”: “4ba3752c7a474a3be62ef060fd79c6517b7f2af01d7e573f11685e20f55a0303”,
“root_wif”: “Kykk1CNJWm7sAsve2eDCezUykFyQfX9BavJNTWakMutPYSSHZ5kP”,
“root_chain_code”: “66708f4fdae1aec89612e24205d9244f0b8a1a4ada2fc6076e0eea8a91131831”,
“root_public_key”: “033bbd876465246c571f02df1aa65527c82dfa9779833a2840ecb63167f1f507a0”,
“strict”: true,
“public_key_type”: “compressed”,
“wif_type”: “wif-compressed”,
“derivation”: {
“at”: {
“path”: “m/0’/0/0”,
“depth”: 3,
“index”: 0
},
“xprivate_key”: “xprv9yoVptUsgY29Jz8tMnbLeoWQygM3m6zYSWdYDiVdBazjqyc11ioZHdowCxYi17fN5CwDnm3ith9tMeVjv5GToJY4S7NLrbuged6Q3DJiGWy”,
“xpublic_key”: “xpub6CnrEQ1mWuaSXUDMTp8M1wT9XiBYAZiPojZ926uEjvXiimw9ZG7oqS8R4Dev3hzwcXeqNmMiN9knr88cPtduVZ1Gqn1kchT23eB5wK7N8t1”,
“private_key”: “96ceeef7b35bfd428842d2dc08719d3c5b5cff9967a7110ece12d57af017a66b”,
“wif”: “L2GryxVSFxauZCLCQJqmV85tVNoXCXyRdC5zomodDWwd33BPjJNn”,
“chain_code”: “7a95361a7ffc067c5165411d55d1400bc5b3cb5ea2ca90741e918fd2a8fb3349”,
“public_key”: “02a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed8803”,
“uncompressed”: “04a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed88030259ef4fa9649169a66b61c26881426d68b6b51ad6ea3cb7dc0b41cc85b6ba00”,
“compressed”: “02a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed8803”,
“hash”: “1bff595df8d0d9695e612eca95afcf4c882e5f32”,
“fingerprint”: “1bff595d”,
“parent_fingerprint”: “9b0194d3”,
“addresses”: {
“p2pkh”: “13Z399RbXeELGgKRKq5sM8nNim9waZGoPW”,
“p2sh”: “35nKB4UKgHSzRYX4J36YxLFMqm7YpCkMKU”,
“p2tr”: “bc1pmsdcxya38lnkmrs62e4yuad0cgr322tp9cc7sgjrmfe3e3jasaeqcvrvwl”,
“p2wpkh”: “bc1qr0l4jh0c6rvkjhnp9m9ftt70fjyzuhejdwqt46”,
“p2wpkh_in_p2sh”: “3KNNVBigqcdPAsUU1apzbRJj5dx993YoGQ”,
“p2wsh”: “bc1qfznz7ncnv4xu29zx3cdv7dzdtgc9e600ukd0kxyca7dg6k9gswqqs8xs7w”,
“p2wsh_in_p2sh”: “36MvcLHZq8jAogs7rm8beCo6rZEybZRkzk”
}
}
}

#!/usr/bin/env python3

from hdwallet.utils import generate_passphrase
print(generate_passphrase(length=32))

$ python3 hd_wallet.py
MQIjC1twsYSqRYAYzUbXZGouqirSa65t

#!/usr/bin/env python3

from hdwallet.mnemonics.algorand import AlgorandMnemonic, ALGORAND_MNEMONIC_WORDS, ALGORAND_MNEMONIC_LANGUAGES
mnemonic: str = AlgorandMnemonic.from_words(words=ALGORAND_MNEMONIC_WORDS.TWENTY_FIVE, language=ALGORAND_MNEMONIC_LANGUAGES.ENGLISH)
print(AlgorandMnemonic.from_entropy(entropy="65234f4ec655b087dd74d186126e301d73d563961890b2f718476e1a32522329", language=ALGORAND_MNEMONIC_LANGUAGES.ENGLISH))

hole develop cheese fragile gaze giggle plunge sphere express reunion oblige crack priority ocean seven mosquito wagon glow castle plunge goddess stand empower ability empower

行列の計算

import random
from typing import List

Row = List[int]
Matrix = List[Row]

def matrix_multiply(matrix_a: Matrix, matrix_b: Matrix) -> Matrix:
    num_rows_a = len(matrix_a)
    num_cols_a = len(matrix_a[0])
    num_rows_b = len(matrix_b)
    num_cols_b = len(matrix_b[0])
    if num_cols_a != num_rows_b:
        raise ArithmeticError(
            f"Invalid dimensions; Cannot multiply "
            f"{num_rows_a}x{num_cols_a}*{num_rows_b}x{num_cols_b}"
        )
    solution_matrix = [[0] * num_cols_b for _ in range(num_rows_a)]
    for i in range(num_rows_a):
        for j in range(num_cols_b):
            for k in range(num_cols_a):
                solution_matrix[i][j] += matrix_a[i][k] * matrix_b[k][j]
    return solution_matrix

if __name__ == "__main__":
    cols = 3
    rows = 2
    A = [[random.randint(0, 10) for i in range(cols)]
        for j in range(rows)]
    print(f"matrix A: {A}")
    B = [[random.randint(0, 10) for i in range(rows)]
        for j in range(cols)]
    print(f"matrix B: {B}")
    C = matrix_multiply(A, B)
    print(f"matrix C: {C}")

$ python3 matmul_sequential.py
matrix A: [[9, 0, 0], [10, 6, 1]]
matrix B: [[9, 5], [1, 5], [10, 5]]
matrix C: [[81, 45], [106, 85]]

ランダムな連想配列の計測

選挙投票のカウントを想定したもの

import typing as T
import random

Summary = T.Mapping[int, int]

def process_votes(pile: T.List[int]) -> Summary:
    summary = {}
    for vote in pile:
        if vote in summary:
            summary[vote] += 1
        else:
            summary[vote] = 1
    return summary

if __name__ == "__main__":
    num_candidates = 3
    num_voters = 100
    pile = [random.randint(1, num_candidates) for _ in range(num_voters)]
    print(pile)
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")

$ python3 count_votes_sequential.py
[1, 1, 2, 1, 2, 3, 2, 2, 1, 2, 2, 3, 3, 2, 3, 1, 2, 2, 2, 3, 1, 1, 2, 3, 1, 2, 3, 3, 2, 2, 3, 2, 1, 3, 2, 2, 3, 3, 2, 3, 3, 3, 2, 3, 2, 1, 1, 3, 2, 3, 3, 2, 2, 1, 1, 1, 1, 2, 3, 3, 1, 2, 2, 3, 2, 1, 2, 2, 1, 3, 1, 1, 2, 1, 1, 2, 3, 3, 1, 2, 1, 2, 3, 3, 3, 1, 2, 2, 2, 1, 1, 3, 2, 1, 2, 2, 3, 2, 3, 1]
Total number of votes: {1: 29, 2: 40, 3: 31}

Rustで書き直す
範囲指定のランダムは rnd.gen_range となる。

use std::time::Instant;
use threadpool::ThreadPool;
use rand::Rng;
use std::collections::HashMap;

fn process_votes(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut summary = HashMap::new();
    for vote in pile {
        if summary.get(&vote) != None {
            let count = summary.get(&vote).unwrap();
            summary.insert(vote, count + 1);
        } else {
            summary.insert(vote, 1);
        }
    }
    summary
}


fn main(){    
    
    let num_voters = 100;
    let num_candidate = 3;
    
    let mut pile: Vec<u32> = Vec::new();
    let mut rnd = rand::thread_rng();
    for _ in 0..num_voters {
        pile.push(rnd.gen_range(1..(num_candidate + 1)))
    }
    let summary = process_votes(pile);
    println!("{:?}", summary);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
Running `target/debug/parallel`
{3: 42, 2: 33, 1: 25}

ここまではOK

【Python】ファイルから文字列を検索

短いコードなんだけと、割と高度なことをやっている…

import os
import time
import glob
import typing as T

def search_file(file_location: str, search_string: str) -> bool:
    with open(file_location, "r", encoding="utf8") as file:
        return search_string in file.read()
    
def search_files_sequentially(file_locations: T.List[str], search_string: str) -> None:
    for file_name in file_locations:
        result = search_file(file_name, search_string)
        if result:
            print(f"Found word in file: `{file_name}`")

if __name__ == "__main__":
    file_locations = list(
        glob.glob(f"{os.path.abspath(os.getcwd())}/*.py"))
    search_string = input("what word are you trying to find?: ")

    start_time = time.perf_counter()
    search_files_sequentially(file_locations, search_string)
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}")

$ python3 find_files_sequential.py
what word are you trying to find?: queue
Found word in file: `/home/vagrant/dev/rust/parallel/python/thread_pool.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/pipeline.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/message_queue.py`
PROCESS TIME: 0.004676494048908353

OK, これをRustで書きたい…

【並列処理】パスワードクラッキング

これをRustのthread poolで実装したい。。

import time
import math
import hashlib
import typing as T
import os
from multiprocessing import Pool

ChunkRange = T.Tuple[int, int]

def crack_chunk(crypto_hash: str, length: int, chunk_start: int, chunk_end: int)-> T.Union[str, None]:
    print(f"Processing {chunk_start} to {chunk_end}")
    combinations = get_combinations(length=length, min_number=chunk_start, max_number=chunk_end)
    for combination in combinations:
        if check_password(crypto_hash, combination):
            return combination
    return


def get_chunks(num_ranges: int, length: int) -> T.Iterator[ChunkRange]:
    max_number = int(math.pow(10, length) - 1)
    chunk_starts = [int(max_number / num_ranges * i) for i in range(num_ranges)]
    chunk_ends = [start_point - 1 for start_point in chunk_starts[1:]] + [max_number]
    return zip(chunk_starts, chunk_ends)

def crack_password_parallel(crypto_hash: str, legnth: int) -> None:
    num_cores = os.cpu_count()
    print("Processing number combinations correctly")
    start_time = time.perf_counter()

    with Pool() as pool:
        arguments = ((crypto_hash, length, chunk_start, chunk_end)
            for chunk_start, chunk_end in get_chunks(num_cores, length))
        results = pool.starmap(crack_chunk, arguments)
        print("waiting for chunks to finish")
        pool.close()
        pool.join()
    
    result = [res for res in results if res]
    print(f"PASSWORD CRACKED: {result[0]}")
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}") 

def get_combinations(*, length: int,
    min_number: int = 0,
    max_number: T.Optional[int] = None) -> T.List[str]:

    combinations = []
    if not max_number:
        max_number = int(math.pow(10, length) - 1)
    for i in range(min_number, max_number + 1):
        str_num = str(i)
        zeros = "0" * (length - len(str_num))
        combinations.append("".join((zeros, str_num)))
    return combinations

def get_crypto_hash(password: str) -> str:
    return hashlib.sha256(password.encode()).hexdigest()

def check_password(expected_crypto_hash: str,
    possible_password: str) -> bool:
    actual_crypto_hash = get_crypto_hash(possible_password)
    return expected_crypto_hash == actual_crypto_hash

def crack_password(crypto_hash: str, length: int) -> None:
    print("Processing number combinations sequentially")
    start_time = time.perf_counter()
    combinations = get_combinations(length=length)
    for combination in combinations:
        if check_password(crypto_hash, combination):
            print(f"PASSWORD CRACKED: {combination}")
            break

    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}" )

if __name__ == "__main__":
    crypto_hash = "8bb0cf6eb9b17d0f7d22b456f121257dc1254e1f01665370476383ea776df414"
    length = 7
    crack_password_parallel(crypto_hash, length)

$ python3 password_cracking_parallel.py
Processing number combinations correctly
Processing 4999999 to 9999999
Processing 0 to 4999998
waiting for chunks to finish
PASSWORD CRACKED: 1234567
PROCESS TIME: 3.9475781959481537

【Python】thread Pool【並列処理】

import time
import queue
import typing as T
from threading import Thread, current_thread

Callback = T.Callable[..., None]
Task = T.Tuple[Callback, T.Any, T.Any]
TaskQueue = queue.Queue

class Worker(Thread):
    def __init__(self, tasks: queue.Queue[Task]):
        super().__init__()
        self.tasks = tasks

    def run(self) -> None:
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads: int):
        self.tasks: TaskQueue = queue.Queue(num_threads)
        self.num_threads = num_threads

        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            worker.setDaemon(True)
            worker.start()

    def submit(self, func: Callback, *args, **kargs) -> None:
        self.tasks.put((func, args, kargs))

    def wait_completion(self) -> None:
        self.tasks.join()

def cpu_waster(i: int) -> None:
    name = current_thread().getName()
    print(f"{name} doing {i} work")
    time.sleep(3)

def main() -> None:
    pool = ThreadPool(num_threads=5)
    for i in range(20):
        pool.submit(cpu_waster, i)

    print("All work requests sent")
    pool.wait_completion()
    print("All work completed")

if __name__ == "__main__":
    main()

name = current_thread().getName()
Thread-2 doing 0 work
Thread-5 doing 1 work
Thread-4 doing 2 work
Thread-1 doing 3 work
Thread-3 doing 4 work
Thread-2 doing 5 work
Thread-5 doing 6 work
Thread-1 doing 8 work
Thread-4 doing 7 work
Thread-3 doing 9 work
Thread-5 doing 10 work
Thread-1 doing 11 work
Thread-4 doing 12 work
Thread-2 doing 14 work
Thread-3 doing 13 work
All work requests sent
Thread-1 doing 15 work
Thread-4 doing 16 work
Thread-2 doing 17 work
Thread-3 doing 18 work
Thread-5 doing 19 work
All work completed

スレッドプールを実装してそれぞれのスレッドで並列処理を行なっているのはわかるんだが、
callbackとqueue, typingの使い方がイマイチよくわからん…

【Rust】rustでunix domain socket【並列処理】

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf)?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

fn main()-> Result<(), Box<dyn std::error::Error>> {
    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        fs::remove_file(&sockfile)?;
    }

    let listner = UnixListener::bind(sockfile)?;
    for stream in listner.incoming() {
        let stream = stream?;
        thread::spawn(move || handle_client(stream).unwrap());
    }

    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/parallel`

これは基本的に、socketのincomingの待ち受け

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::fs;
use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

use futures::StreamExt;
use tokio;
use tokio::io::AsyncReadExt;
use tokio::net::unix::{UnixListener, UnixStream};

async fn handle_client(mut stream: UnixStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf).await?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // let mut stm = UnixStream::connect("/tmp/uds.sock");
    // stm?.write_all(b"hello world")?;

    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        fs::remove_file(&sockfile)?;
    }

    let listner = UnixListener::bind(sockfile)?;
    let mut incoming = listner.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        tokio::spawn(async move {
            handle_client(stream).await.unwrap();
        });
    }

    Ok(())
}

動かない….

やりたいことはこれなんだが、これも上手くいかん…

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

pub static SOCKET_PATH: &'static str = "rst.sock";

fn sender() {
    let name = "Sender".to_string();
    let mut stream = UnixStream::connect(SOCKET_PATH).unwrap();
    let messages = vec!["Hello", " ", "world!"];
    for message in messages {
        stream.write_all(b"hello world").unwrap();
        let mut response = String::new();
        stream.read_to_string(&mut response).unwrap();
        println!("{response}");
    }
}

fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buf = [0; 1024];

    let n = stream.read(&mut buf)?;
    let s = String::from_utf8_lossy(&buf[..n]);
    println!("{}", s);

    Ok(())
}

fn receiver() {
    let handle = thread::spawn(move || {
        let listner = UnixListener::bind(SOCKET_PATH).unwrap();
        for stream in listner.incoming() {
            let stream = stream.unwrap();
            thread::spawn(move || handle_client(stream).unwrap());
        }
    });
    handle.join().unwrap();
}

fn main() {
    receiver();
    sender();
}

【python】UNIXドメインソケットによるデータの受け渡し【並列処理】

ソケット通信は異なるサーバ間同士でのやりとりかと思っていたが、別々のスレッド同士でもできるのね。

import socket
import os.path
import time
from threading import Thread, current_thread

SOCK_FILE = "./mailbox"
BUFFER_SIZE = 1024

class Sender(Thread):
    def run(self) -> None:
        self.name = "Sender"
        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        client.connect(SOCK_FILE)

        messages = ["Hello", " ", "world!"]
        for msg in messages:
            print(f"{current_thread().name}: Send: '{msg}'")
            client.sendall(str.encode(msg))
        
        client.close()

class Receiver(Thread):
    def run(self) -> None:
        self.name = "Receiver"
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(SOCK_FILE)
        server.listen()
        print(
            f"{current_thread().name}: Listening for incoming messages...")
        conn, addr = server.accept()

        while True:
            data = conn.recv(BUFFER_SIZE)
            if not data:
                break
            message = data.decode()
            print(f"{current_thread().name}: Received: '{message}'")
        server.close()

def main() -> None:
    if os.path.exists(SOCK_FILE):
        os.remove(SOCK_FILE)

    receiver = Receiver()
    receiver.start()
    time.sleep(1)
    sender = Sender()
    sender.start()

    for thread in [receiver, sender]:
        thread.join()

    os.remove(SOCK_FILE)

if __name__ == "__main__":
    main()

$ python3 sockets.py
Receiver: Listening for incoming messages…
Sender: Send: ‘Hello’
Receiver: Received: ‘Hello’
Sender: Send: ‘ ‘
Receiver: Received: ‘ ‘
Sender: Send: ‘world!’
Receiver: Received: ‘world!’