Rustマルチスレッドと非同期処理

処理を行うために実行されるのがスレッド
プロセスが開始されると「メインスレッド」と呼ばれるアプリの基本となる処理を実行するスレッドが実行され、この中で処理が行われる
同時に複数の処理を並行して実行する必要があるときはマルチスレッドになる

### threadによるスレッド作成
スレッドの作成は「std」というクレートに用意されている「thread」というモジュールを使う

use std::thread;

fn main() {
	thread::spawn(|| {
		println!("THread:Start!");
		println!("THread:End!");
	});

	println!("Main:Start!");
	println!("Main:End.");
}

別スレッドの実行前にメインスレッドが終了している

### スレッドの一時停止とDuration
スレッドの処理を一時的に定するには、threadの「sleep」という関数を使用する
thead::sleep(Duration:)

use std::thread;
use std::time::Duration;

fn main() {
	thread::spawn(|| {
		println!("THread:Start!");
		thread::sleep(Duration::from_millis(10));
		println!("THread:End!");
	});

	println!("Main:Start!");
	thread::sleep(Duration::from_millis(100));
	println!("Main:End.");
}

どのスレッドが処理を実行中で、どれが終了したかを常に頭に入れて処理を行う必要がある

両スレッドの実行順序を確認

use std::thread;
use std::time::Duration;

fn main() {
	thread::spawn(|| {
		for n in 1..10 {
			println!("Thread:No, {}", n);
			thread::sleep(Duration::from_millis(50));
		}
	});

	for n in 1..10 {
		println!("Main: No,{}.", n);
		thread::sleep(Duration::from_millis(100))
	}
}

sleepの持つ役割

use std::thread;
use std::time::Duration;

fn main() {
	thread::spawn(|| {
		for n in 1..100 {
			println!("Thread:No, {}", n);
		}
	});
	thread::sleep(Duration::from_millis(1));

	for n in 1..100 {
		println!("Main: No,{}.", n);
	}
}

スレッドが一時停止した時などに動く

joinHandleとjoinメソッド

use std::thread;
use std::time::Duration;

fn main() {
	println!("Main:Start!");

	let h = thread::spawn(||{
			thread::spawn(|| {
				for n in 1..6 {
					println!("Thread:No, {}", n);
					thread::sleep(Duration::from_millis(2));
				}
			});

			thread::spawn(|| {
				for n in 1..6 {
					println!("H2:No,{}", n);
					thread::sleep(Duration::from_millis(2));
				}
			});

			for n in 1..6 {
				println!("Thread:No,{}", n);
				thread::sleep(Duration::from_millis(1));
			}
	});

	let _res = h.join();
	println!("Main:End.");
}

スレッドによる値の共有

use std::thread;
use std::time::Duration;

fn main() {
	let mut num = 1;
	println!("Main:Start!");

	let h1 = thread::spawn(move || {
		println!("H1: start!");
		for n in 1..5 {
			num = 10 * n;
			println!("H1: num_h={}.", num);
			thread::sleep(Duration::from_millis(10));
		}
		println!("H1: End.");
	});

	let h2 = thread::spawn(move || {
		println!("H2:Start!");
		for n in 1..5 {
			num += n;
			println!("H2: num_h={}.", num);
			thread::sleep(Duration::from_millis(10));
		}
		println!("H2: End.");
	});

	let _res = h1.join();
	let _res = h2.join();
	println!("Main:End.");
}

### Arc/Mutexで値を共有
Mutexはスレッド間でデータを共有する際のデータ保護を行うために用いられる構造体

use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;

fn main(){
	let num = Arc::new(Mutex::new(1));
	println!("Main start!");

	let num_1 = Arc::clone(&num);

	let h1 = thread::spawn(move || {
		let mut num_h1 = num_1.lock().unwrap();
		println!("H1: start!");
		for n in 1..5 {
			*num_h1 += n;
			println!("H1: num_h={}.", *num_h1);
			thread::sleep(Duration::from_millis(1));
		}
		println!("H1: End.");
	});

	let num_2 = Arc::clone(&num);

	let h2 = thread::spawn(move || {
		let mut num_h2 = num_2.lock().unwrap();
		println!("H2: start!");
		for n in 1..5 {
			*num_h2 *= n;
			println!("H2: num_h={}", *num_h2);
			thread::sleep(Duration::from_millis(1));
		}
		println!("H2: End.");
	});

	let _res = h1.join();
	let _res = h2.join();

	println!("Main:: End.");
}

