【Python】ゼロ知識証明

ゼロ知識証明とは、証明者がその事実を提示するだけで、その事実の内容を隠すことができる証明方法
Zero-knowledge Proofとしてよく使われるのが、ハミルトングラフの知識を持っていることや秘密のパスワードを証明せず知っていることを示す。

xを知っていおり、 y = x^2 mod p を示す。

import random

p = 23
x = 5
y = pow(x, 2, p)

print(" y =", y)

def prover_step():
    r = random.randint(1, p - 1)
    a = pow(r, 2, p)
    return r, a

def verifier_challenge():
    return random.randint(0, 1)

def prover_response(r, c):
    if c == 0:
        return r
    else:
        return (r * x) % p

def verifier_check(a, z, c):
    if c == 0:
        return pow(z, 2, p) == a
    else:
        return pow(z, 2, p) == (a * y) % p

r, a = prover_step()
print("Prover: 提出 a=", a)

c = verifier_challenge()
print("Verifier: チャレンジ c=", c)

z = prover_response(r, c)
print("Prover: 応答 z =", z)

ok = verifier_check(a, z, c)
print("Verifier: 検証結果 =", ok)

$ python3 zkp.py
y = 2
Prover: 提出 a= 4
Verifier: チャレンジ c= 1
Prover: 応答 z = 10
Verifier: 検証結果 = True

—– 実際の計算
r = ?,
a = 4,
c = 1
z = 10
10 * 10 % 23 = 5 * 2 / 23

x の値を知らせずに、y = x^2 mod p を証明している。証明の際には、c != 0 の時にのみyの値を利用する
zkpの概念は理解できたが、なぜこれが証明になるのかのロジックはよくわからない..

【Solana】Proof of HistoryをPythonで書く

前のブロックのiter,hashoutをベースに iter%(2**delay) == 0 になるまで、ブロックごとに262144回sha256を計算して、hash値を求める。hashoutが決まっているので、先回りしてブロックを作れそうだが、SolanaはPoSのため、不正がバレるとstakingが没収されてしまう。なるほどねー

import datetime
import hashlib
import time
import os
import pathlib
import random
import string
from random import randint

iter=1
hashin = ''.join(random.choices(string.ascii_uppercase + string.digits, k = 10))
hashout = hashlib.sha256(hashin.encode()).hexdigest()

print(hashin)
print(hashout)

def datastream():
    v1 = int(randint(1200, 1500))
    v2 = int(randint(1300, 1700))
    v3 = int(randint(1100, 1500))
    v4 = int(randint(4000, 5600))
    v5 = int(randint(4000, 5600))
    v6 = int(randint(1900, 2400))
    v7 = int(randint(1920, 2300))
    v8 = int(randint(1850, 2200))
    v9 = int(randint(1900, 2300))
    v10 = int(randint(1800, 2200))
    return [v1, v2, v3, v4, v5, v6, v7, v8, v9, v10];

class Block:
    blockNo = 0
    count = iter
    data = None
    next = None
    hash = "None"
    previous_hash = "None"
    timestamp = datetime.datetime.now()

    def __init__(self, data):
        self.data = datastream()

    def hash(self):
        file = pathlib.Path("TToken")
        if file.exists():
            tier=open("TToken").readline().rstrip()
        else:
            with open("benchmark.py") as infile:
                exec(infile.read())
            tier=open("TToken").readline().rstrip()

        if tier=="T1":
            h = hashlib.md5()
        elif tier=="T2":
            h = hashlib.sha1()
        elif tier=="T3":
            h = hashlib.blake2s()
        elif tier=="T4":
            h = hashlib.sha3_256()

        h.update(
		str(self.nonce).encode('utf-8') +
		str(self.data).encode('utf-8') +
		str(self.previous_hash).encode('utf-8') +
		str(self.timestamp).encode('utf-8') +
		str(self.blockNo).encode('utf-8')
		)
        return h.hexdigest()
    
        block.blockNo = self.block.blockNo + 1

    def __str__(self):
        return "Block Number: " + str(self.blockNo) + "\nHistory Count: " + str(self.count) + "\nBlock Data: " + str(self.data) + "\nBlock Hash: " + str(self.hash) + "\nPrevious Hash: " + str(self.previous_hash) + "\n--------------"

