【python】UNIXドメインソケットによるデータの受け渡し【並列処理】

ソケット通信は異なるサーバ同士でのやりとりに使うものだと思っていたが、同じマシン上の別々のスレッド同士でもできるのね。

import socket
import os.path
import time
from threading import Thread, current_thread

# Filesystem path of the UNIX domain socket that Receiver binds and Sender connects to.
SOCK_FILE = "./mailbox"
# Maximum number of bytes requested from the connection per recv() call.
BUFFER_SIZE = 1024

class Sender(Thread):
    """Client thread that sends a few text fragments over the UNIX socket."""

    def run(self) -> None:
        """Connect to ``SOCK_FILE`` and send each message in order.

        The socket is managed by a ``with`` block so it is closed even when
        ``connect`` or ``sendall`` raises.
        """
        self.name = "Sender"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(SOCK_FILE)

            messages = ["Hello", " ", "world!"]
            for msg in messages:
                print(f"{current_thread().name}: Send: '{msg}'")
                client.sendall(msg.encode())

class Receiver(Thread):
    """Server thread that accepts one connection and prints what it receives."""

    def run(self) -> None:
        """Bind ``SOCK_FILE``, accept a single client and read until EOF.

        Both the listening socket and the accepted connection are closed via
        ``with`` blocks, including when an exception is raised.
        """
        self.name = "Receiver"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
            server.bind(SOCK_FILE)
            server.listen()
            print(
                f"{current_thread().name}: Listening for incoming messages...")
            conn, _addr = server.accept()

            with conn:
                while True:
                    data = conn.recv(BUFFER_SIZE)
                    # An empty bytes object means the peer closed the connection.
                    if not data:
                        break
                    message = data.decode()
                    print(f"{current_thread().name}: Received: '{message}'")

def main() -> None:
    """Run one Receiver and one Sender thread over a fresh socket file."""
    # A socket file left over from an earlier run would make bind() fail.
    if os.path.exists(SOCK_FILE):
        os.remove(SOCK_FILE)

    listener = Receiver()
    listener.start()
    # Pause before starting the sender so the receiver can begin listening.
    time.sleep(1)
    talker = Sender()
    talker.start()

    listener.join()
    talker.join()

    os.remove(SOCK_FILE)


if __name__ == "__main__":
    main()

$ python3 sockets.py
Receiver: Listening for incoming messages...
Sender: Send: 'Hello'
Receiver: Received: 'Hello'
Sender: Send: ' '
Receiver: Received: ' '
Sender: Send: 'world!'
Receiver: Received: 'world!'

【python】メッセージキューによるデータの受け渡し【並列処理】

q = Queue() で初期化した後は、qでやり取りしていますね。

import time
from queue import Queue
from threading import Thread, current_thread

class Worker(Thread):
    """Thread that drains items from a shared queue until it is empty."""

    def __init__(self, queue: Queue, id: int):
        """Store the shared queue and use ``id`` (as a string) as the thread name."""
        super().__init__(name=str(id))
        self.queue = queue

    def run(self) -> None:
        """Process queue items one by one and return once none are left.

        ``get_nowait()`` is used instead of ``empty()`` followed by a blocking
        ``get()``: with several workers, another thread may take the last item
        between the two calls, leaving this thread blocked forever.
        """
        from queue import Empty

        while True:
            try:
                item = self.queue.get_nowait()
            except Empty:
                break
            print(f"Thread {current_thread().name}: "
                f"processing item {item} from the queue")
            time.sleep(2)

def main(thread_num: int) -> None:
    """Fill a queue with the integers 0-9 and drain it with ``thread_num`` workers."""
    q = Queue()
    for item in range(10):
        q.put(item)

    workers = []
    # Worker ids are 1-based.
    for worker_id in range(1, thread_num + 1):
        worker = Worker(q, worker_id)
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()


if __name__ == "__main__":
    thread_num = 4
    main(thread_num)

$ python3 message_queue.py
Thread 1: processing item 0 from the queue
Thread 2: processing item 1 from the queue
Thread 3: processing item 2 from the queue
Thread 4: processing item 3 from the queue
Thread 2: processing item 4 from the queue
Thread 3: processing item 5 from the queue
Thread 1: processing item 6 from the queue
Thread 4: processing item 7 from the queue
Thread 3: processing item 8 from the queue
Thread 2: processing item 9 from the queue

