【並列処理】パスワードクラッキング

これをRustのthread poolで実装したい。

import time
import math
import hashlib
import typing as T
import os
from multiprocessing import Pool

# Inclusive (start, end) integer bounds of one slice of the numeric search space.
ChunkRange = T.Tuple[int, int]

def crack_chunk(crypto_hash: str, length: int, chunk_start: int, chunk_end: int)-> T.Union[str, None]:
    """Search one numeric sub-range for the password matching ``crypto_hash``.

    Args:
        crypto_hash: Hex digest the candidate passwords are compared against.
        length: Width (in digits) every candidate is zero-padded to.
        chunk_start: First number of the sub-range (inclusive).
        chunk_end: Last number of the sub-range (inclusive).

    Returns:
        The matching password string, or ``None`` if this chunk has no match.
    """
    print(f"Processing {chunk_start} to {chunk_end}")
    candidates = get_combinations(length=length, min_number=chunk_start, max_number=chunk_end)
    # First candidate whose digest matches; None once the chunk is exhausted.
    return next(
        (candidate for candidate in candidates if check_password(crypto_hash, candidate)),
        None,
    )


def get_chunks(num_ranges: int, length: int) -> T.Iterator["ChunkRange"]:
    """Split ``0 .. 10**length - 1`` into ``num_ranges`` contiguous chunks.

    Args:
        num_ranges: Number of chunks to produce.
        length: Number of digits in the passwords being searched.

    Returns:
        An iterator of inclusive ``(start, end)`` pairs. Each chunk ends one
        below the next chunk's start; the last chunk ends at the maximum.
    """
    # Pure integer arithmetic: math.pow() and "/" go through floats, which
    # cannot represent 10**length - 1 exactly once length exceeds ~15 digits.
    max_number = 10 ** length - 1
    chunk_starts = [max_number * i // num_ranges for i in range(num_ranges)]
    chunk_ends = [start_point - 1 for start_point in chunk_starts[1:]] + [max_number]
    return zip(chunk_starts, chunk_ends)

def crack_password_parallel(crypto_hash: str, legnth: int) -> None:
    """Brute-force a numeric password, splitting the search across processes.

    Args:
        crypto_hash: Hex digest of the password being searched for.
        legnth: Number of digits in the password. The misspelled name is
            kept so existing keyword callers keep working.
    """
    # The body used to read a module-level ``length`` instead of this
    # parameter, which only worked when the file was run as a script.
    length = legnth
    # os.cpu_count() may return None when the core count is unknown.
    num_cores = os.cpu_count() or 1
    print("Processing number combinations correctly")
    start_time = time.perf_counter()

    with Pool() as pool:
        arguments = ((crypto_hash, length, chunk_start, chunk_end)
            for chunk_start, chunk_end in get_chunks(num_cores, length))
        # starmap() blocks until every chunk has been processed.
        results = pool.starmap(crack_chunk, arguments)
        print("waiting for chunks to finish")
        pool.close()
        pool.join()

    matches = [res for res in results if res]
    if matches:
        print(f"PASSWORD CRACKED: {matches[0]}")
    else:
        # Previously an IndexError when no chunk contained the password.
        print("PASSWORD NOT FOUND")
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}")

def get_combinations(*, length: int,
    min_number: int = 0,
    max_number: T.Optional[int] = None) -> T.List[str]:
    """Return every zero-padded decimal string in an inclusive numeric range.

    Args:
        length: Width each number is left-padded to with ``"0"``.
        min_number: First number of the range (inclusive).
        max_number: Last number of the range (inclusive). ``None`` means the
            largest ``length``-digit number, ``10**length - 1``.

    Returns:
        The padded strings in ascending numeric order.
    """
    # Compare against None explicitly: an upper bound of 0 is a valid request
    # and must not be replaced by the full range.
    if max_number is None:
        # Integer power avoids float rounding for long passwords.
        max_number = 10 ** length - 1
    return [str(number).rjust(length, "0")
            for number in range(min_number, max_number + 1)]