class Blockchain:
	block = Block("Genesis")
	dummy = head = block

	def add(self, block):
		if (self.block.blockNo ==0):
			block.previous_hash =  "Origin"
		else:
			block.previous_hash = self.block.hash
		block.blockNo = self.block.blockNo + 1

		self.block.next = block
		self.block = self.block.next

	def mine(self, block):
		global iter
		global hashin
		global hashout
		delay = 18
		cont = 1
		while(cont == 1):
			if int(iter%(2**delay) == 0):
				block.count = iter
				block.hash = hashout
				self.add(block)
				print(block)
				iter += 1
				hashin=hashout
				hashout = hashlib.sha256(hashin.encode()).hexdigest()
				cont = 0
				break
			else:
				iter += 1
				hashin=hashout
				hashout = hashlib.sha256(hashin.encode()).hexdigest()

t_initial = time.perf_counter()
blockchain = Blockchain()
b=int(input('Enter the number of Blocks for this simulation:'))

for n in range(b):
    blockchain.mine(Block(n+1))
t_final = time.perf_counter()
delta_t = t_final - t_initial
delta_unit = delta_t*1000/b
print("comutation Time per Block(ms):"+str(delta_unit))
input('Press ENTER to exit')

Enter the number of Blocks for this simulation:3
Block Number: 1
History Count: 262144
Block Data: [1297, 1642, 1372, 5138, 5301, 2188, 1998, 2150, 1914, 1862]
Block Hash: a53e39cef8be27ba38e73fe216fbfc2efc63bca056fa2a6a18380e9d93c98ea3
Previous Hash: Origin
————–
Block Number: 2
History Count: 524288
Block Data: [1211, 1633, 1307, 4757, 5133, 2206, 2032, 1891, 2257, 2139]
Block Hash: 79561ebd2627b432d1e619dee9db7ac85593a4357925827754b1faefd42c1b72
Previous Hash: a53e39cef8be27ba38e73fe216fbfc2efc63bca056fa2a6a18380e9d93c98ea3
————–
Block Number: 3
History Count: 786432
Block Data: [1459, 1682, 1131, 5339, 4983, 2057, 1948, 2192, 2017, 2076]
Block Hash: d33e10fa10273b5d64ccdad34ffcbaae7673cb785807c49f199b204a148e6cd9
Previous Hash: 79561ebd2627b432d1e619dee9db7ac85593a4357925827754b1faefd42c1b72
————–
comutation Time per Block(ms):795.6611973543962
Press ENTER to exit

【Python】current pathを取得する

abspathがファイルの絶対パス。

import os

current_path = os.path.abspath(__file__)
exec_path = current_path.replace('/python/post.py', '')
path = "{}/config/hoge.txt".format(exec_path)
print(path)

なるほど、パスの取得は色々あって便利やね。

【Python】jsonデータを1行ずつ読み取ってjsonの配列にする

json.dumps した後に、配列(json_list=[])に入れると、配列自体はjsonではないので、うまくいかない。
配列に入れてからjson.dumpsすると、jsonデータとして扱われる。

import json
import requests


json_list = []
with open('./data/names.txt') as f:
    for line in f:
        json_open = json.loads(line)
        json_list.append(json_open)
        
json_data = json.dumps(json_list)
print(json_data)

response = requests.post(
    "http://httpbin.org/post", 
    data=json_data,
    headers={'Content-type': 'application/json'}
)
print(response.status_code)
print(response.text)

これ解決するのに半日以上かかりましたorz…

【Python】シャミアの秘密計算

import random
from typing import List, Tuple

def generate_coefficients(secret: int, threshold: int) -> List[int]:
    coefficients = [secret]
    for _ in range(threshold - 1):
        coefficients.append(random.randint(1, 256))
    return coefficients

def create_shares(
    secret: int, total_shares: int, threshold: int
) -> List[Tuple[int, int]]: 
    coefficients = generate_coefficients(secret, threshold)
    shares = []
    for x in range(1, total_shares + 1):
        y = sum(coeff * (x**exp) for exp, coeff in enumerate(coefficients))
        shares.append((x, y))
    return shares

def reconstruct_secret(shares: List[Tuple[int, int]], threshold: int) -> int:
    def _lagrange_interpolation(x: int, x_s: List[int], y_s: List[int]) -> int:
        def _basis(j: int) -> int:
            num = 1
            den = 1
            for m in range(len(x_s)):
                if m != j:
                    num *= x - x_s[m]
                    den *= x_s[j] - x_s[m]
            return num // den

        result = 0
        for j in range(len(y_s)):
            result += y_s[j] * _basis(j)
        return result

    x_s, y_s = zip(*shares)
    return _lagrange_interpolation(0, x_s, y_s)

