【Python】シャミアの秘密分散

import random
from typing import List, Tuple

def generate_coefficients(secret: int, threshold: int) -> List[int]:
    """Build the coefficient list of the sharing polynomial.

    The polynomial is ``secret + a1*x + ... + a(threshold-1)*x**(threshold-1)``:
    the constant term is the secret and every other coefficient is an
    independent random integer in 1..256 (inclusive).

    Args:
        secret: Value stored as the constant term (index 0).
        threshold: Number of coefficients to produce, i.e. polynomial
            degree + 1.

    Returns:
        ``[secret, a1, ..., a(threshold-1)]``. When ``threshold`` is 1 or
        less the list contains only the secret.
    """
    # Function-scope import so the block stays self-contained.
    import secrets

    # These coefficients are the only thing hiding the secret from someone
    # holding fewer than `threshold` shares, so they are drawn from the OS
    # CSPRNG instead of the predictable generator behind `random`.
    random_terms = [secrets.randbelow(256) + 1 for _ in range(threshold - 1)]
    return [secret] + random_terms

def create_shares(
    secret: int, total_shares: int, threshold: int
) -> List[Tuple[int, int]]:
    """Evaluate the sharing polynomial at x = 1, 2, ..., total_shares.

    Args:
        secret: Value to split; becomes the polynomial's constant term.
        total_shares: Number of ``(x, y)`` points to hand out.
        threshold: Number of polynomial coefficients, passed on to
            ``generate_coefficients``.

    Returns:
        A list of ``(x, y)`` tuples in increasing x order.
    """
    polynomial = generate_coefficients(secret, threshold)

    def _evaluate(point: int) -> int:
        # Plain term-by-term evaluation: sum of coefficient * point**power.
        total = 0
        for power, coefficient in enumerate(polynomial):
            total += coefficient * (point**power)
        return total

    return [(point, _evaluate(point)) for point in range(1, total_shares + 1)]

def reconstruct_secret(shares: List[Tuple[int, int]], threshold: int) -> int:
    """Recover the secret, i.e. the sharing polynomial's value at x = 0.

    Performs Lagrange interpolation through the given points using exact
    rational arithmetic. Individual basis terms are generally not
    integers (only their weighted sum is), so they must not be rounded
    one by one.

    Args:
        shares: ``(x, y)`` points on the polynomial, with distinct x values.
        threshold: Minimum number of shares needed for reconstruction.

    Returns:
        The interpolated constant term as an ``int``.

    Raises:
        ValueError: If fewer than ``threshold`` shares are supplied (this
            includes an empty list), or if the interpolated value is not
            an integer, which means the shares do not belong together.
        ZeroDivisionError: If two shares have the same x value.
    """
    # Function-scope import so the block stays self-contained.
    from fractions import Fraction

    shares = list(shares)
    if len(shares) < threshold:
        raise ValueError(
            f"need at least {threshold} shares, got {len(shares)}"
        )

    x_s, y_s = zip(*shares)

    def _basis(j: int) -> Fraction:
        # Lagrange basis polynomial l_j evaluated at x = 0.
        num = 1
        den = 1
        for m, x_m in enumerate(x_s):
            if m != j:
                num *= 0 - x_m
                den *= x_s[j] - x_m
        return Fraction(num, den)

    result = sum((y * _basis(j) for j, y in enumerate(y_s)), Fraction(0))
    if result.denominator != 1:
        raise ValueError("shares are inconsistent: secret is not an integer")
    return int(result)

if __name__ == "__main__":
    # Demo: split one secret into 5 shares, then rebuild it from 2 of them.
    secret, total_shares, threshold = 2732, 5, 2

    shares = create_shares(secret, total_shares, threshold)
    print("shares:", shares)

    # Keep only as many shares as the threshold requires.
    selected_shares = shares[:threshold]
    # Prints the repr of the zip object itself, not the unzipped values.
    print(zip(*selected_shares))
    x_s, y_s = zip(*selected_shares)
    print(x_s, y_s)
    recovered_secret = reconstruct_secret(selected_shares, threshold)
    print("Recovered Secret:", recovered_secret)

$ python3 test.py
shares: [(1, 2961), (2, 3190), (3, 3419), (4, 3648), (5, 3877)]

(1, 2) (2961, 3190)
Recovered Secret: 2732

