【Python】非同期処理を理解する

### 非同期処理とは?
あるタスクが終了するのを待っている間、別のタスクを実行すること

$ pip3 install aiohttp

import datetime
import aiohttp
import asyncio

start = datetime.datetime.now()

def log(message):
    """Print ``message`` prefixed with the whole seconds elapsed since ``start``.

    ``start`` is the module-level timestamp taken when the script was loaded.
    """
    elapsed = datetime.datetime.now() - start
    print(f'{elapsed.seconds}秒経過', message)

async def fetch(session, url):
    """Fetch ``url`` through ``session`` and return the response body as text."""
    print(f"Fetching {url}")
    async with session.get(url) as response:
        body = await response.text()
    return body

async def main():
    """Fetch every URL in the list concurrently and print the first 100 characters of each body."""
    log("タスク開始")
    urls = [
        "http://google.com",
        "http://qiita.com",
        "https://www.python.org/",
        "https://www.mozilla.org/en-US/",
        "https://html.spec.whatwg.org/multipage/",
        "https://www.w3.org/TR/css/",
        "https://ecma-international.org/",
        "https://www.typescriptlang.org/",
        "https://www.oracle.com/jp/java/technologies/",
        "https://www.ruby-lang.org/ja/",
        "https://www.postgresql.org/",
        "https://www.mysql.com/jp/",
        "https://docs.djangoproject.com/ja/5.0/",
        "https://spring.pleiades.io/projects/spring-boot",
        # The comma after this entry matters: without it Python joins this
        # string and the next one into a single, bogus URL.
        "https://rubyonrails.org/",
        "https://firebase.google.com/?hl=ja",
        "https://go.dev/",
        "https://nodejs.org/en",
    ]

    async with aiohttp.ClientSession() as session:
        # Calling fetch() only creates coroutine objects; nothing runs yet.
        tasks = [fetch(session, url) for url in urls]

        print("Starting tasks...")

        print("Tasks are running in the background...")

        # gather() schedules all coroutines and waits for every one of them;
        # results come back in the same order as ``urls``.
        results = await asyncio.gather(*tasks)

        for result in results:
            print(result[:100])

    log("task finished")

# Start the event loop and run main() only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

【Python】threadingによる並列処理(マルチスレッド)

threadingというライブラリを使用する
今いるスレッドを確認

import threading
import warnings
warnings.simplefilter('ignore')

# current_thread().name replaces currentThread().getName(), deprecated since Python 3.10.
print(threading.current_thread().name)

$ python3 main.py
MainThread

### threading.Threadでmainとは別にスレッドを作成する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    """Simulate boiling udon: report the running thread, then block for 3 seconds."""
    # current_thread().name replaces the camelCase aliases deprecated since Python 3.10.
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

if __name__ == "__main__":
    # current_thread().name replaces the camelCase aliases deprecated since Python 3.10.
    print(" ■ thread :", threading.current_thread().name)

    print('うどんを作ります。')

    # Boil the udon on a separate thread and wait for it to finish before plating.
    thread1 = threading.Thread(target=boil_udon)
    thread1.start()
    thread1.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

### スレッドを更に追加する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    """Simulate boiling udon: report the running thread, then block for 3 seconds."""
    # current_thread().name replaces the camelCase aliases deprecated since Python 3.10.
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

def make_tuyu():
    """Simulate making the soup: report the running thread, then block for 2 seconds."""
    # current_thread().name replaces the camelCase aliases deprecated since Python 3.10.
    print(" ■ thread :", threading.current_thread().name)

    print(' うどんの汁を作ります。')
    time.sleep(2)
    print(' うどんの汁ができました。')


if __name__ == "__main__":
    # current_thread().name replaces the camelCase aliases deprecated since Python 3.10.
    print(" ■ thread :", threading.current_thread().name)

    print('うどんを作ります。')

    # One thread per job: boiling the noodles and making the soup.
    thread1 = threading.Thread(target=boil_udon)
    thread2 = threading.Thread(target=make_tuyu)

    # Start both before joining either, so the two jobs overlap.
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
■ thread : Thread-2 (make_tuyu)
うどんの汁を作ります。
うどんの汁ができました。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