スレッドのデッドロック

use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;

fn main(){
	let num1 = Arc::new(Mutex::new(0));
	let num2 = Arc::new(Mutex::new(0));

	let value1a = Arc::clone(&num1);
	let value2a = Arc::clone(&num2);

	let value1b = Arc::clone(&num1);
	let value2b = Arc::clone(&num2);

	let h1 = thread::spawn(move || {
		let mut num1 = value1a.lock().unwrap();
		thread::sleep(Duration::from_millis(50));
		let mut num2 = value2a.lock().unwrap();
		*num1 += 10;
		*num2 += 20;
	});

	let h2 = thread::spawn(move || {
		let mut num2 = value2b.lock().unwrap();
		thread::sleep(Duration::from_millis(50));
		let mut num1 = value1b.lock().unwrap();
		*num1 += 100;
		*num2 += 200;
	});

	h1.join().unwrap();
	h2.join().unwrap();

	println!("end");
}

チャンネルの利用

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main(){
	let (tx, rx) = mpsc::channel();
	println!("Main: start!");
	let h1 = thread::spawn(move ||{
		let mut num = 1;
		println!("H1: start!");
		for n in 1..5 {
			num += n;
			tx.send(num).unwrap();
			println!("H1: num={}.", num);
			thread::sleep(Duration::from_millis(10));
		}
		println!("H1: End.");
	});

	let h2 = thread::spawn(move ||{
		println!("H2: start!");
		for n in 1..5 {
			let num_recv = rx.recv().unwrap();
			println!("H2: num_recv={}.", num_recv);
			thread::sleep(Duration::from_millis(20));
		}
		println!("H2: End.");
	});
	let _res = h1.join();
	let _res = h2.join();
	println!("Main: End.");
}

相互に送受信

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main(){
	let (tx1, rx1) = mpsc::channel();
	let (tx2, rx2) = mpsc::channel();
	tx2.send(0).unwrap();
	println!("Main: start!");

	let h1 = thread::spawn(move ||{
		let mut num = 1;
		println!("H1: start!");
		for n in 1..5 {
			let val = rx2.recv().unwrap();
			num += n;
			println!("H1: num={}.", num);
			tx1.send(num).unwrap();
			thread::sleep(Duration::from_millis(10));
		}
		println!("H1: End.");
	});
	thread::sleep(Duration::from_millis(5));
	let h2 = thread::spawn(move ||{
		println!("H2: start!");
		for n in 1..5 {
			let val = rx1.recv().unwrap();
			let num = val * 2;
			println!("H2: num={}.", num);
			tx2.send(num).unwrap();
			thread::sleep(Duration::from_millis(10));
		}
		println!("H2: End.");
	});
	let _res = h1.join();
	let _res = h2.join();
	println!("Main: End.");
}

### 同期チャンネル

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main(){
	let (tx1, rx1) = mpsc::sync_channel(1);
	let (tx2, rx2) = mpsc::sync_channel(1);
	tx2.sync_send(0).unwrap();
	println!("Main: start!");

	let h1 = thread::spawn(move ||{
		let mut num = 1;
		println!("H1: start!");
		for n in 1..5 {
			let val = rx2.recv().unwrap();
			num += n;
			println!("H1: num={}.", num);
			tx1.sync_send(num).unwrap();
			thread::sleep(Duration::from_millis(10));
		}
		println!("H1: End.");
	});
	thread::sleep(Duration::from_millis(5));
	let h2 = thread::spawn(move ||{
		println!("H2: start!");
		for n in 1..5 {
			let val = rx1.recv().unwrap();
			let num = val * 2;
			println!("H2: num={}.", num);
			tx2.sync_send(num).unwrap();
			thread::sleep(Duration::from_millis(10));
		}
		println!("H2: End.");
	});
	let _res = h1.join();
	let _res = h2.join();
	println!("Main: End.");
}