if __name__ == "__main__":
    secret = 2732
    total_shares = 5
    threshold = 2

    shares = create_shares(secret, total_shares, threshold)
    print("shares:", shares)

    selected_shares = shares[:threshold]
    print(zip(*selected_shares))
    x_s, y_s = zip(*selected_shares)
    print(x_s, y_s)
    recovered_secret = reconstruct_secret(selected_shares, threshold)
    print("Recovered Secret:", recovered_secret)

$ python3 test.py
shares: [(1, 2961), (2, 3190), (3, 3419), (4, 3648), (5, 3877)]

(1, 2) (2961, 3190)
Recovered Secret: 2732

【Python】error: externally-managed-environment

$ pip3 install psycopg2
error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.

プロジェクトごとに仮想環境を作って、そこでインストールすることが推奨されている。
$ python3 -m venv venv
$ source venv/bin/activate
$ python3 -V
$ pip3 install psycopg2 
$ pip3 install python-dotenv
$ cd assets
$ python3 migration.py
$ deactivate

### 仮想環境を作らない方法
$ pip3 install psycopg2 –break-system-packages
これでもいける。

【Python】psycopg2で複数のsql文を同時に実行する

psycopg2 の execute は、commitの前に連続して実行できる。そのため、以下のように、sql文をforループで回すことができる。

import psycopg2
import os
from dotenv import load_dotenv

load_dotenv('../.env')
PSQL_PASSWORD = os.getenv('PSQL_PASSWORD')

connection = psycopg2.connect(
    host='localhost',
    user='postgres',
    password=PSQL_PASSWORD
)
cur = connection.cursor()

sql1 = "CREATE TABLE IF NOT EXISTS person1 ( \
            id SERIAL PRIMARY KEY, \
            name TEXT NOT NULL, \
            data BYTEA \
        );"
sql2 = "CREATE TABLE IF NOT EXISTS person2 ( \
            id SERIAL PRIMARY KEY, \
            name TEXT NOT NULL, \
            data BYTEA \
        );"
sqls = [sql1, sql2]

for sql in sqls:
    cur.execute(sql)

connection.commit()
cur.close()
connection.close()

$ python3 migration.py

# \dt
List of relations
Schema | Name | Type | Owner
——–+———-+——-+———-
public | person1 | table | postgres
public | person2 | table | postgres

なるほどー

【Python】ブルームフィルタ

bloom filterの全体像。ハッシュ化した値をインデックス番号にして、bloom_filterの値を更新する。
bloom filterにハッシュ値が入っているかどうか確認することで、bloom filterに登録されているかを判定する。

bloom_filter = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

def hash_a(val):
    return hash_val

def hash_b(val):
    return hash_val


val = "hello world"

a_hashed = hash_a(val) # 1
b_hashed = hash_b(val) # 4

bloom_filter[a_hashed] = 1
bloom_filter[b_hashed] = 1

[0, 1, 0, 0, 4, 0, 0, 0, 0, 0]

bloom_filter[a_hashed]
bloom_filter[b_hashed]
import functools

class BloomFilter:
    def __init__(self, filter_size):
        self.filter_size = filter_size
        self.bloom_filter = [0 for _ in range(filter_size)]

    def set_v(self, val):
        indexes = self.n_hash(val)
        for index in indexes:
            self.bloom_filter[index] = 1

    def n_hash(self, val):
        hashed = abs(hash(val))
        d_lst = [int(n) for n in str(hashed)]
        return [
            self._hash_common(lambda acc, d: acc + d, d_lst),
            self._hash_common(lambda acc, d: acc + 3 * d, d_lst),
        ]

    def _hash_common(self, func, d_lst):
        execed = abs(functools.reduce(func, d_lst, 0))
        while execed >= self.filter_size:
            execed = execed / self.filter_size
        return int(execed)

    def exist_v(self, val):
        indexes = self.n_hash(val)
        for index in indexes:
            if self.bloom_filter[index] == 0:
                return False
            return True

bf = BloomFilter(10)
print(bf.bloom_filter)
bf.set_v(3)
print(bf.bloom_filter)
print(bf.exist_v(3))
print(bf.exist_v(10))

