【python】メッセージキューによるデータの受け渡し【並列処理】

q = Queue() で初期化した後は、qでやり取りしていますね。

import time
from queue import Queue
from threading import Thread, current_thread

class Worker(Thread):
    """Thread that takes items off a shared queue until none are left.

    Args:
        queue: Queue shared between all workers.
        id: Number used (as a string) for the thread's name in the log output.
    """

    def __init__(self, queue: Queue, id: int):
        super().__init__(name=str(id))
        self.queue = queue

    def run(self) -> None:
        """Process queued items one by one; return once the queue is drained."""
        # Local import: only Queue is imported at module level in this script.
        from queue import Empty

        while True:
            try:
                # Checking empty() and then calling a blocking get() is racy:
                # another worker can take the last item in between, leaving
                # this thread blocked forever. A non-blocking get avoids that.
                item = self.queue.get_nowait()
            except Empty:
                return
            print(f"Thread {current_thread().name}: "
                f"processing item {item} from the queue")
            time.sleep(2)

def main(thread_num: int) -> None:
    """Queue the integers 0-9 and process them with ``thread_num`` workers."""
    q = Queue()
    for item in range(10):
        q.put(item)

    workers = []
    for worker_id in range(1, thread_num + 1):
        worker = Worker(q, worker_id)
        worker.start()
        workers.append(worker)

    # Block until every worker has finished.
    for worker in workers:
        worker.join()

if __name__ == "__main__":
    # Process the queue with four worker threads.
    main(thread_num=4)

$ python3 message_queue.py
Thread 1: processing item 0 from the queue
Thread 2: processing item 1 from the queue
Thread 3: processing item 2 from the queue
Thread 4: processing item 3 from the queue
Thread 2: processing item 4 from the queue
Thread 3: processing item 5 from the queue
Thread 1: processing item 6 from the queue
Thread 4: processing item 7 from the queue
Thread 3: processing item 8 from the queue
Thread 2: processing item 9 from the queue

【Python】パイプによるIPC【並列処理】

reader_conn, writer_conn = Pipe() でpipeを接続してるのはわかります。
共有メモリの場合は、グローバルな変数で値をやり取りしたが、Pipeの場合はその名の通り、一方のスレッドからもう一方のスレッドへ値を渡すときに使われるってことかな。。

from threading import Thread, current_thread
from multiprocessing import Pipe
from multiprocessing.connection import Connection

class Writer(Thread):
    """Thread named "Writer" that sends one message over its connection."""

    def __init__(self, conn: Connection):
        super().__init__(name="Writer")
        self.conn = conn

    def run(self) -> None:
        """Announce the send, then write a single string to the connection."""
        sender = current_thread().name
        print(f"{sender}: Sending rubber duck...")
        self.conn.send("Rubber duck")

class Reader(Thread):
    """Thread named "Reader" that receives one message from its connection."""

    def __init__(self, conn: Connection):
        super().__init__(name="Reader")
        self.conn = conn

    def run(self) -> None:
        """Wait for one message on the connection and print it."""
        receiver = current_thread().name
        print(f"{receiver}: Reading...")
        msg = self.conn.recv()
        print(f"{receiver}: Received: {msg}")

def main() -> None:
    """Join a Writer and a Reader thread through a pipe and wait for both."""
    reader_conn, writer_conn = Pipe()
    writer = Writer(writer_conn)
    reader = Reader(reader_conn)

    # The writer is started (and joined) first, then the reader.
    workers = (writer, reader)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

if __name__ == "__main__":
    # Run the pipe demo only when executed as a script, not on import.
    main()

$ python3 pipe.py
Writer: Sending rubber duck...
Reader: Reading...
Reader: Received: Rubber duck

【Python】プロセスの中で子スレッドを作成【並列処理】

import os
import time
import threading
from threading import Thread

def cpu_waster(i: int) -> None:
    """Print which thread handles work item ``i``, then sleep for 3 seconds."""
    # Thread.getName() is deprecated (it emits a DeprecationWarning); read the
    # ``name`` attribute instead.
    name = threading.current_thread().name
    print(f"{name} doing {i} work")
    time.sleep(3)