【Python】error: externally-managed-environment

$ pip3 install psycopg2
error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.

プロジェクトごとに仮想環境を作って、そこでインストールすることが推奨されている。
$ python3 -m venv venv
$ source venv/bin/activate
$ python3 -V
$ pip3 install psycopg2 
$ pip3 install python-dotenv
$ cd assets
$ python3 migration.py
$ deactivate

### 仮想環境を作らない方法
$ pip3 install psycopg2 --break-system-packages
これでもいける。

【Python】psycopg2で複数のsql文を同時に実行する

psycopg2 の execute は、commitの前に連続して実行できる。そのため、以下のように、sql文をforループで回すことができる。

import psycopg2
import os
from dotenv import load_dotenv

# Read the database password from the .env file one directory up.
load_dotenv('../.env')
PSQL_PASSWORD = os.getenv('PSQL_PASSWORD')

# Statements to run, in order, inside a single transaction.
sql1 = "CREATE TABLE IF NOT EXISTS person1 ( \
            id SERIAL PRIMARY KEY, \
            name TEXT NOT NULL, \
            data BYTEA \
        );"
sql2 = "CREATE TABLE IF NOT EXISTS person2 ( \
            id SERIAL PRIMARY KEY, \
            name TEXT NOT NULL, \
            data BYTEA \
        );"
sqls = [sql1, sql2]

connection = psycopg2.connect(
    host='localhost',
    user='postgres',
    password=PSQL_PASSWORD
)
# try/finally guarantees the cursor and connection are released even when
# one of the statements (or the commit) raises.
try:
    cur = connection.cursor()
    try:
        for sql in sqls:
            cur.execute(sql)
        # Commit once, after every statement has succeeded.
        connection.commit()
    finally:
        cur.close()
finally:
    connection.close()

$ python3 migration.py

# \dt
List of relations
Schema | Name | Type | Owner
--------+----------+-------+----------
public | person1 | table | postgres
public | person2 | table | postgres

なるほどー

【Python】ブルームフィルタ

bloom filterの全体像。ハッシュ化した値をインデックス番号にして、bloom_filterの値を更新する。
bloom filterにハッシュ値が入っているかどうか確認することで、bloom filterに登録されているかを判定する。

# Conceptual sketch of a Bloom filter: a fixed array of ten 0/1 slots.
bloom_filter = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

# Placeholder hash function: `hash_val` is not defined in this snippet, so
# this stands for "some function that maps val to a slot index".
def hash_a(val):
    return hash_val

# Second placeholder hash function (same caveat as hash_a).
def hash_b(val):
    return hash_val


val = "hello world"

a_hashed = hash_a(val) # suppose this yields index 1
b_hashed = hash_b(val) # suppose this yields index 4

# Registering a value sets the slot chosen by each hash function to 1.
bloom_filter[a_hashed] = 1
bloom_filter[b_hashed] = 1

[0, 1, 0, 0, 1, 0, 0, 0, 0, 0]

# Membership test: read back the slot chosen by each hash function; the
# value is treated as registered when both slots hold 1.
bloom_filter[a_hashed]
bloom_filter[b_hashed]
import functools

class BloomFilter:
    """Fixed-size Bloom filter backed by a list of 0/1 flags.

    Every value is mapped to two slot indexes derived from the decimal
    digits of ``abs(hash(val))``. Adding a value sets both slots to 1; a
    value is reported as present only when both of its slots are 1, so
    false positives are possible but false negatives are not.
    """

    def __init__(self, filter_size):
        """Create an empty filter with ``filter_size`` slots.

        Raises:
            ValueError: If ``filter_size`` is below 2. The index folding
                in ``_hash_common`` divides by the size until the value
                fits, which never terminates for size 1 and divides by
                zero for size 0.
        """
        if filter_size < 2:
            raise ValueError("filter_size must be at least 2")
        self.filter_size = filter_size
        self.bloom_filter = [0] * filter_size

    def set_v(self, val):
        """Register ``val`` by setting each of its slots to 1."""
        for index in self.n_hash(val):
            self.bloom_filter[index] = 1

    def n_hash(self, val):
        """Return the two slot indexes for ``val``.

        The first index folds the plain digit sum of ``abs(hash(val))``,
        the second folds three times that digit sum.
        """
        hashed = abs(hash(val))
        d_lst = [int(n) for n in str(hashed)]
        return [
            self._hash_common(lambda acc, d: acc + d, d_lst),
            self._hash_common(lambda acc, d: acc + 3 * d, d_lst),
        ]

    def _hash_common(self, func, d_lst):
        """Reduce ``d_lst`` with ``func`` and fold the result into range.

        The reduced value is divided by ``filter_size`` repeatedly until
        it is smaller than ``filter_size``, then truncated to an int.
        """
        execed = abs(functools.reduce(func, d_lst, 0))
        while execed >= self.filter_size:
            execed = execed / self.filter_size
        return int(execed)

    def exist_v(self, val):
        """Return True only if every slot for ``val`` is set.

        All indexes must be checked: answering after the first set slot
        would report values that merely share one slot with a registered
        value.
        """
        return all(self.bloom_filter[index] != 0 for index in self.n_hash(val))