threadが作られると、同時に処理されていることがわかる
ThreadPoolExecutorの場合は、単純に1つの処理を複数のスレッドで実行するが、threadingの場合は、プログラム内容を指定してスレッドを作成することができる

Pythonでマルチスレッドとマルチプロセス

### シングルスレッド

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    """Stand-in for a blocking task: sleep for one second."""
    time.sleep(1)


# Run the task 8 times back to back and print the total wall-clock seconds.
start = time.time()
for _ in range(8):
    func()
elapsed = time.time() - start
print(elapsed)

$ python3 single.py
8.064586639404297

### マルチスレッド(同時に実行)

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    """Stand-in for a blocking task: sleep for one second."""
    time.sleep(1)


start = time.time()
# Submit 8 tasks to a pool of 4 threads; leaving the with-block waits for all of them.
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(8):
        pool.submit(func)
elapsed = time.time() - start
print(elapsed)

$ python3 multithread.py
2.0498673915863037

### マルチプロセス(複数のプロセスで同時に処理)

from concurrent.futures import ProcessPoolExecutor
import time
def func():
    """Stand-in for a blocking task: sleep for one second."""
    time.sleep(1)


# Guard the entry point: under the "spawn" start method every worker process
# re-imports this module, and creating the pool at import time would fail.
if __name__ == "__main__":
    start = time.time()
    # Submit 8 tasks to a pool of 4 processes; leaving the with-block waits for all of them.
    with ProcessPoolExecutor(max_workers=4) as e:
        for i in range(8):
            e.submit(func)
    print(time.time() - start)

$ python3 multiprocess.py
2.1424620151519775

import os
import time
import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Bounds of the sampled region: x runs over x1..x2, y over y1..y2.
x1, x2 = -1.8, 1.8
y1, y2 = -1.8, 1.8
# Real and imaginary parts of the constant c used for every grid point.
c_real = -0.62772
c_imag = -0.42193

def calculate_z_serial_purepython(maxiter, zs, cs):
    """Return, per start point, how many times ``z = z*z + c`` ran before stopping.

    ``zs`` holds the start values and ``cs`` the matching constants, indexed
    in parallel. Iteration for a point stops as soon as ``abs(z) >= 2`` or
    ``maxiter`` steps have been taken.
    """
    output = [0] * len(zs)
    for idx, z in enumerate(zs):
        c = cs[idx]
        # Short pause on every 100th point (reason not documented in this file).
        if idx % 100 == 0:
            time.sleep(0.0001)
        steps = 0
        while abs(z) < 2 and steps < maxiter:
            z = z * z + c
            steps += 1
        output[idx] = steps
    return output

def calc_pure_python(desired_width, max_iterations):
    """Build a grid of complex start points and run the iteration kernel over it.

    The grid covers x1..x2 and y1..y2 (module-level bounds) with
    ``desired_width`` steps per axis; every point is paired with the same
    constant ``complex(c_real, c_imag)``. The kernel's result is computed
    but not returned.
    """
    x_step = float(x2 - x1) / float(desired_width)
    y_step = float(y1 - y2) / float(desired_width)

    # Coordinates are produced by repeatedly adding the step; y_step is
    # negative, so y walks from y2 down towards y1.
    ys = []
    ycoord = y2
    while ycoord > y1:
        ys.append(ycoord)
        ycoord += y_step

    xs = []
    xcoord = x1
    while xcoord < x2:
        xs.append(xcoord)
        xcoord += x_step

    # Row-major grid: for each y, every x.
    zs = [complex(xc, yc) for yc in ys for xc in xs]
    cs = [complex(c_real, c_imag) for _ in zs]

    calculate_z_serial_purepython(max_iterations, zs, cs)

