【Rust】Axumでpostgresのmigrationを実行したい

tokio-postgres-migrationを使用します。
https://crates.io/crates/tokio-postgres-migration

Cargo.toml

tokio-postgres-migration = "0.1.0"

./asset/0001-create-table-books-up.sql

CREATE TABLE books ( 
    id integer PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    name varchar(10),
    body text NOT NULL
);
use tokio_postgres_migration::Migration;
//
const SCRIPTS_UP: [(&str, &str); 2] = [
    (
        "0001-create-table-books",
        include_str!(".././asset/0001-create-table-books-up.sql"),
    ),
    (
        "0002-create-table-pets",
        include_str!(".././asset/0002-create-table-pets-up.sql"),
    )
];
//

async fn handle_migration() -> impl IntoResponse {
    let _ = create_table().await;
    (StatusCode::OK, format!("Migration completed"))
}

async fn psql_connect() -> Result<Client, Box<dyn std::error::Error>>  {
    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres password=hogehoge", NoTls).await?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
    Ok(client)
}

async fn create_table() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = psql_connect().await.unwrap();
    let migration = Migration::new("table_to_keep_migrations".to_string());
    migration.up(&mut client, &SCRIPTS_UP).await?;
    Ok(())
}

これだと、booksとpetsは作られるんだけど、Migration::new(“${table_name}”)のtable_nameもselfで作られてしまうな…
https://github.com/jdrouet/tokio-postgres-migration/blob/main/src/lib.rs

migrationがどういうものかは理解した。

【Rust】ワークスペースによる開発

引き続き「RustによるWebアプリケーション開発(豊田優貴著)」を参考に、ワークスペースによる開発を学んでいきます。
https://github.com/rust-web-app-book/rusty-book-manager

$ tree
.
├── Cargo.lock
├── Cargo.toml
├── compose.yaml
├── Dockerfile
├── makefile.toml
├── src
│ └── main.rs
└── target

sr/bin/app.rs にmain関数を記載する。main.rsは削除

### ワークスペースメンバーの作成
$ cargo new –lib api
$ cargo new –lib kernel
$ cargo new –lib adapter
$ cargo new –lib shared
$ cargo new –lib registry

### ワークスペースのメンバーを定義
Cargo.toml

[[bin]]
name = "app"
path = "src/bin/app.rs"

[workspace]
members = ["api", "kernel", "adapter", "shared", "registry"]

[workspace.dependencies]
adapter = { path = "./adapter" }
api = { path = "./api" }
kernel = { path = "./kernel" }
shared = { path = "./shared" }
registry = { path = "./registry" }
anyhow = "1.0.75"
axum = {version = "0.7.5", features = ["macros"]}
tokio = { version = "1.37.0", features = ["full"]}

[dependencies]
adapter.workspace = true
api.workspace = true
registry.workspace = true
shared.workspace = true
anyhow.workspace = true
tokio.workspace = true
axum.workspace = true

[dev-dependencies]
rstest = "0.18.2"

なるほど、なかなか複雑やわ ワークスペースの開発方法はもうちょっと勉強する必要がある。。

【Rust】tokio_postgresの接続を外出しにする

「RustによるWebアプリケーション開発(豊田優貴著)」を読むと、axumのルーティングでwith_state(conn_pool)で接続情報を持たせてますね。まぁそういう方法もありかと思いますが、今回は、関数で呼び出す方法を採用することにした。
参考のソースコード:
https://github.com/kingluo/tokio-postgres-hello-world/blob/master/src/main.rs

async fn psql_connect() -> Result<Client, Box<dyn std::error::Error>>  {
    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres password=hogehoge", NoTls).await?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
    Ok(client)
}

async fn check_users()-> Result<(), Box<dyn std::error::Error>>  {
    
    let client = psql_connect().await.unwrap();
    for row in client.query("SELECT id, username, password From users", &[]).await? {
        let id: i32 = row.get(0);
        let name: String = row.get(1);
        let password: String = row.get(2);

        println!("{} {} {}", id, name, password);
    }
    Ok(())
}

【Rust】axumでstatusコードを返却する

### StatusCodeの種類
StatusCode::OK
StatusCode::NOT_FOUND
StatusCode::BAD_REQUEST
StatusCode::INTERNAL_SERVER_ERROR
StatusCode::CREATED