$ python3 bloom_filter.py
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 1, 0, 0, 0, 0, 0, 1]
True
False

BTCではSPVがScriptPubKeyをbloomfilterにセットして、リモートノードがマッチングアルゴリズムで判定する。
-> リモートノード側では、トランザクションごとに、ScriptPubKey, Outpointがマッチするかを判定して、マッチする場合はブルームフィルタを更新している。

なるほど、直接pubkeyのデータをやり取りしない分、安全性が向上するということね。これをRustでやりたい。

文字列を数値に変換するには、文字列を数値として持っておいて、それを変換でしょうか。

static ASCII_LOWER: [char; 26] = [
    'a', 'b', 'c', 'd', 'e', 
    'f', 'g', 'h', 'i', 'j', 
    'k', 'l', 'm', 'n', 'o',
    'p', 'q', 'r', 's', 't', 
    'u', 'v', 'w', 'x', 'y', 
    'z',
];

【Python】HD walletの親鍵/子鍵とchaincodeの概要

子の秘密鍵は、親のchaincdeと[親の公開鍵+index]をhmac_sha512でハッシュ化して作成している。

### マスター秘密鍵、公開鍵

import os
import binascii
import hmac
import hashlib
import ecdsa

seed = os.urandom(32)
root_key = b"Bitcoin seed"

def hmac_sha512(data, key_message):
    hash = hmac.new(data, key_message, hashlib.sha512).digest()
    return hash

def create_pubkey(private_key):
    publickey = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.SECP256k1).verifying_key.to_string()
    return publickey

master = hmac_sha512(seed, root_key)

master_secretkey = master[:32]
master_chaincode = master[32:]

master_publickey = create_pubkey(master_secretkey)
master_publickey_integer = int.from_bytes(master_publickey[32:], byteorder="big")

if master_publickey_integer %2 == 0:
    master_publickey_x = b"\x02" + master_publickey[:32]
else:
    master_publickey_x = b"\x03" + master_publickey[:32]

print(binascii.hexlify(master_secretkey))
print(binascii.hexlify(master_chaincode))
print(binascii.hexlify(master_publickey_x))

$ python3 master_key.py
b’8a6dbaaff700682778dcbae2bc8718452fe5ed80fc9026a9b564420f8d5b0d80′
b’4ce8b10cc0c0874467d8f438c412fdbf21fba51517e668dbc4bd105af6861dec’
b’03cb15210804ca8f0d45b620832be935e2f90c3830f13f04c4bd6e8b4648f27817′
(secretkey, chaincode, pubkey)

### 子秘密鍵、子公開鍵

index = 0
index_bytes = index.to_bytes(8, "big")

data = master_publickey_x + index_bytes

result_hmac512 = hmac_sha512(data, master_chaincode)
sum_integer = int.from_bytes(master_secretkey,"big") + int.from_bytes(result_hmac512[:32],"big")

p = 2 ** 256 - 2**32 - 2**9 - 2**8 - 2**7 - 2**6 - 2**4 - 1
child_secretkey = (sum_integer % p).to_bytes(32,"big")

child_chaincode = result_hmac512[32:]

child_publickey = create_pubkey(child_secretkey)
child_publickey_integer = int.from_bytes(child_publickey[32:], byteorder="big")

if child_publickey_integer %2 == 0:
    child_publickey_x = b"\x02" + child_publickey[:32]
else:
    child_publickey_x = b"\x03" + child_publickey[:32]

print(binascii.hexlify(child_secretkey))
print(binascii.hexlify(child_chaincode))
print(binascii.hexlify(child_publickey_x))

b’5ff011a3e9cd672aaf0dc9fd52cb3172ac2815cb270f919135e6b0f0e6e03d54′
b’15f4d148b2d7730076d5e670249649ea8f0fd8572dad3818680e347196149dda’
b’03480a1dbb4a87d867bee3d364b608e21d685af271876707b9f9d5b75c6df6fde7′

b’23faf6fad81cd93e12c003c944ba3ef215dae714c638756386b6b9404da5aac9′
b’e3563dc6891e238cd0d8ebf99e65ebfc67cecf42364de9756b89859bbd049b62′
b’02039253af3e828bfbf1e560fe0e923a144fc4496ded3b6bbfa0d568cf7177d1c3′

なるほど、一見複雑そうに見えるが、なかなか面白いね