bf = BloomFilter(10)
print(bf.bloom_filter)
bf.set_v(3)
print(bf.bloom_filter)
print(bf.exist_v(3))
print(bf.exist_v(10))

$ python3 bloom_filter.py
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 1, 0, 0, 0, 0, 0, 1]
True
False

BTCではSPVがScriptPubKeyをbloomfilterにセットして、リモートノードがマッチングアルゴリズムで判定する。
-> リモートノード側では、トランザクションごとに、ScriptPubKey, Outpointがマッチするかを判定して、マッチする場合はブルームフィルタを更新している。

なるほど、直接pubkeyのデータをやり取りしない分、安全性が向上するということね。これをRustでやりたい。

文字列を数値に変換するには、文字列を数値として持っておいて、それを変換でしょうか。

/// The 26 lowercase ASCII letters, 'a' through 'z', in alphabetical order.
/// NOTE(review): presumably a letter's position in this table is meant to
/// serve as its numeric value when hashing strings; confirm against usage.
static ASCII_LOWER: [char; 26] = [
    'a', 'b', 'c', 'd', 'e', 
    'f', 'g', 'h', 'i', 'j', 
    'k', 'l', 'm', 'n', 'o',
    'p', 'q', 'r', 's', 't', 
    'u', 'v', 'w', 'x', 'y', 
    'z',
];

【Python】HD walletの親鍵/子鍵とchaincodeの概要

子の秘密鍵は、親のchaincodeと[親の公開鍵+index]をhmac_sha512でハッシュ化して作成している。

### マスター秘密鍵、公開鍵

import os
import binascii
import hmac
import hashlib
import ecdsa

# 32 random bytes from the operating system, used as the wallet seed.
seed = os.urandom(32)
# Fixed byte string passed to hmac_sha512 together with the seed when the
# master key material is derived.
root_key = b"Bitcoin seed"

def hmac_sha512(data, key_message):
    """Return the 64-byte HMAC-SHA512 of ``data`` keyed with ``key_message``.

    In BIP32 the HMAC key is the constant ``b"Bitcoin seed"`` (master key)
    or the parent chain code (child key), and the message is the seed or
    the serialized parent key plus index.

    Args:
        data: Message bytes to authenticate.
        key_message: HMAC key bytes.

    Returns:
        The raw 64-byte digest.
    """
    # hmac.new takes (key, msg, digestmod), so the key goes first.
    return hmac.new(key_message, data, hashlib.sha512).digest()

def create_pubkey(private_key):
    """Derive the raw secp256k1 public key bytes for ``private_key``.

    Callers in this script treat the result as two 32-byte halves: the
    first is used as the key body and the second only for its parity.
    """
    signing_key = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.SECP256k1)
    verifying_key = signing_key.verifying_key
    return verifying_key.to_string()

master = hmac_sha512(seed, root_key)

# The 64-byte digest splits into the secret key (first half) and the
# chain code (second half).
master_secretkey, master_chaincode = master[:32], master[32:]

master_publickey = create_pubkey(master_secretkey)
master_publickey_integer = int.from_bytes(master_publickey[32:], byteorder="big")

# Compressed public key: a one-byte prefix chosen by the parity of the
# second 32-byte half (0x02 even, 0x03 odd) followed by the first half.
master_publickey_x = (
    b"\x02" if master_publickey_integer % 2 == 0 else b"\x03"
) + master_publickey[:32]