def get_crypto_hash(password: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``password``."""
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()

def check_password(expected_crypto_hash: str,
    possible_password: str) -> bool:
    """Tell whether ``possible_password`` hashes to ``expected_crypto_hash``."""
    return get_crypto_hash(possible_password) == expected_crypto_hash

def crack_password(crypto_hash: str, length: int) -> None:
    """Try every ``length``-digit candidate in order and report the match.

    Prints the cracked password (if any) followed by the elapsed time.
    """
    print("Processing number combinations sequentially")
    started_at = time.perf_counter()
    # Stop at the first candidate whose digest matches.
    cracked = next(
        (candidate for candidate in get_combinations(length=length)
         if check_password(crypto_hash, candidate)),
        None,
    )
    if cracked is not None:
        print(f"PASSWORD CRACKED: {cracked}")

    elapsed = time.perf_counter() - started_at
    print(f"PROCESS TIME: {elapsed}" )

if __name__ == "__main__":
    # Hex digest of the password to recover (see get_crypto_hash).
    crypto_hash = "8bb0cf6eb9b17d0f7d22b456f121257dc1254e1f01665370476383ea776df414"
    # Number of digits in the candidate passwords.
    length = 7
    crack_password_parallel(crypto_hash, length)

$ python3 password_cracking_parallel.py
Processing number combinations correctly
Processing 4999999 to 9999999
Processing 0 to 4999998
waiting for chunks to finish
PASSWORD CRACKED: 1234567
PROCESS TIME: 3.9475781959481537

【Python】thread Pool【並列処理】

import time
import queue
import typing as T
from threading import Thread, current_thread

# A unit of work: any callable whose return value is ignored.
Callback = T.Callable[..., None]
# What is stored on the queue: (callable, positional args, keyword args).
Task = T.Tuple[Callback, T.Any, T.Any]
# Queue type shared between the pool and its workers.
TaskQueue = queue.Queue

class Worker(Thread):
    """Thread that repeatedly takes a task off a shared queue and runs it."""

    # The annotation is quoted so it is not evaluated at definition time;
    # subscripting queue.Queue fails at runtime on Python < 3.9.
    def __init__(self, tasks: "queue.Queue[Task]"):
        super().__init__()
        self.tasks = tasks

    def run(self) -> None:
        """Process tasks forever; a failing task is reported, not fatal."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                # Always balance the get(): otherwise a task that exits via a
                # non-Exception error would leave Queue.join() waiting forever.
                self.tasks.task_done()

class ThreadPool:
    """Fixed-size pool of daemon worker threads fed from one task queue."""

    def __init__(self, num_threads: int):
        # The queue is bounded to num_threads entries, so submit() blocks
        # while that many tasks are already waiting to be picked up.
        self.tasks: TaskQueue = queue.Queue(num_threads)
        self.num_threads = num_threads

        for _ in range(self.num_threads):
            worker = Worker(self.tasks)
            # Thread.setDaemon() is deprecated; set the attribute instead.
            worker.daemon = True
            worker.start()

    def submit(self, func: Callback, *args, **kargs) -> None:
        """Queue ``func(*args, **kargs)`` for execution by a worker."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self) -> None:
        """Block until every submitted task has been marked done."""
        self.tasks.join()

def cpu_waster(i: int) -> None:
    """Announce which thread handles job ``i``, then sleep for three seconds."""
    # Thread.getName() is deprecated; read the ``name`` attribute instead.
    name = current_thread().name
    print(f"{name} doing {i} work")
    time.sleep(3)

def main() -> None:
    """Submit twenty jobs to a five-thread pool and wait for all of them."""
    workers = ThreadPool(num_threads=5)
    for job_number in range(20):
        workers.submit(cpu_waster, job_number)

    print("All work requests sent")
    workers.wait_completion()
    print("All work completed")

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

name = current_thread().getName()
Thread-2 doing 0 work
Thread-5 doing 1 work
Thread-4 doing 2 work
Thread-1 doing 3 work
Thread-3 doing 4 work
Thread-2 doing 5 work
Thread-5 doing 6 work
Thread-1 doing 8 work
Thread-4 doing 7 work
Thread-3 doing 9 work
Thread-5 doing 10 work
Thread-1 doing 11 work
Thread-4 doing 12 work
Thread-2 doing 14 work
Thread-3 doing 13 work
All work requests sent
Thread-1 doing 15 work
Thread-4 doing 16 work
Thread-2 doing 17 work
Thread-3 doing 18 work
Thread-5 doing 19 work
All work completed

スレッドプールを実装してそれぞれのスレッドで並列処理を行なっているのはわかるんだが、
callbackとqueue, typingの使い方がイマイチよくわからん…

【Rust】rustでunix domain socket【並列処理】

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

/// Reads a single chunk (at most 1024 bytes) from `stream` and prints it,
/// replacing any invalid UTF-8 sequences.
fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buffer = [0u8; 1024];
    let received = stream.read(&mut buffer)?;
    println!("{}", String::from_utf8_lossy(&buffer[..received]));
    Ok(())
}

fn main()-> Result<(), Box<dyn std::error::Error>> {
    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        fs::remove_file(&sockfile)?;
    }

    let listner = UnixListener::bind(sockfile)?;
    for stream in listner.incoming() {
        let stream = stream?;
        thread::spawn(move || handle_client(stream).unwrap());
    }

    Ok(())
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/parallel`

これは基本的に、socketへのincoming(接続要求)を待ち受けて、接続ごとにスレッドを立てて処理しているだけ。

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::fs;
use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

use futures::StreamExt;
use tokio;
use tokio::io::AsyncReadExt;
use tokio::net::unix::{UnixListener, UnixStream};

/// Awaits a single chunk (at most 1024 bytes) from `stream` and prints it,
/// replacing any invalid UTF-8 sequences.
async fn handle_client(mut stream: UnixStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buffer = [0u8; 1024];
    let received = stream.read(&mut buffer).await?;
    println!("{}", String::from_utf8_lossy(&buffer[..received]));
    Ok(())
}

/// Listens on `/tmp/uds.sock` and handles each connection in its own task.
///
/// NOTE(review): the `use` lines above this listing import `UnixListener` /
/// `UnixStream` from both `std::os::unix::net` and `tokio::net::unix`; those
/// names conflict, and in tokio 1.x the types live at
/// `tokio::net::{UnixListener, UnixStream}`. Keep only the tokio pair.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // let mut stm = UnixStream::connect("/tmp/uds.sock");
    // stm?.write_all(b"hello world")?;

    let sockfile = Path::new("/tmp/uds.sock");
    if sockfile.exists() {
        std::fs::remove_file(sockfile)?;
    }

    // tokio 1.x removed `UnixListener::incoming()`; call `accept()` in a
    // loop instead of driving a stream with `StreamExt::next()`.
    let listener = tokio::net::UnixListener::bind(sockfile)?;
    loop {
        let (stream, _addr) = listener.accept().await?;
        tokio::spawn(async move {
            // Report a failing client instead of panicking the task.
            if let Err(e) = handle_client(stream).await {
                eprintln!("client error: {}", e);
            }
        });
    }
}

動かない….

やりたいことはこれなんだが、これも上手くいかん…

use std::io::prelude::*;
use std::os::unix::net::{UnixListener, UnixStream};
use std::thread;

/// Filesystem path of the Unix domain socket shared by sender and receiver.
pub static SOCKET_PATH: &str = "rst.sock";

/// Connects to the receiver's socket and sends each message fragment.
fn sender() {
    let mut stream = UnixStream::connect(SOCKET_PATH).unwrap();
    let messages = vec!["Hello", " ", "world!"];
    for message in messages {
        // Send the fragment itself (the old code always sent "hello world").
        stream.write_all(message.as_bytes()).unwrap();
    }
    // `stream` is dropped here; closing it is what signals end-of-input to
    // the receiver. The old code waited for a reply that was never written.
}

/// Prints everything the peer sends until it closes the connection.
fn handle_client(mut stream: UnixStream) -> std::io::Result<()> {
    let mut buf = [0; 1024];

    loop {
        let n = stream.read(&mut buf)?;
        if n == 0 {
            // A zero-length read means the peer closed its end.
            break;
        }
        let s = String::from_utf8_lossy(&buf[..n]);
        println!("{}", s);
    }

    Ok(())
}

/// Binds the socket, then serves one connection on a background thread.
///
/// Returns the thread's handle so the caller can wait for it *after*
/// running the sender. Joining inside this function (as before) blocked
/// forever on the accept loop, so `sender()` never ran.
fn receiver() -> thread::JoinHandle<()> {
    // A socket file left by an earlier run makes bind() fail; remove it and
    // ignore "not found".
    match std::fs::remove_file(SOCKET_PATH) {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => panic!("cannot remove stale socket: {}", e),
    }
    // Bind on the calling thread so the socket exists before sender() connects.
    let listener = UnixListener::bind(SOCKET_PATH).unwrap();
    thread::spawn(move || {
        let (stream, _addr) = listener.accept().unwrap();
        handle_client(stream).unwrap();
    })
}

fn main() {
    let server = receiver();
    sender();
    // Wait until the receiver has printed everything it was sent.
    server.join().unwrap();
    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET_PATH);
}

【python】UNIXドメインソケットによるデータの受け渡し【並列処理】

ソケット通信は異なるサーバ間同士でのやりとりかと思っていたが、別々のスレッド同士でもできるのね。

import socket
import os.path
import time
from threading import Thread, current_thread

# Filesystem path of the Unix domain socket the two threads talk over.
SOCK_FILE = "./mailbox"
# Maximum number of bytes requested per recv() call.
BUFFER_SIZE = 1024

class Sender(Thread):
    """Thread that connects to SOCK_FILE and sends three message fragments."""

    def run(self) -> None:
        self.name = "Sender"
        # The context manager closes the socket even if connect() or
        # sendall() raises; the old code leaked it in that case.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(SOCK_FILE)

            messages = ["Hello", " ", "world!"]
            for msg in messages:
                print(f"{current_thread().name}: Send: '{msg}'")
                client.sendall(msg.encode())

class Receiver(Thread):
    """Thread that accepts one connection on SOCK_FILE and prints what arrives."""

    def run(self) -> None:
        self.name = "Receiver"
        # Both sockets are closed by their context managers, including on
        # errors; the accepted connection was previously never closed.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
            server.bind(SOCK_FILE)
            server.listen()
            print(
                f"{current_thread().name}: Listening for incoming messages...")
            conn, _addr = server.accept()

            with conn:
                while True:
                    data = conn.recv(BUFFER_SIZE)
                    if not data:
                        # Empty bytes means the sender closed the connection.
                        break
                    message = data.decode()
                    print(f"{current_thread().name}: Received: '{message}'")

def main() -> None:
    """Run a Receiver and a Sender thread against a fresh socket file."""
    # Start from a clean slate: drop a socket file left by an earlier run.
    if os.path.exists(SOCK_FILE):
        os.remove(SOCK_FILE)

    listener_thread = Receiver()
    listener_thread.start()
    # Pause before starting the sender so the receiver can begin listening.
    time.sleep(1)
    talker_thread = Sender()
    talker_thread.start()

    listener_thread.join()
    talker_thread.join()

    os.remove(SOCK_FILE)

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

$ python3 sockets.py
Receiver: Listening for incoming messages...
Sender: Send: 'Hello'
Receiver: Received: 'Hello'
Sender: Send: ' '
Receiver: Received: ' '
Sender: Send: 'world!'
Receiver: Received: 'world!'

【python】メッセージキューによるデータの受け渡し【並列処理】

q = Queue() で初期化した後は、qでやり取りしていますね。

import time
from queue import Queue
from threading import Thread, current_thread

class Worker(Thread):
    """Thread that drains items from a shared queue until it is empty."""

    def __init__(self, queue: Queue, id: int):
        super().__init__(name=str(id))
        self.queue = queue

    def run(self) -> None:
        # Imported here because this listing only imports ``Queue``.
        from queue import Empty

        while True:
            # Checking empty() and then calling a blocking get() is a race:
            # another worker can take the last item in between, leaving this
            # thread blocked forever. Take without blocking instead.
            try:
                item = self.queue.get_nowait()
            except Empty:
                break
            print(f"Thread {current_thread().name}: "
                f"processing item {item} from the queue")
            time.sleep(2)

def main(thread_num: int) -> None:
    """Fill a queue with ten items and let ``thread_num`` workers drain it."""
    q = Queue()
    for item in range(10):
        q.put(item)

    # Worker ids (and therefore thread names) are 1-based.
    workers = []
    for worker_id in range(1, thread_num + 1):
        worker = Worker(q, worker_id)
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

if __name__ == "__main__":
    # Number of worker threads draining the queue.
    thread_num = 4
    main(thread_num)

$ python3 message_queue.py
Thread 1: processing item 0 from the queue
Thread 2: processing item 1 from the queue
Thread 3: processing item 2 from the queue
Thread 4: processing item 3 from the queue
Thread 2: processing item 4 from the queue
Thread 3: processing item 5 from the queue
Thread 1: processing item 6 from the queue
Thread 4: processing item 7 from the queue
Thread 3: processing item 8 from the queue
Thread 2: processing item 9 from the queue

【Python】パイプによるIPC【並列処理】

reader_conn, writer_conn = Pipe() でpipeを接続してるのはわかります。
共有メモリの場合は、グローバルな変数で値をやり取りしたが、Pipeの場合はその名の通り、一方のスレッドからもう一方のスレッドへ値を渡すときに使われるってことかな。

from threading import Thread, current_thread
from multiprocessing import Pipe
from multiprocessing.connection import Connection

class Writer(Thread):
    """Thread that sends one message over its end of the pipe."""

    def __init__(self, conn: Connection):
        super().__init__()
        self.name = "Writer"
        self.conn = conn

    def run(self) -> None:
        sender_name = current_thread().name
        print(f"{sender_name}: Sending rubber duck...")
        payload = "Rubber duck"
        self.conn.send(payload)

class Reader(Thread):
    """Thread that receives one message from its end of the pipe and prints it."""

    def __init__(self, conn: Connection):
        super().__init__()
        self.name = "Reader"
        self.conn = conn

    def run(self) -> None:
        print(f"{current_thread().name}: Reading...")
        # recv() blocks until the other end has sent something.
        received = self.conn.recv()
        print(f"{current_thread().name}: Received: {received}")

def main() -> None:
    """Connect a Writer and a Reader thread through a pipe and run both."""
    reader_conn, writer_conn = Pipe()
    reader = Reader(reader_conn)
    writer = Writer(writer_conn)

    # Same order as before: the writer is started (and joined) first.
    participants = (writer, reader)
    for participant in participants:
        participant.start()
    for participant in participants:
        participant.join()

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

$ python3 pipe.py
Writer: Sending rubber duck…
Reader: Reading…
Reader: Received: Rubber duck

【Python】プロセスの中で子スレッドを作成【並列処理】

import os
import time
import threading
from threading import Thread

def cpu_waster(i: int) -> None:
    """Announce which thread handles job ``i``, then sleep for three seconds."""
    # Thread.getName() is deprecated and emits a DeprecationWarning; read
    # the ``name`` attribute instead.
    name = threading.current_thread().name
    print(f"{name} doing {i} work")
    time.sleep(3)

def display_threads() -> None:
    """Print the current PID, the live thread count, and each live thread."""
    separator = "-" * 10
    print(separator)
    pid = os.getpid()
    print(f"Current process PID: {pid}")
    thread_count = threading.active_count()
    print(f"Thread Count: {thread_count}")
    print("Active threads:")
    for live_thread in threading.enumerate():
        print(live_thread)

def main(num_threads: int) -> None:
    """Show the thread list before and after starting ``num_threads`` threads."""
    display_threads()

    print(f"Starting {num_threads} CPU wasters...")
    for job_number in range(num_threads):
        Thread(target=cpu_waster, args=(job_number,)).start()

    # The started threads are not joined before this second listing.
    display_threads()

if __name__ == "__main__":
    # Number of cpu_waster threads to start.
    num_threads = 5
    main(num_threads)

$ python3 multithreading.py
----------
Current process PID: 580482
Thread Count: 1
Active threads:
<_MainThread(MainThread, started 281472949920800)>
Starting 5 CPU wasters…
Thread-2 (cpu_waster) doing 1 work
Thread-3 (cpu_waster) doing 2 work
Thread-4 (cpu_waster) doing 3 work
/home/vagrant/dev/rust/parallel/python/multithreading.py:7: DeprecationWarning: getName() is deprecated, get the name attribute instead
name = threading.current_thread().getName()
Thread-1 (cpu_waster) doing 0 work
Thread-5 (cpu_waster) doing 4 work
----------
Current process PID: 580482
Thread Count: 6
Active threads:
<_MainThread(MainThread, started 281472949920800)>




うーむ、中々難儀やのう

【Python】親プロセスと子プロセスを生成する【並列処理】

import os
from multiprocessing import Process

def run_child() -> None:
    """Print this process's PID and its parent's PID."""
    own_pid, parent_pid = os.getpid(), os.getppid()
    print("Child: I am the child process")
    print(f"Child: Child's PID: {own_pid}")
    print(f"Child: Parent's PID: {parent_pid}")

def start_parent(num_children: int) -> None:
    """Print the parent's PID, then start ``num_children`` child processes."""
    print("Parent: I am the parent process")
    parent_pid = os.getpid()
    print(f"Parent : Parent's PID: {parent_pid}")
    for child_number in range(num_children):
        print(f"Starting Process {child_number}")
        child = Process(target=run_child)
        child.start()

if __name__ == "__main__":
    # Number of child processes to start.
    num_children = 3
    start_parent(num_children)

$ python3 child_processes.py
Parent: I am the parent process
Parent : Parent’s PID: 576837
Starting Process 0
Starting Process 1
Starting Process 2
Child: I am the child process
Child: Child’s PID: 576838
Child: Parent’s PID: 576837
Child: I am the child process
Child: Child’s PID: 576840
Child: Parent’s PID: 576837
Child: I am the child process
Child: Child’s PID: 576839
Child: Parent’s PID: 576837

### Rustで書きたい

use std::process;
use std::process::Command;

/// Prints this process's id, runs `ls` as a child process, and prints the
/// child's id.
fn main() {
    println!("My id is {}", process::id());

    let mut command = Command::new("ls");
    match command.spawn() {
        Ok(mut child) => {
            println!("My id is {}", child.id());
            // Wait for the child so it is reaped; `Child` does not do this
            // on drop.
            if let Err(e) = child.wait() {
                eprintln!("failed to wait for child: {}", e);
            }
        }
        // The spawn error used to be ignored silently.
        Err(e) => eprintln!("failed to spawn child: {}", e),
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/parallel`
My id is 577522
My id is 577561
Cargo.lock Cargo.toml python src target

Command::newは自プロセスを複製(fork)するのではなく、外部コマンドを子プロセスとして起動するものなので、意図しているものとはちょっと違うな

【Blockchain】ウォレット機能の考察

### ウォレットとして必要な機能
- 秘密鍵、公開鍵の作成、保存
- ブロックチェーン残高の表示
- トランザクションの送信、受信
- ブロックチェーン価格のマーケットデータ表示
※ステーキング、スワッピング、dAppsなど応用機能もある

### 前準備
$ pip install flask
$ pip install web3
$ pip install requests

import json

# NOTE(review): `import jsonify` and `from aiohttp import request` look
# unintended. They are kept, but placed before the Flask imports so that
# Flask's `jsonify` and `request` are the names left bound.
import jsonify
import requests
from aiohttp import request
from flask import Flask, jsonify, render_template, session
from flask import request
from web3 import Web3

app = Flask(__name__)

infura_url = 'https://mainnet.infura.io/v3/fuga'
# NOTE(review): placeholder secret; presumably to be loaded from
# configuration in real use — confirm.
app.config['SECRET_KEY'] = 'hoge'

# The routes below all use a ``web3`` client, which was never created.
web3 = Web3(Web3.HTTPProvider(infura_url))

@app.route('/new_account', methods=['GET'])
def new_account():
    """Create a new account, remember it in the session, and return it as JSON.

    NOTE(review): the private key is stored in the session and sent back in
    the response; confirm this is acceptable outside of a demo.
    """
    created = web3.eth.account.create('hogehoge')
    private_key_hex = created.key.hex()
    session['account'] = {
        'privateKey': private_key_hex,
        'address' : created.address
    }
    stored_account = session['account']
    return jsonify(stored_account)
# ERC-20 ABI used by get_balance() to build the token contract wrapper.
# Read as UTF-8 explicitly rather than relying on the locale's default.
with open('erc20_abi.json', encoding='utf-8') as f:
    erc20_abi = json.load(f)

@app.route('/balance/<contract_address>', methods=['GET'])
def get_balance(contract_address):
    """Return the session account's balance of the given ERC-20 token.

    Responds with HTTP 400 when no account is stored in the session or when
    either address is not a valid address.
    """
    account = session.get('account')
    if not account:
        # Previously an AttributeError (HTTP 500) when /new_account had not
        # been called in this session.
        return jsonify({'error': 'no account in session'}), 400
    try:
        checksum_address = Web3.to_checksum_address(account.get('address'))
        # The contract address must be checksummed as well.
        contract_checksum = Web3.to_checksum_address(contract_address)
    except (TypeError, ValueError):
        return jsonify({'error': 'invalid address'}), 400
    print(checksum_address)
    contract = web3.eth.contract(address=contract_checksum, abi=erc20_abi)
    balance = contract.functions.balanceOf(checksum_address).call()
    return jsonify({'balance': balance})

@app.route('/send_transaction', methods=['POST'])
def send_transaction():
    """Sign and broadcast a transfer from the session account.

    Expects a JSON body with ``to`` (recipient address) and ``amount``
    (in ether). Responds with HTTP 400 when the session has no account or
    the body is missing those fields.
    """
    account = session.get('account')
    if not account:
        return jsonify({'error': 'no account in session'}), 400
    data = request.get_json(silent=True)
    if not data or 'to' not in data or 'amount' not in data:
        return jsonify({'error': "JSON body with 'to' and 'amount' required"}), 400

    # snake_case names match the Web3.to_checksum_address style used by
    # get_balance; the camelCase spellings are the older API.
    nonce = web3.eth.get_transaction_count(account['address'])
    txn_dict = {
        'to': data['to'],
        # The unit is 'ether'; 'either' is not a valid unit name.
        'value': web3.to_wei(data['amount'], 'ether'),
        'gas': 2000000,
        'gasPrice': web3.to_wei('40', 'gwei'),
        'nonce': nonce,
        # NOTE(review): chain id 3 does not look like it matches the
        # mainnet endpoint in infura_url — confirm the intended network.
        'chainId': 3
    }
    signed_txn = web3.eth.account.sign_transaction(txn_dict, account['privateKey'])
    # NOTE(review): newer web3.py releases spell this ``raw_transaction``;
    # confirm against the installed version.
    txn_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
    return jsonify({'transaction_hash': txn_hash.hex()})

@app.route('/market_chart/<contract_address>/<days>', methods=['GET'])
def get_market_chart(contract_address, days):
    """Proxy CoinGecko's market-chart data (USD) for an Ethereum token contract."""
    api_key = 'coingecho_api_key'
    # Query values go through ``params`` so they are URL-encoded, and a
    # timeout keeps a stalled upstream from hanging the request.
    response = requests.get(
        f'https://api.coingecko.com/api/v3/coins/ethereum/contract/{contract_address}/market_chart',
        params={'vs_currency': 'usd', 'days': days, 'api_key': api_key},
        timeout=10,
    )
    # Response.json() takes no positional argument; the removed call that
    # passed the URL to it raised TypeError.
    market_chart = response.json()
    return jsonify(market_chart)

@app.route('/')
def home():
    """Serve the landing page rendered from ``index.html``."""
    page = render_template('index.html')
    return page

if __name__ == '__main__':
    # NOTE(review): debug=True is presumably meant for local development
    # only — confirm before deploying.
    app.run(debug=True)

なるほど、walletのエッセンスは理解できました。Pythonでは"web3"のライブラリでかなり抽象化されているのがわかります。
balanceのところはUTXOにするか、Account型にするかで変わってきますね。
マーケットデータは一旦スタック。
テストネット、メインネットの概念はもう少し深掘りする

【Python】非同期処理を理解する

### 非同期処理とは?
あるタスクが終了するのを待っている間、別のタスクを実行すること

$ pip3 install aiohttp

import datetime
import aiohttp
import asyncio

# Timestamp taken at import; log() reports elapsed seconds relative to it.
start = datetime.datetime.now()

def log(message):
    """Print ``message`` prefixed with the whole seconds elapsed since ``start``."""
    # timedelta.seconds excludes whole days, so it wraps back to 0 after
    # 24 hours; total_seconds() covers the full interval.
    elapsed = int((datetime.datetime.now() - start).total_seconds())
    print(f'{elapsed}秒経過', message)

async def fetch(session, url):
    """Fetch ``url`` through ``session`` and return the response body as text."""
    print(f"Fetching {url}")
    async with session.get(url) as resp:
        body = await resp.text()
    return body

async def main():
    """Fetch a fixed list of URLs concurrently and print the start of each body."""
    log("タスク開始")
    urls = [
        "http://google.com",
        "http://qiita.com",
        "https://www.python.org/",
        "https://www.mozilla.org/en-US/",
        "https://html.spec.whatwg.org/multipage/",
        "https://www.w3.org/TR/css/",
        "https://ecma-international.org/",
        "https://www.typescriptlang.org/",
        "https://www.oracle.com/jp/java/technologies/",
        "https://www.ruby-lang.org/ja/",
        "https://www.postgresql.org/",
        "https://www.mysql.com/jp/",
        "https://docs.djangoproject.com/ja/5.0/",
        "https://spring.pleiades.io/projects/spring-boot",
        # A missing comma here used to glue this URL and the next one into
        # a single invalid URL.
        "https://rubyonrails.org/",
        "https://firebase.google.com/?hl=ja",
        "https://go.dev/",
        "https://nodejs.org/en",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]

        print("Starting tasks...")

        print("Tasks are running in the background...")

        # return_exceptions=True keeps one unreachable site from discarding
        # the results of every other fetch.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"Failed to fetch {url}: {result!r}")
                continue
            print(result[:100])

    log("task finished")

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())