def _elapsed_ms(since):
    """Return the time elapsed since ``since`` in milliseconds."""
    elapsed = datetime.datetime.now() - since
    return elapsed.seconds*1000 + elapsed.microseconds/1000


def _run_in_pool(executor_cls, max_workers, runs=16):
    """Run calc_pure_python ``runs`` times in an ``executor_cls`` pool and wait for all runs.

    Each future's result() is called so that an exception raised inside a
    worker is re-raised here instead of being silently dropped.
    """
    with executor_cls(max_workers=max_workers) as executor:
        futures = [executor.submit(calc_pure_python, 500, 100) for _ in range(runs)]
        for future in futures:
            future.result()


if __name__ == "__main__":

    max_workers = os.cpu_count()

    # Baseline: 16 runs one after another on the main thread.
    start = datetime.datetime.now()
    for i in range(16):
        calc_pure_python(desired_width=500, max_iterations=100)
    print("SingleThread: {}ms".format(_elapsed_ms(start)))

    # The same 16 runs spread over a thread pool.
    start = datetime.datetime.now()
    _run_in_pool(ThreadPoolExecutor, max_workers)
    print("MultiThread: {}ms".format(_elapsed_ms(start)))

    # The same 16 runs spread over a process pool.
    start = datetime.datetime.now()
    _run_in_pool(ProcessPoolExecutor, max_workers)
    print("MultiProcess: {}ms".format(_elapsed_ms(start)))

$ python3 main.py
SingleThread: 17934.699ms
MultiThread: 11256.051ms
MultiProcess: 8493.925ms

os.cpu_count() でCPUの論理コア数を取得できるんか…
処理に時間がかかる箇所をマルチプロセスで実装すれば良さそうではあるな…

【Python】Proof of Stake

import hashlib
import json
from datetime import datetime
import random