【Rust】rustでパイプを使ったデータの受け渡し【並列処理】

let (mut tx, mut rx) = channel();でチャンネルを開設したとき、tx, rxの型が何かに苦戦しました。結局、pipe_channel::Sender、pipe_channel::Receiverになるのね。相互にやり取りするのではなく、一方的にデータを送るときなどに有効活用できそうです。

use std::thread;
use pipe_channel::*;

/// Spawns a thread that sends one "Rubber duck" string through `tx`
/// and waits for that thread to finish.
fn writer(mut tx: pipe_channel::Sender<String>) {
    let name = String::from("Writer");
    thread::spawn(move || {
        println!("{}: Sending rubber duck...", name);
        let payload = String::from("Rubber duck");
        tx.send(payload).unwrap();
    })
    .join()
    .unwrap();
}

/// Spawns a thread that receives one string from `rx`, prints it,
/// and waits for that thread to finish.
fn reader(mut rx: pipe_channel::Receiver<String>) {
    let name = String::from("Reader");
    thread::spawn(move || {
        println!("{}: Reading...", name);
        let message = rx.recv().unwrap();
        println!("{}: received ({})", name, message);
    })
    .join()
    .unwrap();
}

/// Opens a pipe channel, hands the sending end to `writer` and the
/// receiving end to `reader`.
fn main() {
    // Both ends are moved straight into the callees, so the bindings here
    // do not need to be `mut`.
    let (tx, rx) = channel();
    writer(tx);
    reader(rx);
}

