RDS

テーブルとカラムを表現するには、hashmap< Vector > となるだろうか?

class Table:
    def __init__(self, name, columns):
        self.name = name
        self.columns = columns
        self.rows = []

    def insert(self, values):
        if len(values) != len(self.columns):
            raise ValueError("列の数と値の数が一致しません。")
        row = dict(zip(self.columns, values))
        self.rows.append(row)

    def select_all(self):
        return self.rows

    def __str__(self):
        lines = [" | ".join(self.columns)]
        lines.append("-" * len(lines[0]))
        for row in self.rows:
            lines.append(" | ".join(str(row[col]) for col in self.columns))
        return "\n".join(lines)

class Database:
    def __init__(self):
        self.tables = {}

    def create_table(self, name, columns):
        if name in self.tables:
            raise ValueError(f"テーブル '{name}' は既に存在します。")
        self.tables[name] = Table(name, columns)

    def insert_into(self, table_name, values):
        self.tables[table_name].insert(values)

    def select_all(self, table_name):
        return self.tables[table_name].select_all()

    def show_table(self, table_name):
        print(self.tables[table_name])

if __name__ == "__main__":
    db = Database()
    db.create_table("users", ["id", "name", "email"])
    db.insert_into("users", [1, "Alice", "alice@example.com"])
    db.insert_into("users", [2, "Bob", "bob@example.com"])

    print("📄 users テーブル内容:")
    db.show_table("users")

    print("\n📋 SELECT * FROM users:")
    for row in db.select_all("users"):
        print(row)

$ python3 app.py
📄 users テーブル内容:
id | name | email
—————–
1 | Alice | alice@example.com
2 | Bob | bob@example.com

📋 SELECT * FROM users:
{‘id’: 1, ‘name’: ‘Alice’, ’email’: ‘alice@example.com’}
{‘id’: 2, ‘name’: ‘Bob’, ’email’: ‘bob@example.com’}

【Python】NoSQL【Rust】

import json
import os
from typing import List, Dict, Any