def display_threads() -> None:
    """Print the current PID, the active thread count and each active thread."""
    header = (
        "-" * 10,
        f"Current process PID: {os.getpid()}",
        f"Thread Count: {threading.active_count()}",
        "Active threads:",
    )
    for line in header:
        print(line)
    for active in threading.enumerate():
        print(active)

def main(num_threads: int) -> None:
    """Show the thread list before and after starting ``num_threads`` threads."""
    display_threads()

    print(f"Starting {num_threads} CPU wasters...")
    for work_item in range(num_threads):
        waster = Thread(target=cpu_waster, args=(work_item,))
        waster.start()

    # The threads are not joined here, so this second listing is printed
    # right after they have been started.
    display_threads()

if __name__ == "__main__":
    # Start five sleeping threads.
    main(num_threads=5)

$ python3 multithreading.py
----------
Current process PID: 580482
Thread Count: 1
Active threads:
<_MainThread(MainThread, started 281472949920800)>
Starting 5 CPU wasters...
Thread-2 (cpu_waster) doing 1 work
Thread-3 (cpu_waster) doing 2 work
Thread-4 (cpu_waster) doing 3 work
/home/vagrant/dev/rust/parallel/python/multithreading.py:7: DeprecationWarning: getName() is deprecated, get the name attribute instead
name = threading.current_thread().getName()
Thread-1 (cpu_waster) doing 0 work
Thread-5 (cpu_waster) doing 4 work
----------
Current process PID: 580482
Thread Count: 6
Active threads:
<_MainThread(MainThread, started 281472949920800)>




うーむ、中々難儀やのう

【Python】親プロセスと子プロセスを生成する【並列処理】

import os
from multiprocessing import Process

def run_child() -> None:
    """Print the calling process's own PID and its parent's PID."""
    own_pid = os.getpid()
    parent_pid = os.getppid()
    print("Child: I am the child process")
    print(f"Child: Child's PID: {own_pid}")
    print(f"Child: Parent's PID: {parent_pid}")

def start_parent(num_children: int) -> None:
    """Print the parent's PID and start ``num_children`` child processes."""
    parent_pid = os.getpid()
    print("Parent: I am the parent process")
    print(f"Parent : Parent's PID: {parent_pid}")
    for child_no in range(num_children):
        print(f"Starting Process {child_no}")
        child = Process(target=run_child)
        child.start()

if __name__ == "__main__":
    # Start three child processes.
    start_parent(num_children=3)

$ python3 child_processes.py
Parent: I am the parent process
Parent : Parent's PID: 576837
Starting Process 0
Starting Process 1
Starting Process 2
Child: I am the child process
Child: Child's PID: 576838
Child: Parent's PID: 576837
Child: I am the child process
Child: Child's PID: 576840
Child: Parent's PID: 576837
Child: I am the child process
Child: Child's PID: 576839
Child: Parent's PID: 576837

### Rustで書きたい

use std::process;
use std::process::Command;