### レスポンスにStatusCodeを返却する

use axum::{
    response::{IntoResponse, Response},
    http::{StatusCode},
};
//

async fn handle_health() -> impl IntoResponse {
    (StatusCode::NOT_FOUND, format!("Status Code Test: Not Found"))
}

$ curl -X GET 192.168.33.10:3000/healtcurl -X GET 192.168.33.10:3000/health
Status Code Test: Not Found

$ curl -i 192.168.33.10:3000/health
HTTP/1.1 404 Not Found
content-type: text/plain; charset=utf-8
content-length: 27
date: Tue, 25 Feb 2025 07:33:11 GMT

なるほど、エラー時にStatusCodeとメッセージを返却したいですね。

【Rust】関数内でmutexの2回目のlockとdrop(binding)の使い方

std::sync::Mutexは、同じスレッド内で2回ロックしようとするとデッドロックしてしまう。
以下のようにdropして2回ロックする方法もあるが、

let mut binding = VECT.lock().unwrap();
    let objs = binding.deref_mut();

    let mut flag = 0;
    for v in objs {
        // 略
    }

    drop(binding);

そうすると、dropした後にasyc/awitを使おうとすると、axumではルーティングでエラーになってしまう。
.route(“/”, get(handle_index))
| — ^^^^^^^^^^^^ the trait `Handler<_, _>` is not implemented for fn item `fn(ConnectInfo) -> impl Future> {handle_index}`

そのため、ロックする場合は、関数でロックして処理するようにして、一つの関数の中で2回ロックしないようにする。
別の関数で呼び出す場合は、drop(binding);とする必要はない。

fn check(name: Name) -> bool {
    let mut binding = VECT.lock().unwrap();
    let objs = binding.deref_mut();

    let mut flag = 0;
    for v in objs {
        if name.family == v.family {
            if name.first == v.first {
                if name.age == v.age {
                    return false;
                }
            }
        }
    }
    return true
}

なるほど〜、なんでずっとエラーになっているか理由がわかった!

【Rust】Peer情報をnode ip listに書き込む

bitcoinのIPアドレスリストは、ipアドレスとUnix timeが対になっている
(参考↓)
https://qiita.com/onokatio/items/04f23b7300dec7e287cb
その他にも、ipv4/ipv6や、ping, lastsend, lastrec, id, versionなどの情報も含まれる。

handshake後にpeers.datのファイルに書き込むようにして、ping, pongの箇所は、enumでactive/inactiveのstatusで表現することにした。

use serde::{Serialize, Deserialize};
use std::{io::Write};
use chrono::{Utc,DateTime};
use std::fs::OpenOptions;