for _part in (master_secretkey, master_chaincode, master_publickey_x):
    print(binascii.hexlify(_part))

$ python3 master_key.py
b'8a6dbaaff700682778dcbae2bc8718452fe5ed80fc9026a9b564420f8d5b0d80'
b'4ce8b10cc0c0874467d8f438c412fdbf21fba51517e668dbc4bd105af6861dec'
b'03cb15210804ca8f0d45b620832be935e2f90c3830f13f04c4bd6e8b4648f27817'
(secretkey, chaincode, pubkey)

### 子秘密鍵、子公開鍵

index = 0
# BIP32 serializes the child index as a 32-bit (4-byte) big-endian integer.
index_bytes = index.to_bytes(4, "big")

# Non-hardened derivation input: compressed parent public key + index.
data = master_publickey_x + index_bytes

result_hmac512 = hmac_sha512(data, master_chaincode)
sum_integer = int.from_bytes(master_secretkey, "big") + int.from_bytes(result_hmac512[:32], "big")

# Private keys are scalars modulo the secp256k1 group order n. Reducing by
# the field prime p (2**256 - 2**32 - 977) instead yields keys that do not
# match any BIP32-compliant wallet.
n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
child_secretkey = (sum_integer % n).to_bytes(32, "big")

# The right half of the HMAC output becomes the child's chain code.
child_chaincode = result_hmac512[32:]

child_publickey = create_pubkey(child_secretkey)
child_publickey_integer = int.from_bytes(child_publickey[32:], byteorder="big")

# Compressed public key: parity prefix (0x02 even, 0x03 odd) + first half.
if child_publickey_integer % 2 == 0:
    child_publickey_x = b"\x02" + child_publickey[:32]
else:
    child_publickey_x = b"\x03" + child_publickey[:32]

print(binascii.hexlify(child_secretkey))
print(binascii.hexlify(child_chaincode))
print(binascii.hexlify(child_publickey_x))

b'5ff011a3e9cd672aaf0dc9fd52cb3172ac2815cb270f919135e6b0f0e6e03d54'
b'15f4d148b2d7730076d5e670249649ea8f0fd8572dad3818680e347196149dda'
b'03480a1dbb4a87d867bee3d364b608e21d685af271876707b9f9d5b75c6df6fde7'

b'23faf6fad81cd93e12c003c944ba3ef215dae714c638756386b6b9404da5aac9'
b'e3563dc6891e238cd0d8ebf99e65ebfc67cecf42364de9756b89859bbd049b62'
b'02039253af3e828bfbf1e560fe0e923a144fc4496ded3b6bbfa0d568cf7177d1c3'

なるほど、一見複雑そうに見えるが、なかなか面白いね

【Python】HD-Walletのライブラリを利用してみる

$ pip3 install hdwallet

#!/usr/bin/env python3

from hdwallet import HDWallet
from hdwallet.entropies import (
    BIP39Entropy, BIP39_ENTROPY_STRENGTHS
)
from hdwallet.mnemonics import BIP39_MNEMONIC_LANGUAGES
from hdwallet.cryptocurrencies import Bitcoin as Cryptocurrency
from hdwallet.hds import BIP32HD
from hdwallet.derivations import CustomDerivation
from hdwallet.const import PUBLIC_KEY_TYPES

import json

# Step 1: configure a mainnet Bitcoin BIP32 wallet.
hdwallet: HDWallet = HDWallet(
    cryptocurrency=Cryptocurrency,
    hd=BIP32HD,
    network=Cryptocurrency.NETWORKS.MAINNET,
    language=BIP39_MNEMONIC_LANGUAGES.KOREAN,
    public_key_type=PUBLIC_KEY_TYPES.COMPRESSED,
    passphrase="talonlab"
)

# Step 2: seed it from freshly generated BIP39 entropy.
hdwallet = hdwallet.from_entropy(
    entropy=BIP39Entropy(
        entropy=BIP39Entropy.generate(
            strength=BIP39_ENTROPY_STRENGTHS.ONE_HUNDRED_SIXTY
        )
    )
)

# Step 3: derive along a custom path.
hdwallet = hdwallet.from_derivation(
    derivation=CustomDerivation(
        path="m/0'/0/0"
    )
)