/// Prints the type name of the argument as reported by `std::any::type_name`.
/// The value itself is not used.
fn print_typename<T>(_value: T) {
    let type_name = std::any::type_name::<T>();
    println!("{}", type_name);
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.29s
Running `target/debug/parallel`
Writer: Sending rubber duck…
Reader: Reading…
Reader: received (Rubber duck)

【Python】パイプによるIPC【並列処理】

reader_conn, writer_conn = Pipe() でpipeを接続してるのはわかります。
共有メモリの場合は、グローバルな変数で値をやり取りしたが、Pipeの場合はその名の通り、一方のスレッドからもう一方のスレッドへ値を渡すときに使われるってことかな。。

from threading import Thread, current_thread
from multiprocessing import Pipe
from multiprocessing.connection import Connection

class Writer(Thread):
    """Thread named "Writer" that sends one message through a pipe connection."""

    def __init__(self, conn: Connection):
        """Keep the connection end this thread will write to."""
        super().__init__()
        self.name = "Writer"
        self.conn = conn

    def run(self) -> None:
        """Announce the send and push the string "Rubber duck" into the pipe."""
        thread_name = current_thread().name
        print(f"{thread_name}: Sending rubber duck...")
        self.conn.send("Rubber duck")

class Reader(Thread):
    """Thread named "Reader" that receives one message from a pipe connection."""

    def __init__(self, conn: Connection):
        """Keep the connection end this thread will read from."""
        super().__init__()
        self.name = "Reader"
        self.conn = conn

    def run(self) -> None:
        """Announce the read, receive one object from the pipe and print it."""
        thread_name = current_thread().name
        print(f"{thread_name}: Reading...")
        received = self.conn.recv()
        print(f"{current_thread().name}: Received: {received}")

def main() -> None:
    """Create a pipe, then run a Writer and a Reader thread on its two ends."""
    reader_conn, writer_conn = Pipe()
    reader_thread = Reader(reader_conn)
    writer_thread = Writer(writer_conn)

    # The writer is started before the reader.
    workers = (writer_thread, reader_thread)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()

$ python3 pipe.py
Writer: Sending rubber duck…
Reader: Reading…
Reader: Received: Rubber duck

【Rust】rustでもglobal vectorによるIPC処理【並列処理】

forループが終わってからスレッドが並列で動いているので、rustとpythonだと、スレッドの立ち方が微妙に違いますね。

// Number of slots in the shared buffer.
static SIZE: u32 = 5;
// Buffer shared by producer() and consumer(), guarded by a mutex.
// main() fills it with -1, which consumer() treats as "data not available".
static Shared_Memory: Mutex<Vec<i32>> = Mutex::new(Vec::new());

fn main() {
    for _i in 0..SIZE {
        Shared_Memory.lock().unwrap().push(-1);
    }
    consumer();
    producer();
    
    println!("{:?}", Shared_Memory);
}

fn producer() {
    let name = "Producer".to_string();
    let mut memory = Shared_Memory.lock().unwrap();
    for i in 0..SIZE {   
        println!("{}: Writing {}", name, i);
        memory[i as usize] = i as i32;
    }
}

fn consumer() {
    let name = "Consumer".to_string();
    
    let handle = thread::spawn(move || {
        let mut memory = Shared_Memory.lock().unwrap();
        for i in 0..SIZE {
            while true {
                let line = memory[i as usize];
                if line == -1{
                    println!("{}: Data not available, sleeping fro 1 second for retrying.", name);
                    thread::sleep(Duration::from_millis(500));
                }
                println!("{}: Read: ({})", name, line);
                break;
            }
        }
    });
    handle.join().unwrap();
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.33s
Running `target/debug/parallel`
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Consumer: Data not available, sleeping fro 1 second for retrying.
Consumer: Read: (-1)
Producer: Writing 0
Producer: Writing 1
Producer: Writing 2
Producer: Writing 3
Producer: Writing 4
Mutex { data: [0, 1, 2, 3, 4], poisoned: false, .. }

【Python】共有メモリによるIPC【並列処理】

shared_memory = [-1] * SIZE は、[-1, -1, -1, -1, -1] になり、producerで[0, 1, 2, 3, 4]に変わる。ここがややこしい。

import time
from threading import Thread, current_thread

# Number of slots in the shared buffer.
SIZE = 5
# Buffer shared by Producer and Consumer; Consumer treats -1 as "data not available".
shared_memory = [-1] * SIZE

class Producer(Thread):
    """Thread that fills the shared buffer with the values 0..SIZE-1."""

    def run(self) -> None:
        """Write each index ``i`` into slot ``i`` of ``shared_memory``.

        Slot ``i`` (not ``i - 1``) is used so the buffer ends up as
        ``[0, 1, ..., SIZE - 1]``; indexing with ``i - 1`` would wrap around
        and put 0 into the last slot.
        """
        self.name = "Producer"
        global shared_memory
        for i in range(SIZE):
            print(f"{current_thread().name}: Writing {int(i)}")
            shared_memory[i] = i

class Consumer(Thread):
    """Thread that reads every slot of the shared buffer in order."""

    def run(self) -> None:
        """Print each slot's value, polling once per second while it is -1."""
        self.name = "Consumer"
        global shared_memory
        for index in range(SIZE):
            value = shared_memory[index]
            # -1 is the placeholder for a slot that has not been written yet.
            while value == -1:
                print(f"{current_thread().name}: Data not available\n"
                        f"Sleeping for 1 second before retrying")
                time.sleep(1)
                value = shared_memory[index]
            print(f"{current_thread().name}: Read: {int(value)}")

def main() -> None:
    """Start a Consumer and then a Producer thread and wait for both to finish."""
    workers = [Consumer(), Producer()]

    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
    

【Rust】Rustで新規のthreadを生成する【並列処理】

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;

/// Spawns a thread that prints nine greetings while the main thread prints
/// four; the spawned thread's handle is dropped without being joined.
fn main() {
    thread::spawn(|| {
        (1..10).for_each(|n| {
            println!("hi number {} from the spawned thread!", n);
            thread::sleep(Duration::from_millis(1));
        });
    });

    (1..5).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(Duration::from_millis(1));
    });
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.75s
Running `target/debug/parallel`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 3 from the main thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!

joinを呼び出すことで、実行されたことを保証する

/// Same demo as before, but the spawned thread is joined so that all of its
/// output is produced before `main` returns.
fn main() {
    let spawned = thread::spawn(|| {
        (1..5).for_each(|n| {
            println!("hi number {} from the spawned thread!", n);
            thread::sleep(Duration::from_millis(1));
        });
    });

    (1..5).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(Duration::from_millis(1));
    });

    spawned.join().unwrap();
}