class SimpleNoSQL:
    def __init__(self, db_path: str= "db.json"):
        self.db_path = db_path
        if os.path.exists(self.db_path):
            with open(self.db_path, "r") as f:
                self.data = json.load(f)
        else:
            self.data = []

    def save(self):
        with open(self.db_path, "w") as f:
            json.dump(self.data, f, indent=2)

    def insert(self, document: Dict[str, Any]):
        self.data.append(document)
        self.save()

    def find(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
        def match(doc):
            return all(doc.get(k) == v for k, v in query.items())
        return [doc for doc in self.data if match(doc)]

    def update(self, query: Dict [str, Any], update_fields: Dict[str, Any]):
        for doc in self.data:
            if all(doc.get(k) == v for k, v in query.items()):
                 doc.update(update_fields)
        self.save()

    def delete(self, query: Dict[str, Any]):
        self.data = [doc for doc in self.data if not all(doc.get(k) == v for k, v in query.items())]
        self.save

db = SimpleNoSQL()

db.insert({
    "session_id": "abc123",
    "code": "fn main() { println!(\"Hello\"); }",
    "output": "Hello",
    "status": "success"
})

results = db.find({"session_id": "abc123"})
print("検索結果:", results)

db.update({"session_id": "abc123"}, {"status": "reviewed"})

[
{
“session_id”: “abc123”,
“code”: “fn main() { println!(\”Hello\”); }”,
“output”: “Hello”,
“status”: “reviewed”
}
]

これをRustでやりたい

### Rustでのjsonの保存

use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use serde::{Serialize, Deserialize};

fn json_w() -> std::io::Result<()> {
    let mut map = HashMap::new();
    map.insert("name", "Alice");
    map.insert("city", "Tokyo");
    map.insert("language", "Rust");

    let json = serde_json::to_string_pretty(&map).expect("Failed to serialize");
    let mut file = File::create("output.json")?;
    file.write_all(json.as_bytes())?;
    
    println!("{:?}", json);
    Ok(())
}

fn main() {
    let _ = json_w();
}

### jsonの読み取り

fn json_r() {
    let data = fs::read_to_string("output.json").expect("Failed to read file");
    let map: HashMap<String, String> = serde_json::from_str(&data).expect("Failed to deserialize");

    println!("{:#?}", map);    
}

{
“name”: “Alice”,
“language”: “Rust”,
“city”: “Tokyo”,
}

### 改修
レコードごとに保存する必要があるので HashMap から Vec> に変更

fn json_w() -> std::io::Result<()> {
    let mut data: Vec<HashMap<String, String>> = match File::open("output.json") {
        Ok(mut file) => {
            let mut contents = String::new();
            file.read_to_string(&mut contents)?;
            serde_json::from_str(&contents).unwrap_or_else(|_| Vec::new())
        },
        Err(_) => Vec::new(),
    };

    let mut map = HashMap::new();
    // map.insert("name".to_string(), "Alice".to_string());
    // map.insert("city".to_string(), "Tokyo".to_string());
    // map.insert("language".to_string(), "Rust".to_string());

    map.insert("name".to_string(), "Bob".to_string());
    map.insert("city".to_string(), "Osaka".to_string());
    map.insert("language".to_string(), "Go".to_string());

    data.push(map);

    let json = serde_json::to_string_pretty(&data).expect("Failed to serialize");
    let mut file = File::create("output.json")?;
    file.write_all(json.as_bytes())?;

    println!("{:?}", json);
    Ok(())
}

fn json_r() {
    let data = fs::read_to_string("output.json").expect("Failed to read file");
    let v: Vec<HashMap<String, String>> = serde_json::from_str(&data).expect("Failed to deserialize");

    println!("{:#?}", v);    
}

fn main() {
    // 
    // let _ = json_w();
    let _ = json_r();
}

[
{
“language”: “Rust”,
“name”: “Alice”,
“city”: “Tokyo”,
},
{
“city”: “Osaka”,
“language”: “Go”,
“name”: “Bob”,
},
]

### implで表現する
関数をinsert, selectとする

use std::collections::HashMap;
use std::fs;
use std::fs::{File};
use std::io::{Read, Write};
use serde::{Serialize, Deserialize};

pub static FILE_NAME : &'static str = "output.json";

struct NoSQL {
    data: Vec<HashMap<String, String>>,
}

impl NoSQL {
    fn insert(&mut self, map: HashMap<String, String>) -> std::io::Result<()> {
        self.data = match File::open(FILE_NAME) {
            Ok(mut file) => {
                let mut contents = String::new();
                file.read_to_string(&mut contents)?;
                serde_json::from_str(&contents).unwrap_or_else(|_| Vec::new())
            },
            Err(_) => Vec::new(),
        };        
        self.data.push(map);

        let json = serde_json::to_string_pretty(&self.data).expect("Failed to serialize");
        let mut file = File::create(FILE_NAME)?;
        file.write_all(json.as_bytes())?;

        println!("{:?}", json);
        Ok(())
    }

    fn select(&self) {
        let data = fs::read_to_string(FILE_NAME).expect("Failed to read file");
        let v: Vec<HashMap<String, String>> = serde_json::from_str(&data).expect("Failed to deserialize");
    
        println!("{:#?}", v);    
    }
}

fn main() {
    let mut sql = NoSQL{ data: Vec::new() };

    let mut map = HashMap::new();
    map.insert("name".to_string(), "Bob".to_string());
    map.insert("city".to_string(), "Osaka".to_string());
    map.insert("language".to_string(), "Go".to_string());

    sql.insert(map);
    sql.select();
}

こうすると、nosqlは単純にVec>,と言える。
インメモリデータベースの場合は、メモリに保存するので、jsonではなく、単純にデータとして持っておくだけになる。その場合、サイズが動的なので、スタックではなく、ヒープに保存される。

【Python】editor

import curses

def main(stdscr):
    curses.curs_set(1)
    stdscr.clear()
    stdscr.refresh()

    text = []
    row = 0

    while True:
        stdscr.move(row, 0)
        stdscr.clrtoeol()
        stdscr.addstr(row, 0, ''.join(text))
        stdscr.refresh()
        ch = stdscr.getch()

        if ch == 27:  # ESCキーで終了
            break
        elif ch == 10:  # Enter
            text.append('\n')
            row += 1
        elif ch == 127 or ch == curses.KEY_BACKSPACE:
            if text:
                text.pop()
                if text[-1] == '\n':
                    row -= 1
        else:
            text.append(chr(ch))

    with open("output.txt", "w") as f:
        f.write(''.join(text))
    

curses.wrapper(main)

Rustで書くと

use crossterm::{
    cursor,
    event::{read, Event, KeyCode},
    terminal::{enable_raw_mode, disable_raw_mode, ClearType},
    ExecutableCommand,
};
use std::io::{stdout, Write};
use std::fs::File;

fn main() -> std::io::Result<()> {
    let mut stdout = stdout();
    let mut buffer = String::new();

    enable_raw_mode()?;
    stdout.execute(crossterm::terminal::Clear(ClearType::All))?;
    stdout.execute(cursor::MoveTo(0, 0))?;

    loop {
        match read()? {
            Event::Key(event) => match event.code {
                KeyCode::Char(c) => {
                    buffer.push(c);
                    print!("{}", c);
                    stdout.flush()?;
                }
                KeyCode::Enter => {
                    buffer.push('\n');
                    print!("\r\n");
                    stdout.flush()?;
                }
                KeyCode::Backspace => {
                    buffer.pop();
                    stdout.execute(cursor::MoveLeft(1))?;
                    print!(" ");
                    stdout.execute(cursor::MoveLeft(1))?;
                    stdout.flush()?;
                }
                KeyCode::Esc => {
                    break;
                }
                _ => {}
            },
            _ => {}
        }
    }

    disable_raw_mode()?;

    let mut file = File::create("output.txt")?;
    write!(file, "{}", buffer)?;

    Ok(())
}

inputファイルを読み込んで、出力

use crossterm::{
    cursor,
    event::{read, Event, KeyCode},
    terminal::{enable_raw_mode, disable_raw_mode, ClearType},
    ExecutableCommand,
};
use std::io::{stdout, Write};
use std::fs::{File, read_to_string};

fn main() -> std::io::Result<()> {
    let mut stdout = stdout();
    let mut buffer = String::new();

    let initial_text = match read_to_string("input.txt") {
        Ok(content) => content,
        Err(_) => String::new(),
    };
    buffer.push_str(&initial_text);

    enable_raw_mode().expect("Failed to enable raw mode");
    stdout.execute(crossterm::terminal::Clear(ClearType::All))?;
    stdout.execute(cursor::MoveTo(0, 0))?;

    let mut row: u16 = 0;
    let mut col: u16 = 0;
    for ch in buffer.chars() {
        match ch {
            '\n' => {
                print!("\r\n");
                row += 1;
                col = 0;
            }
            _ => {
                print!("{}", ch);
                col += 1;
            }
        }
    }
    stdout.flush()?;

    stdout.execute(cursor::MoveTo(col, row))?;

    loop {
        match read()? {
            Event::Key(event) => match event.code {
                KeyCode::Char(c) => {
                    buffer.push(c);
                    print!("{}", c);
                    stdout.flush()?;
                }
                KeyCode::Enter => {
                    buffer.push('\n');
                    row += 1;
                    print!("\r\n");
                    stdout.flush()?;
                }
                KeyCode::Backspace => {
                    buffer.pop();
                    stdout.execute(cursor::MoveLeft(1))?;
                    print!(" ");
                    stdout.execute(cursor::MoveLeft(1))?;
                    stdout.flush()?;
                }
                KeyCode::Esc => {
                    break;
                }
                _ => {}
            },
            _ => {}
        }
    }

    disable_raw_mode()?;

    let mut file = File::create("output.txt")?;
    write!(file, "{}", buffer)?;

    Ok(())
}

input.txtを行単位で読み込んで出力するなどの工夫が必要

サッカーゲーム

import random

def player_attack():
    print("攻撃!シュートする方向を選んでください:")
    print("1: 左\n2: 中央\n3: 右")
    try:
        choice = int(input("あなたの選択: "))
        if choice not in [1, 2, 3]:
            raise ValueError
    except ValueError:
        print("無効な入力です。中央にします。")
        choice = 2
    return choice

def cpu_defend():
    return random.randint(1, 3)

def cpu_attack():
    return random.randint(1, 3)

def player_defend():
    print("守備!相手のシュートを読む方向を選択してください:")
    print("1: 左\n2: 中央\n3: 右")
    try:
        choice = int(input("あなたの選択: "))
        if choice not in [1, 2, 3]:
            raise ValueError
    except ValueError:
        print("無効な入力です。中央にします。")
        choice = 2
    return choice

def game():
    player_score = 0
    cpu_score = 0

    print("== サッカー対決: 3ターンマッチ ==")
    for turn in range(1, 4):
        print(f"\n--- 第{turn}ターン ---")

        attack_dir = player_attack()
        defend_dir = cpu_defend()
        if attack_dir != defend_dir:
            print("ゴール!! 🎉")
            player_score += 1
        else:
            print("セーブされた!")

        attack_dir = cpu_attack()
        defend_dir = player_defend()
        if attack_dir != defend_dir:
            print("CPUがゴール!! 😱")
            cpu_score += 1
        else:
            print("ナイスセーブ!! 🎉")

    print(f"\n== 試合終了==\n あなた: {player_score}点\n CPU:{cpu_score}点")
    if player_score > cpu_score:
        print("勝利")
    elif player_score < cpu_score:
        print("敗北")
    else:
        print("引き分け")

if __name__ == "__main__":
    game()

$ python3 football.py
== サッカー対決: 3ターンマッチ ==

— 第1ターン —
攻撃!シュートする方向を選んでください:
1: 左
2: 中央
3: 右
あなたの選択: 1
ゴール!! 🎉
守備!相手のシュートを読む方向を選択してください:
1: 左
2: 中央
3: 右
あなたの選択: 2
CPUがゴール!! 😱

— 第2ターン —
攻撃!シュートする方向を選んでください:
1: 左
2: 中央
3: 右
あなたの選択: 1
ゴール!! 🎉
守備!相手のシュートを読む方向を選択してください:
1: 左
2: 中央
3: 右
あなたの選択: 3
ナイスセーブ!! 🎉

— 第3ターン —
攻撃!シュートする方向を選んでください:
1: 左
2: 中央
3: 右
あなたの選択: 2
セーブされた!
守備!相手のシュートを読む方向を選択してください:
1: 左
2: 中央
3: 右
あなたの選択: 1
ナイスセーブ!! 🎉

== 試合終了==
あなた: 2点
CPU:1点
勝利

【Python】ゼロ知識証明

ゼロ知識証明とは、証明者がその事実を提示するだけで、その事実の内容を隠すことができる証明方法
Zero-knowledge Proofとしてよく使われるのが、ハミルトングラフの知識を持っていることや秘密のパスワードを証明せず知っていることを示す。

xを知っていおり、 y = x^2 mod p を示す。

import random

p = 23
x = 5
y = pow(x, 2, p)

print(" y =", y)

def prover_step():
    r = random.randint(1, p - 1)
    a = pow(r, 2, p)
    return r, a

def verifier_challenge():
    return random.randint(0, 1)

def prover_response(r, c):
    if c == 0:
        return r
    else:
        return (r * x) % p

def verifier_check(a, z, c):
    if c == 0:
        return pow(z, 2, p) == a
    else:
        return pow(z, 2, p) == (a * y) % p

r, a = prover_step()
print("Prover: 提出 a=", a)

c = verifier_challenge()
print("Verifier: チャレンジ c=", c)

z = prover_response(r, c)
print("Prover: 応答 z =", z)

ok = verifier_check(a, z, c)
print("Verifier: 検証結果 =", ok)

$ python3 zkp.py
y = 2
Prover: 提出 a= 4
Verifier: チャレンジ c= 1
Prover: 応答 z = 10
Verifier: 検証結果 = True

—– 実際の計算
r = ?,
a = 4,
c = 1
z = 10
10 * 10 % 23 = 5 * 2 / 23

x の値を知らせずに、y = x^2 mod p を証明している。証明の際には、c != 0 の時にのみyの値を利用する
zkpの概念は理解できたが、なぜこれが証明になるのかのロジックはよくわからない..

【Solana】Proof of HistoryをPythonで書く

前のブロックのiter,hashoutをベースに iter%(2**delay) == 0 になるまで、ブロックごとに262144回sha256を計算して、hash値を求める。hashoutが決まっているので、先回りしてブロックを作れそうだが、SolanaはPoSのため、不正がバレるとstakingが没収されてしまう。なるほどねー

import datetime
import hashlib
import time
import os
import pathlib
import random
import string
from random import randint

iter=1
hashin = ''.join(random.choices(string.ascii_uppercase + string.digits, k = 10))
hashout = hashlib.sha256(hashin.encode()).hexdigest()

print(hashin)
print(hashout)

def datastream():
    v1 = int(randint(1200, 1500))
    v2 = int(randint(1300, 1700))
    v3 = int(randint(1100, 1500))
    v4 = int(randint(4000, 5600))
    v5 = int(randint(4000, 5600))
    v6 = int(randint(1900, 2400))
    v7 = int(randint(1920, 2300))
    v8 = int(randint(1850, 2200))
    v9 = int(randint(1900, 2300))
    v10 = int(randint(1800, 2200))
    return [v1, v2, v3, v4, v5, v6, v7, v8, v9, v10];

class Block:
    blockNo = 0
    count = iter
    data = None
    next = None
    hash = "None"
    previous_hash = "None"
    timestamp = datetime.datetime.now()

    def __init__(self, data):
        self.data = datastream()

    def hash(self):
        file = pathlib.Path("TToken")
        if file.exists():
            tier=open("TToken").readline().rstrip()
        else:
            with open("benchmark.py") as infile:
                exec(infile.read())
            tier=open("TToken").readline().rstrip()

        if tier=="T1":
            h = hashlib.md5()
        elif tier=="T2":
            h = hashlib.sha1()
        elif tier=="T3":
            h = hashlib.blake2s()
        elif tier=="T4":
            h = hashlib.sha3_256()

        h.update(
		str(self.nonce).encode('utf-8') +
		str(self.data).encode('utf-8') +
		str(self.previous_hash).encode('utf-8') +
		str(self.timestamp).encode('utf-8') +
		str(self.blockNo).encode('utf-8')
		)
        return h.hexdigest()
    
        block.blockNo = self.block.blockNo + 1

    def __str__(self):
        return "Block Number: " + str(self.blockNo) + "\nHistory Count: " + str(self.count) + "\nBlock Data: " + str(self.data) + "\nBlock Hash: " + str(self.hash) + "\nPrevious Hash: " + str(self.previous_hash) + "\n--------------"

class Blockchain:
	block = Block("Genesis")
	dummy = head = block

	def add(self, block):
		if (self.block.blockNo ==0):
			block.previous_hash =  "Origin"
		else:
			block.previous_hash = self.block.hash
		block.blockNo = self.block.blockNo + 1

		self.block.next = block
		self.block = self.block.next

	def mine(self, block):
		global iter
		global hashin
		global hashout
		delay = 18
		cont = 1
		while(cont == 1):
			if int(iter%(2**delay) == 0):
				block.count = iter
				block.hash = hashout
				self.add(block)
				print(block)
				iter += 1
				hashin=hashout
				hashout = hashlib.sha256(hashin.encode()).hexdigest()
				cont = 0
				break
			else:
				iter += 1
				hashin=hashout
				hashout = hashlib.sha256(hashin.encode()).hexdigest()

t_initial = time.perf_counter()
blockchain = Blockchain()
b=int(input('Enter the number of Blocks for this simulation:'))

for n in range(b):
    blockchain.mine(Block(n+1))
t_final = time.perf_counter()
delta_t = t_final - t_initial
delta_unit = delta_t*1000/b
print("comutation Time per Block(ms):"+str(delta_unit))
input('Press ENTER to exit')

Enter the number of Blocks for this simulation:3
Block Number: 1
History Count: 262144
Block Data: [1297, 1642, 1372, 5138, 5301, 2188, 1998, 2150, 1914, 1862]
Block Hash: a53e39cef8be27ba38e73fe216fbfc2efc63bca056fa2a6a18380e9d93c98ea3
Previous Hash: Origin
————–
Block Number: 2
History Count: 524288
Block Data: [1211, 1633, 1307, 4757, 5133, 2206, 2032, 1891, 2257, 2139]
Block Hash: 79561ebd2627b432d1e619dee9db7ac85593a4357925827754b1faefd42c1b72
Previous Hash: a53e39cef8be27ba38e73fe216fbfc2efc63bca056fa2a6a18380e9d93c98ea3
————–
Block Number: 3
History Count: 786432
Block Data: [1459, 1682, 1131, 5339, 4983, 2057, 1948, 2192, 2017, 2076]
Block Hash: d33e10fa10273b5d64ccdad34ffcbaae7673cb785807c49f199b204a148e6cd9
Previous Hash: 79561ebd2627b432d1e619dee9db7ac85593a4357925827754b1faefd42c1b72
————–
comutation Time per Block(ms):795.6611973543962
Press ENTER to exit

【Python】current pathを取得する

abspathがファイルの絶対パス。

import os

current_path = os.path.abspath(__file__)
exec_path = current_path.replace('/python/post.py', '')
path = "{}/config/hoge.txt".format(exec_path)
print(path)

なるほど、パスの取得は色々あって便利やね。

【Python】jsonデータを1行ずつ読み取ってjsonの配列にする

json.dumps した後に、配列(json_list=[])に入れると、配列自体はjsonではないので、うまくいかない。
配列に入れてからjson.dumpsすると、jsonデータとして扱われる。

import json
import requests


json_list = []
with open('./data/names.txt') as f:
    for line in f:
        json_open = json.loads(line)
        json_list.append(json_open)
        
json_data = json.dumps(json_list)
print(json_data)

response = requests.post(
    "http://httpbin.org/post", 
    data=json_data,
    headers={'Content-type': 'application/json'}
)
print(response.status_code)
print(response.text)

これ解決するのに半日以上かかりましたorz…

【Python】シャミアの秘密計算

import random
from typing import List, Tuple

def generate_coefficients(secret: int, threshold: int) -> List[int]:
    coefficients = [secret]
    for _ in range(threshold - 1):
        coefficients.append(random.randint(1, 256))
    return coefficients

def create_shares(
    secret: int, total_shares: int, threshold: int
) -> List[Tuple[int, int]]: 
    coefficients = generate_coefficients(secret, threshold)
    shares = []
    for x in range(1, total_shares + 1):
        y = sum(coeff * (x**exp) for exp, coeff in enumerate(coefficients))
        shares.append((x, y))
    return shares

def reconstruct_secret(shares: List[Tuple[int, int]], threshold: int) -> int:
    def _lagrange_interpolation(x: int, x_s: List[int], y_s: List[int]) -> int:
        def _basis(j: int) -> int:
            num = 1
            den = 1
            for m in range(len(x_s)):
                if m != j:
                    num *= x - x_s[m]
                    den *= x_s[j] - x_s[m]
            return num // den

        result = 0
        for j in range(len(y_s)):
            result += y_s[j] * _basis(j)
        return result

    x_s, y_s = zip(*shares)
    return _lagrange_interpolation(0, x_s, y_s)

if __name__ == "__main__":
    secret = 2732
    total_shares = 5
    threshold = 2

    shares = create_shares(secret, total_shares, threshold)
    print("shares:", shares)

    selected_shares = shares[:threshold]
    print(zip(*selected_shares))
    x_s, y_s = zip(*selected_shares)
    print(x_s, y_s)
    recovered_secret = reconstruct_secret(selected_shares, threshold)
    print("Recovered Secret:", recovered_secret)

$ python3 test.py
shares: [(1, 2961), (2, 3190), (3, 3419), (4, 3648), (5, 3877)]

(1, 2) (2961, 3190)
Recovered Secret: 2732

【Python】error: externally-managed-environment

$ pip3 install psycopg2
error: externally-managed-environment

× This environment is externally managed
╰─> To install Python packages system-wide, try apt install
python3-xyz, where xyz is the package you are trying to
install.

プロジェクトごとに仮想環境を作って、そこでインストールすることが推奨されている。
$ python3 -m venv venv
$ source venv/bin/activate
$ python3 -V
$ pip3 install psycopg2 
$ pip3 install python-dotenv
$ cd assets
$ python3 migration.py
$ deactivate

### 仮想環境を作らない方法
$ pip3 install psycopg2 –break-system-packages
これでもいける。