class Block:
    """One block of the chain; its hash covers every field except the hash itself."""

    def __init__(self, index, previous_hash, timestamp, data, validator):
        self.index = index
        self.previous_hash = previous_hash
        self.timestamp = timestamp
        self.data = data
        self.validator = validator
        # Computed after the other fields are set, since the digest covers them.
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        """Return the SHA-256 hex digest of the block's fields serialized as JSON.

        The ``hash`` attribute is left out of the digest. A filtered copy of
        ``__dict__`` is serialized, so recomputing the hash never removes
        ``self.hash`` from the instance.
        """
        block_dict = {key: value for key, value in self.__dict__.items() if key != 'hash'}
        # sort_keys makes the serialization independent of attribute order.
        block_string = json.dumps(block_dict, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

class Blockchain:
    """Toy proof-of-stake chain: validators stake tokens and are drawn with probability proportional to stake."""

    def __init__(self):
        self.chain = []              # Blocks of the chain, oldest first
        self.unconfirmed_data = []   # Data/transactions waiting to be put into a block
        self.validators = {}         # validator -> True for every accepted validator
        self.staked_tokens = {}      # validator -> amount of tokens staked
        self.minimum_stake = 500     # Minimum stake required to be accepted as a validator
        self.create_genesis_block()  # The chain always starts with the genesis block

    def create_genesis_block(self):
        """Append the first block (index 0, no previous hash, no validator)."""
        genesis_block = Block(0, None, str(datetime.now()), "Genesis Block", None)
        self.chain.append(genesis_block)

    def last_block(self):
        """Return the most recently appended block."""
        return self.chain[-1]

    def add_data(self, new_data):
        """Queue ``new_data`` for inclusion in the next block."""
        self.unconfirmed_data.append(new_data)

    def add_validator(self, validator, stake):
        """Register ``validator`` if ``stake`` reaches ``minimum_stake``; otherwise print a rejection."""
        if stake >= self.minimum_stake:
            self.staked_tokens[validator] = stake
            self.validators[validator] = True
        else:
            print(f"{validator} does not meet the minimum stake requirement.")

    def select_validator(self):
        """Pick a validator at random, weighted by staked tokens.

        Raises:
            ValueError: if no tokens are staked, since no draw is possible
                (the selection loop could otherwise never terminate).
        """
        total_stake = sum(self.staked_tokens.values())
        if total_stake <= 0:
            raise ValueError("no validators with a positive stake to select from")

        selected_validator = None
        while selected_validator is None:
            pick = random.uniform(0, total_stake)
            print(pick)
            # Walk the cumulative stake; the first validator whose running
            # total exceeds the drawn value wins.
            current = 0
            for validator, stake in self.staked_tokens.items():
                print(validator, stake)
                current += stake
                if current > pick:
                    selected_validator = validator
                    break
            # If pick landed exactly on total_stake nobody matched; draw again.
        return selected_validator

    def create_block(self, validator):
        """Seal all queued data into a new block attributed to ``validator``.

        Returns:
            The new block's index, or False when there is no queued data.
        """
        if not self.unconfirmed_data:
            return False

        last_block = self.last_block()
        new_block = Block(index=last_block.index + 1,
                          previous_hash=last_block.hash,
                          timestamp=str(datetime.now()),
                          data=self.unconfirmed_data,
                          validator=validator)
        self.chain.append(new_block)
        # Rebind to a fresh list; the sealed block keeps the old one.
        self.unconfirmed_data = []
        return new_block.index

    def display_chain(self):
        """Print the attribute dictionary of every block in order."""
        for block in self.chain:
            print(f"Block {block.__dict__}")


# sample
blockchain = Blockchain()

# Register the candidates in order with their stakes.
for name, stake in [("A", 2000), ("B", 50), ("C", 650), ("D", 30), ("E", 100000), ("F", 25)]:
    blockchain.add_validator(name, stake)

# Queue two pieces of data for the next block.
for entry in ("Alice send Bob 200 coin", "Bob send Chen 2000 coin"):
    blockchain.add_data(entry)

selected_validator = blockchain.select_validator()
print(f"Validator selected: {selected_validator}")

blockchain.create_block(selected_validator)
blockchain.display_chain()

$ python3 main.py
B does not meet the minimum stake requirement.
D does not meet the minimum stake requirement.
F does not meet the minimum stake requirement.
27576.935798973213
A 2000
C 650
E 100000
Validator selected: E
Block {'index': 0, 'previous_hash': None, 'timestamp': '2024-11-27 03:59:33.053116', 'data': 'Genesis Block', 'validator': None, 'hash': 'df536d1db7e82ccd6f51e244928263163cd36b9724c4fdb8df77a72923dca021'}
Block {'index': 1, 'previous_hash': 'df536d1db7e82ccd6f51e244928263163cd36b9724c4fdb8df77a72923dca021', 'timestamp': '2024-11-27 03:59:33.053659', 'data': ['Alice send Bob 200 coin', 'Bob send Chen 2000 coin'], 'validator': 'E', 'hash': '120255b4707e317e599eff7183b73e31bfc691d71ca987d8186a854c91e5d1bd'}

うーん、なるほどな〜 というところか…

【Python】randomのuniform

The uniform(a, b) method returns a random floating-point number N between the two specified numbers, a <= N <= b (whether the end point b itself can be returned depends on floating-point rounding).

import random

# Draw one random float between 20 and 30 and show it.
sample = random.uniform(20, 30)
print(sample)

$ python3 test.py
25.547661293351773
$ python3 test.py
27.825416674846192

【Python】listのappend

import json
class MyClass:
    """Hold two operands and a list that collects their sum on every add() call."""

    def __init__(self, a, b):
        self.group = []
        self.a, self.b = a, b

    def add(self):
        """Append ``a + b`` to ``group`` and print the list so far."""
        self.group.append(self.a + self.b)
        print(self.group)

obj = MyClass(2, 4)
# Each call appends another sum to the same list.
for _ in range(2):
    obj.add()

$ python3 test.py
[6]
[6, 6]

【Python】jsonのsort_keys

json.dumpsする際に、sort_keysをTrueとすることで、dictionary(辞書)のkeyでソートして出力する

import json
class MyClass:
    """Three-attribute holder whose attributes are assigned in the order a, c, b."""

    def __init__(self, a, c, b):
        # Targets are assigned left to right, so __dict__ order is a, c, b.
        self.a, self.c, self.b = a, c, b

obj = MyClass(2, 4, 6)
attrs = obj.__dict__

# Default: keys appear in attribute-assignment order.
string = json.dumps(attrs)
print(string)

# sort_keys=True: keys appear in sorted order.
string = json.dumps(attrs, sort_keys=True)
print(string)

$ python3 test.py
{"a": 2, "c": 4, "b": 6}
{"a": 2, "b": 6, "c": 4}

なるほど、これで下のコードが何をやってるのか理解できたわ

import hashlib
import json
from datetime import datetime
import random

class Block:
    """One block of the chain; its hash covers every field except the hash itself."""

    def __init__(self, index, previous_hash, timestamp, data, validator):
        self.index = index
        self.previous_hash = previous_hash
        self.timestamp = timestamp
        self.data = data
        self.validator = validator
        # Computed after the other fields are set, since the digest covers them.
        self.hash = self.calculate_hash()

    def calculate_hash(self):
        """Return the SHA-256 hex digest of the block's fields serialized as JSON.

        The ``hash`` attribute is left out of the digest. A filtered copy of
        ``__dict__`` is serialized, so recomputing the hash never removes
        ``self.hash`` from the instance.
        """
        block_dict = {key: value for key, value in self.__dict__.items() if key != 'hash'}
        # sort_keys makes the serialization independent of attribute order.
        block_string = json.dumps(block_dict, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

【Python】selfと__dict__

__dict__はオブジェクトが持つ属性とその値を格納する辞書を返す特殊属性
モジュール、クラス、インスタンスなどで参照可能

class MyClass:
    """Minimal holder for two attributes, ``x`` and ``y``."""

    def __init__(self, x, y):
        self.x, self.y = x, y

obj = MyClass(2, 4)
# __dict__ is the dictionary holding the instance's attributes.
attrs = obj.__dict__
print(attrs)

$ python3 test.py
{'x': 2, 'y': 4}

追加、更新、削除も可能

class MyClass:
    """Minimal holder for two attributes, ``x`` and ``y``."""

    def __init__(self, x, y):
        self.x, self.y = x, y

obj = MyClass(2, 4)
# attrs is the instance's own attribute dictionary, so edits below change obj itself.
attrs = obj.__dict__
print(attrs)

# Add a new attribute.
attrs['z'] = 3
print(attrs)

# Update an existing attribute.
attrs['x'] = 10
print(attrs)

# Delete an attribute.
del attrs['y']
print(attrs)

$ python3 test.py
{'x': 2, 'y': 4}
{'x': 2, 'y': 4, 'z': 3}
{'x': 10, 'y': 4, 'z': 3}
{'x': 10, 'z': 3}

dictはdictionaryの略ですね。__init__と__dict__が同時に出ると、やや混乱しましたが、使い方を理解すれば合点が行きます。

【Python】classとself

initがコンストラクターとなり、selfはオブジェクト自身を表す

class Person:
    """A person with a name and an age who can print a short self-introduction."""

    def __init__(self, name, age):
        self.name, self.age = name, age

    def share_self(self):
        """Print the name line followed by the age line."""
        name, age = self.name, self.age
        print("名前:", name)
        print("年齢:", age, '歳')

person_1 = Person('田中', 18)
person_2 = Person('山田', 20)

# Each instance prints its own data through the same method.
for person in (person_1, person_2):
    person.share_self()

$ python3 test.py
名前: 田中
年齢: 18 歳
名前: 山田
年齢: 20 歳