所有権の移転

/// Moves a vector into a spawned thread, which prints its three elements,
/// while the main thread prints its own counter.
fn main() {
    let v = vec![1, 2, 3];
    // `move` transfers ownership of `v` into the closure.
    let spawned = thread::spawn(move || {
        for value in &v {
            println!("hi number {} from the spawned thread!", value);
            thread::sleep(Duration::from_millis(1));
        }
    });

    (1..3).for_each(|n| {
        println!("hi number {} from the main thread!", n);
        thread::sleep(Duration::from_millis(1));
    });

    spawned.join().unwrap();
}

なんか違うような気がするが…

use nix::unistd::{fork, getpid, getppid, ForkResult};
use std::thread;
use std::time::Duration;
use rayon::current_num_threads;

/// Prints the calling thread's id together with `i`, then sleeps for 5 ms.
fn cpu_waster(i: u32) {
    let tid = gettid::gettid();
    println!("{} doing {} work", tid, i);
    thread::sleep(Duration::from_millis(5));
}

/// Prints a separator, the current process id and a thread count.
fn display_threads() {
    println!("{}", "-".repeat(10));
    println!("Current process PID: {}", getpid());
    // NOTE(review): `current_num_threads` comes from rayon and presumably
    // reports the rayon pool size rather than the threads spawned with
    // `std::thread` -- confirm against the rayon documentation.
    println!("Thread Count: {}", current_num_threads());
}