# Dump everything except the "indexes" entry as indented JSON, keeping
# non-ASCII characters (the mnemonic) readable.
wallet_dump = hdwallet.dump(exclude={"indexes"})
print(json.dumps(wallet_dump, indent=4, ensure_ascii=False))

$ python3 hd_wallet.py
{
“cryptocurrency”: “Bitcoin”,
“symbol”: “BTC”,
“network”: “mainnet”,
“coin_type”: 0,
“entropy”: “f90afa34cc3653845d06c179ba917569c60e1179”,
“strength”: 160,
“mnemonic”: “호흡 분리 여론 온종일 생신 조용히 숙소 추석 시월 청춘 사계절 철도 살림 건축 팩스”,
“passphrase”: “talonlab”,
“language”: “Korean”,
“seed”: “d79ac6b15a3782eefd52ae4b09c961ddd7263f6de4a852534497139516bc04549cb62dd2063f21090b13b58c442a35610f630034ca9ad85877ee8a544f8643b4”,
“ecc”: “SLIP10-Secp256k1”,
“hd”: “BIP32”,
“semantic”: “p2pkh”,
“root_xprivate_key”: “xprv9s21ZrQH143K35Wb8sjvrSbVYqX2xGJvX64QxwC1spNkdnakC1Mtwv2CQE1trAMGA83TGEZSjWtdh1s9pFT2P6z2PKepwjvhbkeU69UsGaE”,
“root_xpublic_key”: “xpub661MyMwAqRbcFZb4EuGwDaYE6sMXMj2mtJz1mKbdS9ujWautjYg9ViLgFWsQNQ3PjEnrdPhNhS9JvuH6WLG5pgJaTBezMoy7ecEKLK3BAvH”,
“root_private_key”: “4ba3752c7a474a3be62ef060fd79c6517b7f2af01d7e573f11685e20f55a0303”,
“root_wif”: “Kykk1CNJWm7sAsve2eDCezUykFyQfX9BavJNTWakMutPYSSHZ5kP”,
“root_chain_code”: “66708f4fdae1aec89612e24205d9244f0b8a1a4ada2fc6076e0eea8a91131831”,
“root_public_key”: “033bbd876465246c571f02df1aa65527c82dfa9779833a2840ecb63167f1f507a0”,
“strict”: true,
“public_key_type”: “compressed”,
“wif_type”: “wif-compressed”,
“derivation”: {
“at”: {
“path”: “m/0’/0/0”,
“depth”: 3,
“index”: 0
},
“xprivate_key”: “xprv9yoVptUsgY29Jz8tMnbLeoWQygM3m6zYSWdYDiVdBazjqyc11ioZHdowCxYi17fN5CwDnm3ith9tMeVjv5GToJY4S7NLrbuged6Q3DJiGWy”,
“xpublic_key”: “xpub6CnrEQ1mWuaSXUDMTp8M1wT9XiBYAZiPojZ926uEjvXiimw9ZG7oqS8R4Dev3hzwcXeqNmMiN9knr88cPtduVZ1Gqn1kchT23eB5wK7N8t1”,
“private_key”: “96ceeef7b35bfd428842d2dc08719d3c5b5cff9967a7110ece12d57af017a66b”,
“wif”: “L2GryxVSFxauZCLCQJqmV85tVNoXCXyRdC5zomodDWwd33BPjJNn”,
“chain_code”: “7a95361a7ffc067c5165411d55d1400bc5b3cb5ea2ca90741e918fd2a8fb3349”,
“public_key”: “02a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed8803”,
“uncompressed”: “04a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed88030259ef4fa9649169a66b61c26881426d68b6b51ad6ea3cb7dc0b41cc85b6ba00”,
“compressed”: “02a0e4a452f270f2647949a57cc55909182eb2f232f4b5146ccdd232a4ffed8803”,
“hash”: “1bff595df8d0d9695e612eca95afcf4c882e5f32”,
“fingerprint”: “1bff595d”,
“parent_fingerprint”: “9b0194d3”,
“addresses”: {
“p2pkh”: “13Z399RbXeELGgKRKq5sM8nNim9waZGoPW”,
“p2sh”: “35nKB4UKgHSzRYX4J36YxLFMqm7YpCkMKU”,
“p2tr”: “bc1pmsdcxya38lnkmrs62e4yuad0cgr322tp9cc7sgjrmfe3e3jasaeqcvrvwl”,
“p2wpkh”: “bc1qr0l4jh0c6rvkjhnp9m9ftt70fjyzuhejdwqt46”,
“p2wpkh_in_p2sh”: “3KNNVBigqcdPAsUU1apzbRJj5dx993YoGQ”,
“p2wsh”: “bc1qfznz7ncnv4xu29zx3cdv7dzdtgc9e600ukd0kxyca7dg6k9gswqqs8xs7w”,
“p2wsh_in_p2sh”: “36MvcLHZq8jAogs7rm8beCo6rZEybZRkzk”
}
}
}