/// Prints this process's id, spawns `ls` as a child process, prints the
/// child's id and waits for the child to exit.
fn main() {
    println!("My id is {}", process::id());

    let mut command = Command::new("ls");
    match command.spawn() {
        Ok(mut child) => {
            println!("My id is {}", child.id());
            // Dropping a `Child` neither kills nor reaps it, so wait for it
            // explicitly to release its OS resources.
            if let Err(err) = child.wait() {
                eprintln!("failed to wait for child: {}", err);
            }
        }
        // A spawn failure used to be ignored silently.
        Err(err) => eprintln!("failed to spawn `ls`: {}", err),
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/parallel`
My id is 577522
My id is 577561
Cargo.lock Cargo.toml python src target

Command::newはprocessってよりコマンドのprocessなのでちょっと意図しているものと違うな

【Blockchain】ウォレット機能の考察

### ウォレットとして必要な機能
- 秘密鍵、公開鍵の作成、保存
- ブロックチェーン残高の表示
- トランザクションの送信、受信
- ブロックチェーン価格のマーケットデータ表示
※ステーキング、スワッピング、dAppsなど応用機能もある

### 前準備
$ pip install flask
$ pip install web3
$ pip install requests

import json

# NOTE(review): the two imports below are rebound by the flask import that
# follows (flask's `jsonify` and `request` are the ones the routes need);
# they look unintended and can probably be removed.
import jsonify
import requests
from aiohttp import request
from flask import Flask, jsonify, render_template, request, session
from web3 import Web3

app = Flask(__name__)

# NOTE(review): 'fuga' and 'hoge' look like placeholders - replace with a real
# Infura project id and a real secret before use.
infura_url = 'https://mainnet.infura.io/v3/fuga'
app.config['SECRET_KEY'] = 'hoge'

# The route handlers call `web3.eth...`, but no such object was ever created;
# build a client for the node that infura_url points at.
web3 = Web3(Web3.HTTPProvider(infura_url))

@app.route('/new_account', methods=['GET'])
def new_account():
    """Create an account, remember it in the session and return it as JSON."""
    created = web3.eth.account.create('hogehoge')
    credentials = {
        'privateKey': created.key.hex(),
        'address': created.address,
    }
    # NOTE(review): the private key is kept in the session and returned to the client.
    session['account'] = credentials
    return jsonify(session['account'])
# Load the ERC-20 contract ABI once at import time. The encoding is given
# explicitly so the read does not depend on the platform default.
with open('erc20_abi.json', encoding='utf-8') as f:
    erc20_abi = json.load(f)

@app.route('/balance/<contract_address>', methods=['GET'])
def get_balance(contract_address):
    """Return the session account's balance of the given token contract as JSON.

    Args:
        contract_address: Address of the token contract, taken from the URL.

    Returns:
        ``{'balance': ...}`` holding the value returned by the contract's
        ``balanceOf``, or a 400 response with an ``error`` message when no
        account has been stored in the session yet.
    """
    account = session.get('account')
    if account is None:
        # This used to fall through to None.get(...) and fail with a 500.
        return jsonify({'error': 'no account in session; call /new_account first'}), 400

    checksum_address = Web3.to_checksum_address(account['address'])
    # Normalise the URL value too, since web3.py expects checksummed addresses.
    contract = web3.eth.contract(
        address=Web3.to_checksum_address(contract_address), abi=erc20_abi)
    balance = contract.functions.balanceOf(checksum_address).call()
    return jsonify({'balance': balance})

@app.route('/send_transaction', methods=['POST'])
def send_transaction():
    """Sign and broadcast a transfer from the session account.

    Expects a JSON body with ``to`` (recipient address) and ``amount``
    (value in ether). Returns ``{'transaction_hash': <hex string>}``.
    """
    data = request.get_json()
    account = session['account']
    # snake_case method names, consistent with Web3.to_checksum_address used
    # elsewhere in this file; the camelCase aliases were removed in web3.py v6.
    nonce = web3.eth.get_transaction_count(account['address'])
    txn_dict = {
        'to': data['to'],
        # The unit was misspelled as 'either', which is not a valid unit name.
        'value': web3.to_wei(data['amount'], 'ether'),
        'gas': 2000000,
        'gasPrice': web3.to_wei('40', 'gwei'),
        'nonce': nonce,
        # NOTE(review): chain id 3 is not mainnet's id (1), yet infura_url
        # points at mainnet - confirm which network is intended.
        'chainId': 3
    }
    signed_txn = web3.eth.account.sign_transaction(txn_dict, account['privateKey'])
    # NOTE(review): newer web3.py/eth-account releases expose this attribute
    # as `raw_transaction` - confirm against the installed version.
    txn_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
    return jsonify({'transaction_hash': txn_hash.hex()})

@app.route('/market_chart/<contract_address>/<days>', methods=['GET'])
def get_market_chart(contract_address, days):
    """Proxy CoinGecko's market-chart data (in USD) for a token contract.

    Args:
        contract_address: Token contract address, taken from the URL.
        days: Number of days of history to request, taken from the URL.

    Returns:
        The JSON body of the CoinGecko response, re-serialised as JSON.
    """
    # NOTE(review): looks like a placeholder key; also confirm the query
    # parameter name CoinGecko expects for API keys.
    api_key = 'coingecho_api_key'
    url = f'https://api.coingecko.com/api/v3/coins/ethereum/contract/{contract_address}/market_chart'
    response = requests.get(
        url,
        params={'vs_currency': 'usd', 'days': days, 'api_key': api_key},
        # requests has no default timeout; don't let a stalled upstream hang the request.
        timeout=10,
    )
    # The stray `response.json(<url>)` call was removed: Response.json() takes
    # no positional argument and raised TypeError before this line was reached.
    market_chart = response.json()
    return jsonify(market_chart)

@app.route('/')
def home():
    """Render the ``index.html`` template for the root URL."""
    return render_template('index.html')

if __name__ == '__main__':
    # Start Flask's built-in server with debug mode switched on.
    app.run(debug=True)

なるほど、walletのエッセンスは理解できました。Pythonでは"web3"のライブラリでかなり抽象化されているのがわかります。
balanceのところはUTXOにするか、Account型にするかで変わってきますね。
マーケットデータは一旦スタック。
テストネット、メインネットの概念はもう少し深掘りする

【Python】非同期処理を理解する

### 非同期処理とは?
あるタスクが終了するのを待っている間、別のタスクを実行すること

$ pip3 install aiohttp

import datetime
import aiohttp
import asyncio

start = datetime.datetime.now()


def log(message):
    """Print ``message`` prefixed with the whole seconds elapsed since ``start``."""
    # timedelta.seconds is only the seconds component (0-86399) and ignores
    # whole days; total_seconds() is the actual elapsed time.
    elapsed = int((datetime.datetime.now() - start).total_seconds())
    print(f'{elapsed}秒経過', message)

async def fetch(session, url):
    """Fetch ``url`` asynchronously and return the response body as text."""
    print(f"Fetching {url}")
    async with session.get(url) as response:
        body = await response.text()
    return body

async def main():
    """Fetch a fixed list of URLs concurrently and print the start of each body."""
    log("タスク開始")
    urls = [
        "http://google.com",
        "http://qiita.com",
        "https://www.python.org/",
        "https://www.mozilla.org/en-US/",
        "https://html.spec.whatwg.org/multipage/",
        "https://www.w3.org/TR/css/",
        "https://ecma-international.org/",
        "https://www.typescriptlang.org/",
        "https://www.oracle.com/jp/java/technologies/",
        "https://www.ruby-lang.org/ja/",
        "https://www.postgresql.org/",
        "https://www.mysql.com/jp/",
        "https://docs.djangoproject.com/ja/5.0/",
        "https://spring.pleiades.io/projects/spring-boot",
        # A comma was missing after this entry, so it was silently
        # concatenated with the next string into a single bogus URL.
        "https://rubyonrails.org/",
        "https://firebase.google.com/?hl=ja",
        "https://go.dev/",
        "https://nodejs.org/en",
    ]

    async with aiohttp.ClientSession() as session:
        # These are coroutine objects; they only start running once they are
        # awaited by gather() below.
        tasks = [fetch(session, url) for url in urls]

        print("Starting tasks...")

        print("Tasks are running in the background...")

        results = await asyncio.gather(*tasks)

        # Show only the first 100 characters of each response body.
        for result in results:
            print(result[:100])

    log("task finished")

if __name__ == "__main__":
    # Run the coroutine on a fresh event loop when executed as a script.
    asyncio.run(main())

【Python】threadingによる並列処理(マルチスレッド)

threadingというライブラリを使用する
今いるスレッドを確認

import threading
import warnings
# Kept so that warning output stays exactly as before.
warnings.simplefilter('ignore')

# threading.currentThread() and Thread.getName() are deprecated aliases;
# current_thread() and the ``name`` attribute are the supported spellings.
print(threading.current_thread().name)

$ python3 main.py
MainThread

### threading.Threadでmainとは別にスレッドを作成する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    """Print the current thread's name, then simulate boiling udon for 3 seconds."""
    # current_thread().name replaces the deprecated currentThread().getName().
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

if __name__ == "__main__":
    # current_thread().name replaces the deprecated currentThread().getName().
    print(" ■ thread :", threading.current_thread().name)

    print('うどんを作ります。')

    # Boil the udon on a separate thread and wait for it to finish.
    thread1 = threading.Thread(target=boil_udon)
    thread1.start()
    thread1.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

### スレッドを更に追加する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    """Print the current thread's name, then simulate boiling udon for 3 seconds."""
    # current_thread().name replaces the deprecated currentThread().getName().
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

def make_tuyu():
    """Print the current thread's name, then simulate making the broth for 2 seconds."""
    # current_thread().name replaces the deprecated currentThread().getName().
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんの汁を作ります。')
    time.sleep(2)
    print(' うどんの汁ができました。')


if __name__ == "__main__":
    # current_thread().name replaces the deprecated currentThread().getName().
    print(" ■ thread :", threading.current_thread().name)

    print('うどんを作ります。')

    # One thread per task, so boiling and broth-making overlap.
    thread1 = threading.Thread(target=boil_udon)
    thread2 = threading.Thread(target=make_tuyu)

    thread1.start()
    thread2.start()

    # Wait for both tasks before plating.
    thread1.join()
    thread2.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
■ thread : Thread-2 (make_tuyu)
うどんの汁を作ります。
うどんの汁ができました。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

threadが作られると、同時に処理されていることがわかる
ThreadPoolExecutorの場合は、submitした処理をプール内のワーカースレッドに割り当てて実行するが、threadingの場合は、スレッドごとに実行する関数を指定して、自分でスレッドを作成・管理することができる

Pythonでマルチスレッドとマルチプロセス

### シングルスレッド

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    """Simulate one unit of blocking work by sleeping for a second."""
    time.sleep(1)


# Run eight units back to back and report the wall-clock time taken.
start = time.time()
for _ in range(8):
    func()
elapsed = time.time() - start
print(elapsed)

$ python3 single.py
8.064586639404297

### マルチスレッド(同時に実行)

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    """Simulate one unit of blocking work by sleeping for a second."""
    time.sleep(1)


# Submit eight units to a pool of four threads. Leaving the ``with`` block
# waits for all of them, so the printed time covers the whole batch.
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(8):
        pool.submit(func)
elapsed = time.time() - start
print(elapsed)

$ python3 multithread.py
2.0498673915863037

### マルチプロセス(複数のプロセスで同時に処理)

from concurrent.futures import ProcessPoolExecutor
import time
def func():
    """Simulate one unit of blocking work by sleeping for a second."""
    time.sleep(1)


if __name__ == "__main__":
    # Worker processes created with the "spawn" start method re-import this
    # module; without this guard each of them would try to build its own pool.
    start = time.time()
    with ProcessPoolExecutor(max_workers=4) as e:
        for i in range(8):
            e.submit(func)
    print(time.time() - start)

$ python3 multiprocess.py
2.1424620151519775

import os
import time
import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

x1, x2, y1, y2 = -1.8, 1.8, -1.8, 1.8
c_real, c_imag = -0.62772, -0.42193

def calculate_z_serial_purepython(maxiter, zs, cs):
    """Count, per point, the iterations of ``z = z*z + c`` while ``abs(z) < 2``.

    Args:
        maxiter: Upper bound on the iterations counted for each point.
        zs: Starting values, one per point.
        cs: Constants, indexed in step with ``zs``.

    Returns:
        A list with one iteration count per entry of ``zs``.
    """
    iterations = []
    for idx, z in enumerate(zs):
        c = cs[idx]
        # A very short sleep once every 100 points.
        if idx % 100 == 0:
            time.sleep(0.0001)
        steps = 0
        while abs(z) < 2 and steps < maxiter:
            z = z * z + c
            steps += 1
        iterations.append(steps)
    return iterations

def calc_pure_python(desired_width, max_iterations):
    """Build the coordinate grid and run the iteration count over every point.

    The grid spans x1..x2 and y2..y1 in ``desired_width`` steps per axis. The
    computed counts are discarded; the function returns None.
    """
    x_step = float(x2 - x1) / float(desired_width)
    y_step = float(y1 - y2) / float(desired_width)

    # y runs downwards from y2 (y_step is negative), x upwards from x1.
    ys = []
    y_pos = y2
    while y_pos > y1:
        ys.append(y_pos)
        y_pos += y_step

    xs = []
    x_pos = x1
    while x_pos < x2:
        xs.append(x_pos)
        x_pos += x_step

    # One starting value per grid point, row by row, each paired with the
    # same constant.
    zs = [complex(x_val, y_val) for y_val in ys for x_val in xs]
    cs = [complex(c_real, c_imag) for _ in zs]

    calculate_z_serial_purepython(max_iterations, zs, cs)

if __name__ == "__main__":

    max_workers = os.cpu_count()

    # Baseline: 16 runs one after another on the main thread.
    began = datetime.datetime.now()
    for _ in range(16):
        calc_pure_python(desired_width=500, max_iterations=100)
    took = datetime.datetime.now() - began
    print(f"SingleThread: {took.seconds*1000 + took.microseconds/1000}ms")

    # The same 16 runs submitted to a thread pool.
    began = datetime.datetime.now()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(16):
            pool.submit(calc_pure_python, 500, 100)
    took = datetime.datetime.now() - began
    print(f"MultiThread: {took.seconds*1000 + took.microseconds/1000}ms")

    # The same 16 runs submitted to a process pool.
    began = datetime.datetime.now()
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(16):
            pool.submit(calc_pure_python, 500, 100)
    took = datetime.datetime.now() - began
    print(f"MultiProcess: {took.seconds*1000 + took.microseconds/1000}ms")

$ python3 main.py
SingleThread: 17934.699ms
MultiThread: 11256.051ms
MultiProcess: 8493.925ms

os.cpu_count() でCPUのコアを取得できるんか…
処理に時間がかかる箇所をマルチプロセスで実装すれば良さそうではあるな…

【Python】Proof of Stake

import hashlib
import json
from datetime import datetime
import random

class Block:
    """A single block: its position, payload, validator and SHA-256 hash.

    Args:
        index: Position of the block in the chain.
        previous_hash: Hash of the preceding block.
        timestamp: Creation time of the block.
        data: Payload stored in the block.
        validator: Validator recorded for the block.
    """

    def __init__(self, index, previous_hash, timestamp, data, validator):
        self.index = index
        self.previous_hash = previous_hash
        self.timestamp = timestamp
        self.data = data
        self.validator = validator
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        """Return the SHA-256 hex digest of the block's fields, ``hash`` excluded."""
        # Build a new dict instead of deleting from self.__dict__: the old
        # code removed the instance's own ``hash`` attribute whenever this
        # method was called again after construction.
        block_dict = {
            key: value for key, value in self.__dict__.items() if key != 'hash'
        }
        block_string = json.dumps(block_dict, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

class Blockchain:
    """Minimal proof-of-stake chain: stake-weighted validator selection plus blocks."""

    def __init__(self):
        self.chain = []              # A list that holds the blocks of the blockchain
        self.unconfirmed_data = []   # A list of new data or transactions
        self.validators = {}         # A dictionary in which each validator is recorded
        self.staked_tokens = {}      # A dictionary that holds the amount of staked tokens for each validator.
        self.minimum_stake = 500     # The minimum amount of tokens a validator must stake to participate in the network
        self.create_genesis_block()  # The genesis block is the first block in the blockchain.

    def create_genesis_block(self):
        """Append the first block (index 0, no previous hash, no validator)."""
        genesis_block = Block(0, None, str(datetime.now()), "Genesis Block", None)
        self.chain.append(genesis_block)

    def last_block(self):
        """Return the most recently appended block."""
        return self.chain[-1]

    def add_data(self, new_data):
        """Queue ``new_data`` for inclusion in the next block."""
        self.unconfirmed_data.append(new_data)

    def add_validator(self, validator, stake):
        """Register ``validator`` if ``stake`` meets the minimum; otherwise print a notice."""
        if stake >= self.minimum_stake:
            self.staked_tokens[validator] = stake
            self.validators[validator] = True
        else:
            print(f"{validator} does not meet the minimum stake requirement.")

    def select_validator(self):
        """Pick a validator at random, weighted by staked tokens.

        Returns:
            The selected validator's key from ``staked_tokens``.

        Raises:
            ValueError: If the total stake is not positive (for example when
                no validator is registered); the selection loop below could
                never terminate in that case.
        """
        total_stake = sum(self.staked_tokens.values())
        if total_stake <= 0:
            raise ValueError("cannot select a validator: no tokens are staked")

        selected_validator = None
        # Retry in the rare case that the draw lands exactly on total_stake,
        # where no cumulative sum is strictly greater than the pick.
        while selected_validator is None:
            pick = random.uniform(0, total_stake)
            print(pick)
            current = 0
            for validator, stake in self.staked_tokens.items():
                print(validator, stake)
                current += stake
                if current > pick:
                    selected_validator = validator
                    break
        return selected_validator

    def create_block(self, validator):
        """Bundle all queued data into a new block attributed to ``validator``.

        Returns:
            The new block's index, or False when there is no queued data.
        """
        if not self.unconfirmed_data:
            return False

        last_block = self.last_block()
        new_block = Block(index=last_block.index + 1,
                        previous_hash=last_block.hash,
                        timestamp=str(datetime.now()),
                        data=self.unconfirmed_data,
                        validator=validator)
        self.chain.append(new_block)
        # Rebind to a fresh list so the new block keeps its own data list.
        self.unconfirmed_data = []
        return new_block.index

    def display_chain(self):
        """Print the attribute dict of every block in the chain."""
        for block in self.chain:
            print(f"Block {block.__dict__}")


# sample
blockchain = Blockchain()

# Candidates are registered in this order; add_validator prints a notice for
# those below the minimum stake.
for candidate, stake in (
    ("A", 2000),
    ("B", 50),
    ("C", 650),
    ("D", 30),
    ("E", 100000),
    ("F", 25),
):
    blockchain.add_validator(candidate, stake)

for entry in ("Alice send Bob 200 coin", "Bob send Chen 2000 coin"):
    blockchain.add_data(entry)

selected_validator = blockchain.select_validator()
print(f"Validator selected: {selected_validator}")

blockchain.create_block(selected_validator)
blockchain.display_chain()

$ python3 main.py
B does not meet the minimum stake requirement.
D does not meet the minimum stake requirement.
F does not meet the minimum stake requirement.
27576.935798973213
A 2000
C 650
E 100000
Validator selected: E
Block {'index': 0, 'previous_hash': None, 'timestamp': '2024-11-27 03:59:33.053116', 'data': 'Genesis Block', 'validator': None, 'hash': 'df536d1db7e82ccd6f51e244928263163cd36b9724c4fdb8df77a72923dca021'}
Block {'index': 1, 'previous_hash': 'df536d1db7e82ccd6f51e244928263163cd36b9724c4fdb8df77a72923dca021', 'timestamp': '2024-11-27 03:59:33.053659', 'data': ['Alice send Bob 200 coin', 'Bob send Chen 2000 coin'], 'validator': 'E', 'hash': '120255b4707e317e599eff7183b73e31bfc691d71ca987d8186a854c91e5d1bd'}

うーん、なるほどな〜 というところか…

【Python】randomのuniform

The uniform() method returns a random floating number between the two specified numbers (both included).
※ Python公式ドキュメントによると、上限値は浮動小数点の丸めの影響で、結果に含まれる場合と含まれない場合がある。

import random

# Print one random float drawn from the range 20 to 30.
print(random.uniform(20, 30))

$ python3 test.py
25.547661293351773
$ python3 test.py
27.825416674846192