/// Starts `num_threads` threads that each run `cpu_waster` once, printing
/// thread information before and after they are started.
fn main() {
    let num_threads = 5;
    display_threads();

    println!("Starting {} CPU wasters...", num_threads);
    // One thread per waster; a single thread looping over all of them would
    // run the work sequentially on one thread id.
    let handles: Vec<_> = (0..num_threads)
        .map(|i| thread::spawn(move || cpu_waster(i)))
        .collect();
    display_threads();

    for handle in handles {
        handle.join().unwrap();
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.23s
Running `target/debug/parallel`
----------
Current process PID: 582557
Thread Count: 2
Starting 5 CPU waters…
----------
Current process PID: 582557
Thread Count: 2
582600 doing 0 work
582600 doing 1 work
582600 doing 2 work
582600 doing 3 work
582600 doing 4 work

【Python】プロセスの中で子スレッドを作成【並列処理】

import os
import time
import threading
from threading import Thread

def cpu_waster(i: int) -> None:
    """Print the current thread's name with ``i``, then sleep for 3 seconds."""
    # Use the ``name`` attribute; ``getName()`` is deprecated and emits a
    # DeprecationWarning.
    name = threading.current_thread().name
    print(f"{name} doing {i} work")
    time.sleep(3)

def display_threads() -> None:
    """Print a separator, the process id, the active thread count and each thread."""
    separator = "-" * 10
    print(separator)
    print(f"Current process PID: {os.getpid()}")
    print(f"Thread Count: {threading.active_count()}")
    print("Active threads:")
    for active in threading.enumerate():
        print(active)

def main(num_threads: int) -> None:
    """Show thread info, start ``num_threads`` cpu_waster threads, show it again."""
    display_threads()

    print(f"Starting {num_threads} CPU wasters...")
    for index in range(num_threads):
        worker = Thread(target=cpu_waster, args=(index,))
        worker.start()

    # The workers are not joined here, so this runs right after they start.
    display_threads()


if __name__ == "__main__":
    num_threads = 5
    main(num_threads)

$ python3 multithreading.py
----------
Current process PID: 580482
Thread Count: 1
Active threads:
<_MainThread(MainThread, started 281472949920800)>
Starting 5 CPU wasters…
Thread-2 (cpu_waster) doing 1 work
Thread-3 (cpu_waster) doing 2 work
Thread-4 (cpu_waster) doing 3 work
/home/vagrant/dev/rust/parallel/python/multithreading.py:7: DeprecationWarning: getName() is deprecated, get the name attribute instead
name = threading.current_thread().getName()
Thread-1 (cpu_waster) doing 0 work
Thread-5 (cpu_waster) doing 4 work
----------
Current process PID: 580482
Thread Count: 6
Active threads:
<_MainThread(MainThread, started 281472949920800)>




うーむ、中々難儀やのう

【Rust】rustで子プロセスを生成したい【並列処理】

nixを使います。

nix = "0.16.1"

use nix::unistd::{getpid};

/// Prints the id of the current process.
fn main() {
    let pid = getpid();
    println!("My id is {}", pid);
}

My id is 577891

### 子プロセスの生成

use nix::unistd::{fork, getpid, getppid, ForkResult};
use nix::sys::wait::waitpid;
use std::process::exit;

/// Entry point: runs the parent routine with two children.
fn main() {
    start_parent(2);
}

/// Prints the parent's pid and then calls `run_child` `num_children` times.
fn start_parent(num_children: u32) {
    println!("Parent: I am the parent process");
    println!("Parent PID is {}", getpid());
    // The loop index is not needed; `_` avoids an unused-variable warning.
    for _ in 0..num_children {
        run_child();
    }
}

/// Forks once. The child prints its pid and parent pid and exits with
/// status 0; the parent prints the child's pid and waits for it.
///
/// Panics if `fork` fails.
fn run_child() {
    let child_pid = match fork() {
        Ok(ForkResult::Parent { child, .. }) => {
            println!("Main({}) forked child ({})", getpid(), child);
            child
        }
        Ok(ForkResult::Child) => {
            println!("Child({}) PPID is ({})", getpid(), getppid());
            // The child must not fall through into the parent's wait logic.
            exit(0);
        }
        Err(err) => panic!("fork failed: {}", err),
    };

    match waitpid(child_pid, None) {
        // Print a blank line after each child as a visual separator.
        Ok(_) => println!(),
        Err(err) => println!("waitpid() failed: {}", err),
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.20s
Running `target/debug/parallel`
Parent: I am the parent process
Parent PID is 579363
Main(579363) forked child (579390)
Child(579390) PPID is (579363)

Main(579363) forked child (579391)
Child(579391) PPID is (579363)

おおおおおおおおお、中々素晴らしい!

【Python】親プロセスと子プロセスを生成する【並列処理】

import os
from multiprocessing import Process

def run_child() -> None:
    """Print a greeting followed by this process's pid and its parent's pid."""
    print("Child: I am the child process")
    own_pid = os.getpid()
    print(f"Child: Child's PID: {own_pid}")
    parent_pid = os.getppid()
    print(f"Child: Parent's PID: {parent_pid}")

def start_parent(num_children: int) -> None:
    """Print the parent's pid, then start ``num_children`` child processes."""
    print("Parent: I am the parent process")
    print(f"Parent : Parent's PID: {os.getpid()}")
    for index in range(num_children):
        print(f"Starting Process {index}")
        child = Process(target=run_child)
        child.start()


if __name__ == "__main__":
    num_children = 3
    start_parent(num_children)

$ python3 child_processes.py
Parent: I am the parent process
Parent : Parent’s PID: 576837
Starting Process 0
Starting Process 1
Starting Process 2
Child: I am the child process
Child: Child’s PID: 576838
Child: Parent’s PID: 576837
Child: I am the child process
Child: Child’s PID: 576840
Child: Parent’s PID: 576837
Child: I am the child process
Child: Child’s PID: 576839
Child: Parent’s PID: 576837

### Rustで書きたい

use std::process;
use std::process::Command;

/// Prints this process's id, spawns `ls` as a child process, prints the
/// child's id and waits for the child to exit.
fn main() {
    println!("My id is {}", process::id());

    let mut command = Command::new("ls");
    match command.spawn() {
        Ok(mut child) => {
            println!("My id is {}", child.id());
            // Wait for the child so it is reaped and its output is complete
            // before this process exits.
            if let Err(err) = child.wait() {
                eprintln!("failed to wait for child: {}", err);
            }
        }
        // Report a spawn failure instead of silently ignoring it.
        Err(err) => eprintln!("failed to spawn `ls`: {}", err),
    }
}

Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target/debug/parallel`
My id is 577522
My id is 577561
Cargo.lock Cargo.toml python src target

Command::newはprocessってよりコマンドのprocessなのでちょっと意図しているものと違うな