#!/usr/bin/env python3

from hdwallet.utils import generate_passphrase
# Print a passphrase of length 32 produced by hdwallet's generate_passphrase.
print(generate_passphrase(length=32))

$ python3 hd_wallet.py
MQIjC1twsYSqRYAYzUbXZGouqirSa65t

#!/usr/bin/env python3

from hdwallet.mnemonics.algorand import AlgorandMnemonic, ALGORAND_MNEMONIC_WORDS, ALGORAND_MNEMONIC_LANGUAGES
# Request a 25-word English Algorand mnemonic; the result is stored in
# `mnemonic` but not used by the line below.
mnemonic: str = AlgorandMnemonic.from_words(words=ALGORAND_MNEMONIC_WORDS.TWENTY_FIVE, language=ALGORAND_MNEMONIC_LANGUAGES.ENGLISH)
# Print the English mnemonic that corresponds to a fixed hex entropy value.
print(AlgorandMnemonic.from_entropy(entropy="65234f4ec655b087dd74d186126e301d73d563961890b2f718476e1a32522329", language=ALGORAND_MNEMONIC_LANGUAGES.ENGLISH))

hole develop cheese fragile gaze giggle plunge sphere express reunion oblige crack priority ocean seven mosquito wagon glow castle plunge goddess stand empower ability empower

行列の計算

import random
from typing import List

# A matrix is a list of rows; each row is a list of ints.
Row = List[int]
Matrix = List[Row]

def matrix_multiply(matrix_a: Matrix, matrix_b: Matrix) -> Matrix:
    """Return the matrix product ``matrix_a * matrix_b``.

    Args:
        matrix_a: Left operand, shape (rows_a x cols_a).
        matrix_b: Right operand, shape (rows_b x cols_b).

    Returns:
        A new (rows_a x cols_b) matrix.

    Raises:
        ArithmeticError: If ``cols_a`` differs from ``rows_b``.
        IndexError: If either matrix has no rows.
    """
    rows_a, cols_a = len(matrix_a), len(matrix_a[0])
    rows_b, cols_b = len(matrix_b), len(matrix_b[0])
    if cols_a != rows_b:
        raise ArithmeticError(
            "Invalid dimensions; Cannot multiply "
            f"{rows_a}x{cols_a}*{rows_b}x{cols_b}"
        )
    # Cell (i, j) is the dot product of row i of A and column j of B.
    return [
        [
            sum(matrix_a[i][k] * matrix_b[k][j] for k in range(cols_a))
            for j in range(cols_b)
        ]
        for i in range(rows_a)
    ]

if __name__ == "__main__":
    # A is rows x cols and B is cols x rows, so the product is defined.
    cols, rows = 3, 2
    A = [[random.randint(0, 10) for _ in range(cols)] for _ in range(rows)]
    print(f"matrix A: {A}")
    B = [[random.randint(0, 10) for _ in range(rows)] for _ in range(cols)]
    print(f"matrix B: {B}")
    C = matrix_multiply(A, B)
    print(f"matrix C: {C}")

$ python3 matmul_sequential.py
matrix A: [[9, 0, 0], [10, 6, 1]]
matrix B: [[9, 5], [1, 5], [10, 5]]
matrix C: [[81, 45], [106, 85]]

ランダムな連想配列の計測

選挙投票のカウントを想定したもの

import typing as T
import random

# Maps a candidate id to the number of votes it received.
Summary = T.Mapping[int, int]

def process_votes(pile: T.List[int]) -> Summary:
    """Tally how often each candidate id appears in ``pile``.

    Args:
        pile: One candidate id per ballot.

    Returns:
        A dict from candidate id to vote count, in order of first
        appearance. Candidates with no ballots are absent.
    """
    tally = {}
    for ballot in pile:
        tally[ballot] = tally.get(ballot, 0) + 1
    return tally