#[derive(Serialize, Deserialize, Clone, Debug)]
enum Status {
    ACTIVE,
    INACTIVE,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct Peer {
    ip: String,
    unixtime: i64,
    nodetype: String,
    version: String,
    status: Status,
}

fn main(){
    let utc_datetime: DateTime<Utc> = Utc::now();
    println!("{}", utc_datetime.timestamp());

    let peer = Peer {ip: "192.168.33.10".to_string(), unixtime: utc_datetime.timestamp(), nodetype: "c".to_string(), version: "1.0.1".to_string(), status: Status::ACTIVE};
    println!("{:?}", peer);
    let serialized: Vec<u8> = serde_json::to_vec(&peer).unwrap();
    let mut file_ref = OpenOptions::new()
                        .append(true)
                        .open("./data/peers.dat")
                        .expect("Unable to open file");
    file_ref.write_all(&serialized).expect("write failed");
    file_ref.write_all(b"\n").expect("write failed");
}

Running `target/debug/sample`
1740366182
Peer { ip: “192.168.33.10”, unixtime: 1740366182, nodetype: “c”, version: “1.0.1”, status: ACTIVE }

nodeipリストをどうゆう風にするか、ファイルを分割するか、データ型などずっと悩んでいたけど、他のチェーンの設計は参考になりますね。

ちなみに取り出す際↓

fn get_peers() -> Result<Vec<Peer>, Box<dyn std::error::Error>>  {

    let mut peers: Vec<Peer> = Vec::new();
    for result in BufReader::new(File::open("./data/peers.dat")?).lines() {
        let line: Peer = serde_json::from_str(&result?).unwrap();
        peers.push(line);
    }
    Ok(peers)
}

【Rust】reqwestでVPSのaxumにPOSTできるかテストする

reqwestでipを指定して送る場合は、client.post(“***.**.***.**:3000/post”) だと送れないが、client.post(“http://***.**.***.**:3000/post”) だと問題なく送ることができる。

送る側(ローカル開発環境)

async fn test()-> Result<(), Box<dyn std::error::Error>> {
    let n = Name { family: "tanaka".to_string(), first:"taro".to_string(), age: 20 };
    let json = json!(n);
    println!("{:?}", &json);
    let client = reqwest::Client::new();
    let resp = client.post("http://***.**.***.**:3000/post")
        .json(&json)
        .send()
        .await?;
    let body = resp.text().await?;   
    println!("{}", body);
    Ok(())
}

受け取る側(VPS: ***.**.***.**)

async fn main() {

    let app = Router::new()
        .route("/", get(handle_index))
        .route("/post", post(handle_post))
        .route("/test", get(handle_test));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

async fn handle_post(extract::Json(name): extract::Json<Name>) ->impl IntoResponse{
    println!("{:?}", name);
    return "All OK".into_response();
}

受け取る側
$ cargo run
Compiling axum v0.1.0 (/home/ubuntu/test/rust_test)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 12.90s
Running `target/debug/axum`
Name { family: “sato”, first: “taro”, age: 50 }
Name { family: “tanaka”, first: “taro”, age: 20 }

送る側
Compiling axum v0.1.0 (/home/vagrant/dev/rust/axum)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 12.59s
Running `target/debug/axum`
Object {“age”: Number(20), “family”: String(“tanaka”), “first”: String(“taro”)}
All OK

reqwestが使えるというのがわかったのは収穫だが、IPだけでなく、httpsかhttpかというプロトコルも一緒に取得する必要がある。

discussion
https://github.com/tokio-rs/axum/discussions/858

ちなみに、VPSでweb socketで待ち受けて、ws://***.**.***.**:3000/ws で接続しても問題なく接続できた。

async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>, ws: WebSocketUpgrade) -> Response {
    println!("{}", addr);
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    while let Some(msg) = socket.recv().await {
        let mut msg = if let Ok(mut msg) = msg {
            msg
        } else {
            return;
        };
        if socket.send(msg).await.is_err() {
            // client disconnected
            return;
        }
    }
}

なるほど、reqwestが試せるだけで大分進んだ気がする。

【Rust】ダイクストラ法

fn dijkstra(edges: Vec<Vec<[usize; 2]>>, num_v: usize)  -> Vec<usize>{
    let inf = 999;
    let mut dist = Vec::new();
    for _ in 0..num_v {
        dist.push(inf);
    }
    dist[0] = 0;

    let mut q = Vec::new();
    for i in 0..num_v {
        q.push(i);
    }
    
    while *&q.len() > 0 {
        let mut r = q[0];
        for i in &q{
            if dist[*i] < dist[r] {
                r = *i;
            }
        } 
        let n = q.iter().position(|n| *n == r).unwrap();
        let u = q[n];
        q.retain(|&x| x != u);
        
        for i in edges[u].clone() {
            if dist[i[0]] > dist[u] + i[1] {
                dist[i[0]] = dist[u] + i[1];
            }
        }      
    }
    return dist;
}

fn main() {
    let edges = vec![
        vec![[1, 4], [2, 3]],
        vec![[2, 1], [3, 1], [4, 5]],
        vec![[5, 2]],
        vec![[4, 3]],
        vec![[6, 2]],
        vec![[4, 1], [6, 4]],
        vec![],
    ];
    println!("{:?}", edges);

    println!("{:?}", dijkstra(edges, 7));
}

Compiling rust v0.1.0 (/home/vagrant/dev/algorithm/rust)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Running `target/debug/rust`
[[[1, 4], [2, 3]], [[2, 1], [3, 1], [4, 5]], [[5, 2]], [[4, 3]], [[6, 2]], [[4, 1], [6, 4]], []]
[0, 4, 3, 5, 6, 5, 8]

おおおお、これは凄いな…