if __name__ == "__main__":
    num_candidates, num_voters = 3, 100
    # One random ballot per voter, each naming a candidate 1..num_candidates.
    pile = []
    for _ in range(num_voters):
        pile.append(random.randint(1, num_candidates))
    print(pile)
    counts = process_votes(pile)
    print(f"Total number of votes: {counts}")

$ python3 count_votes_sequential.py
[1, 1, 2, 1, 2, 3, 2, 2, 1, 2, 2, 3, 3, 2, 3, 1, 2, 2, 2, 3, 1, 1, 2, 3, 1, 2, 3, 3, 2, 2, 3, 2, 1, 3, 2, 2, 3, 3, 2, 3, 3, 3, 2, 3, 2, 1, 1, 3, 2, 3, 3, 2, 2, 1, 1, 1, 1, 2, 3, 3, 1, 2, 2, 3, 2, 1, 2, 2, 1, 3, 1, 1, 2, 1, 1, 2, 3, 3, 1, 2, 1, 2, 3, 3, 3, 1, 2, 2, 2, 1, 1, 3, 2, 1, 2, 2, 3, 2, 3, 1]
Total number of votes: {1: 29, 2: 40, 3: 31}

Rustで書き直す
範囲指定のランダムは rnd.gen_range となる。

use std::time::Instant;
use threadpool::ThreadPool;
use rand::Rng;
use std::collections::HashMap;

/// Counts how many times each candidate id occurs in `pile`.
///
/// Returns a map from candidate id to its number of ballots; candidates
/// with no ballots are absent from the map.
fn process_votes(pile: Vec<u32>) -> HashMap<u32, u32> {
    let mut tally: HashMap<u32, u32> = HashMap::new();
    for ballot in pile {
        // A candidate seen for the first time starts at zero, then this
        // ballot is counted.
        *tally.entry(ballot).or_insert(0) += 1;
    }
    tally
}


/// Simulates an election with random ballots and prints the tally.
fn main() {
    let num_voters = 100;
    let num_candidate = 3;

    // One random ballot per voter, each a candidate id from 1 up to and
    // including `num_candidate`.
    let mut rnd = rand::thread_rng();
    let pile: Vec<u32> = (0..num_voters)
        .map(|_| rnd.gen_range(1..(num_candidate + 1)))
        .collect();

    let summary = process_votes(pile);
    println!("{:?}", summary);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
Running `target/debug/parallel`
{3: 42, 2: 33, 1: 25}

ここまではOK

【Python】ファイルから文字列を検索

短いコードなんだけど、割と高度なことをやっている…

import os
import time
import glob
import typing as T

def search_file(file_location: str, search_string: str) -> bool:
    """Report whether ``search_string`` occurs in the file's text.

    The file is opened as UTF-8 text and read in full before searching.

    Args:
        file_location: Path of the file to inspect.
        search_string: Substring to look for.

    Returns:
        True if the substring is present anywhere in the file.
    """
    with open(file_location, "r", encoding="utf8") as handle:
        contents = handle.read()
    return search_string in contents
    
def search_files_sequentially(file_locations: T.List[str], search_string: str) -> None:
    """Check each file in turn and print the path of every match.

    Args:
        file_locations: Paths to search, processed in the given order.
        search_string: Substring to look for in each file.
    """
    for path in file_locations:
        if not search_file(path, search_string):
            continue
        print(f"Found word in file: `{path}`")

if __name__ == "__main__":
    # Candidates: every .py file directly inside the current working directory.
    pattern = f"{os.path.abspath(os.getcwd())}/*.py"
    file_locations = list(glob.glob(pattern))
    search_string = input("what word are you trying to find?: ")

    # Time only the search itself, not the interactive prompt.
    start_time = time.perf_counter()
    search_files_sequentially(file_locations, search_string)
    process_time = time.perf_counter() - start_time
    print(f"PROCESS TIME: {process_time}")

$ python3 find_files_sequential.py
what word are you trying to find?: queue
Found word in file: `/home/vagrant/dev/rust/parallel/python/thread_pool.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/pipeline.py`
Found word in file: `/home/vagrant/dev/rust/parallel/python/message_queue.py`
PROCESS TIME: 0.004676494048908353

OK, これをRustで書きたい…