【Python】threadingによる並列処理(マルチスレッド)

threadingというライブラリを使用する
今いるスレッドを確認

import threading
import warnings
warnings.simplefilter('ignore')

print(threading.currentThread().getName())

$ python3 main.py
MainThread

### threading.Threadでmainとは別にスレッドを作成する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    print(" ■ thread :", threading.currentThread().getName())

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

if __name__ == "__main__":
    print(" ■ thread :", threading.currentThread().getName())

    print('うどんを作ります。')

    # スレッドを作成
    thread1 = threading.Thread(target=boil_udon)
    thread1.start()
    thread1.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

### スレッドを更に追加する

import threading
import time
import warnings
warnings.simplefilter('ignore')

def boil_udon():
    print(" ■ thread :", threading.currentThread().getName())

    print(' うどんを茹でます。')
    time.sleep(3)
    print(' うどんが茹で上がりました。')

def make_tuyu():
    print(" ■ thread :", threading.currentThread().getName())

    print(' うどんの汁を作ります。')
    time.sleep(2)
    print(' うどんの汁ができました。')


if __name__ == "__main__":
    print(" ■ thread :", threading.currentThread().getName())

    print('うどんを作ります。')

    # スレッドを作成
    thread1 = threading.Thread(target=boil_udon)
    thread2 = threading.Thread(target=make_tuyu)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    print('うどんの盛り付けをします。')
    print('うどんができました。')

$ python3 main.py
■ thread : MainThread
うどんを作ります。
■ thread : Thread-1 (boil_udon)
うどんを茹でます。
■ thread : Thread-2 (make_tuyu)
うどんの汁を作ります。
うどんの汁ができました。
うどんが茹で上がりました。
うどんの盛り付けをします。
うどんができました。

threadが作られると、同時に処理されていることがわかる
ThreadPoolExecutorの場合は、単純に1つの処理を複数のスレッドで実行するが、threadingの場合は、プログラム内容を指定してスレッドを作成することができる

Pythonでマルチスレッドとマルチプロセス

### シングルスレッド

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    time.sleep(1)
start = time.time()
for i in range(8):
    func()
print(time.time() - start)

$ python3 single.py
8.064586639404297

### マルチスレッド(同時に実行)

from concurrent.futures import ThreadPoolExecutor
import time
def func():
    time.sleep(1)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as e:
    for i in range(8):
        e.submit(func)
print(time.time() - start)

$ python3 multithread.py
2.0498673915863037

### マルチプロセス(複数のプロセスで同時に処理)

from concurrent.futures import ProcessPoolExecutor
import time
def func():
    time.sleep(1)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as e:
    for i in range(8):
        e.submit(func)
print(time.time() - start)

$ python3 multiprocess.py
2.1424620151519775

import os
import time
import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

x1, x2, y1, y2 = -1.8, 1.8, -1.8, 1.8
c_real, c_imag = -0.62772, -0.42193

def calculate_z_serial_purepython(maxiter, zs, cs):
    output = [0] * len(zs)
    for i in range(len(zs)):
        n = 0
        z = zs[i]
        c = cs[i]
        if (i % 100) == 0:
            time.sleep(0.0001)
        while abs(z) < 2 and n < maxiter:
            z = z * z + c
            n += 1
        output[i] = n
    return output

def calc_pure_python(desired_width, max_iterations):
    x_step = (float(x2 - x1) / float(desired_width))
    y_step = (float(y1 - y2) / float(desired_width))
    x = []
    y = []
    ycoord = y2
    while ycoord > y1:
        y.append(ycoord)
        ycoord += y_step
    xcoord = x1
    while xcoord < x2:
        x.append(xcoord)
        xcoord += x_step

    zs = []
    cs = []
    for ycoord in y:
        for xcoord in x:
            zs.append(complex(xcoord, ycoord))
            cs.append(complex(c_real, c_imag))
    
    output = calculate_z_serial_purepython(max_iterations, zs, cs)

if __name__ == "__main__":

    max_workers = os.cpu_count()
    start = datetime.datetime.now()
    for i in range(16):
        calc_pure_python(desired_width=500, max_iterations=100)
    elapsed = datetime.datetime.now() - start
    print("SingleThread: {}ms".format(elapsed.seconds*1000 + elapsed.microseconds/1000))

    start = datetime.datetime.now()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i in range(16):
            executor.submit(calc_pure_python, 500, 100)
    elapsed = datetime.datetime.now() - start
    print("MultiThread: {}ms".format(elapsed.seconds*1000 + elapsed.microseconds/1000))

    # マルチプロセスの場合
    start = datetime.datetime.now()
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        for i in range(16):
            executor.submit(calc_pure_python, 500, 100)
    elapsed = datetime.datetime.now() - start
    print("MultiProcess: {}ms".format(elapsed.seconds*1000 + elapsed.microseconds/1000))

$ python3 main.py
SingleThread: 17934.699ms
MultiThread: 11256.051ms
MultiProcess: 8493.925ms

os.cpu_count() でCPUのコアを取得できるんか…
処理に時間がかかる箇所をマルチプロセスで実装すれば良さそうではあるな…

マルチスレッドとマルチタスク

– マルチコアシステムは2つ以上のコアが搭載されたシングルプロセッサCPU。L2キャッシュやフロントサイドバスなどを共有。コストを大幅に削減

– マルチタスクは複数のタスクでCPUなどの処理リソースを共有する方式。マルチタスクは各計算タスクを素早く切り替えることで、同時に行われているように見せる機能

– マルチコアは、異なるタスクに対して、複数の計算エンジンが独立して動作する。プロセスを別々のCPUコア間で分割することで、複数のアプリケーションを効率的に実行できる

– マルチスレッド処理: マルチタスク処理の理論をアプリケーションに適用したもので、1つのアプリケーション内のSiriを個々のスレッドに分割し、各スレッドは並列に実行される

マルチタスク処理

task.c

#include "kernel.h"
#include "kernel_id.h"
#include "ecrobot_interface.h"

#define COUNT 500 /* カウント数を500に定義 */

DeclareTask(Task1);
DeclareTask(Task2);

void ecrobot_device_initialize(){}

void ecrobot_device_terminate(){}

void user_1ms_isr_type2(void){}

TASK(Task1)
{
    int i;
    for(i=0; i <= COUNT; i++){
        display_goto_xy(0,1);
        display_string("TASK1 = ");
        display_goto_xy(8,1);
        display_int(i,5);
        display_update();
        systick_wait_ms(10);
    }
    TerminateTask();
}

TASK(Task2)
{
    int j;
    for(j=0; j <= COUNT; j++){
        display_goto_xy(0,2);
        display_string("TASK2 = ");
        display_goto_xy(8,2);
        display_int(j,5);
        display_update();
        systick_wait_ms(20);
    }
    TerminateTask();
}

tasks.oil

#include "implementation.oil"

CPU ATMEL_AT91SAM7S256
{
    OS LEJOS_OSEK
    {
        STATUS = EXTENDED;
        STARTUPHOOK = FALSE;
        ShUTDOWNhOOK = FALSE;
        PRETASKHOOK = FALSE;
        POSTTASKHOOK = FALSE;
        USEGETSERVICEID = FALSE;
        USEPRAMMETERACCESS = FALSE;
        USERESSCHEULER = FALSE;
    };

    APPMODE appmode1{};

    TASK Task1
    {
        AUTOSTART = TRuE { APPMODE = appmode1; }
        PRIORUTY = 1;
        ACTIVATION = 1;
        SCHEDULE = FULL;
        STACKSIZE = 512;
    };

    TASK Task2
    {
        AUTOSTART = TRuE { APPMODE = appmode1; }
        PRIORUTY = 2;
        ACTIVATION = 1;
        SCHEDULE = FULL;
        STACKSIZE = 512;
    };
};
#include "kernel.h"
#include "kernel_id.h"
#include "ecrobot_interface.h"

#define COUNT 500 /* カウント数を500に定義 */

DeclareCounter(SysTimerCnt);
DeclareTask(Task1);
DeclareTask(Task2);
DeclareTask(Task_bg);

void ecrobot_device_initialize(){}

void ecrobot_device_terminate(){}

void user_1ms_isr_type2(void){
    SignalCounter(SysTimerCnt); /* カウンタをIncrementする */
}

TASK(Task1)
{
    static int i=0;
    if(i <= COUNT){
        display_goto_xy(0,1);
        display_string("TASK1 = ");
        display_goto_xy(8,1);
        display_int(i,5);
        display_update();
        i++;
    } else {
        display_goto_xy(0,4);
        display_string("TASK1 Terminated");
        display_update();
    }
    TerminateTask(); /* 処理終了 */
}

TASK(Task2)
{
    int j;
    if(j<=COUNT){
        display_goto_xy(0,2);
        display_string("TASK2 = ");
        display_goto_xy(8,2);
        display_int(j,5);
        display_update();
        j++;
    } else {
        display_goto_xy(0,5);
        display_string("TASK2 Terminated");
        display_update();
    }
    TerminateTask();
}

task_bg.c

#include "kernel.h"
#include "kernel_id.h"
#include "ecrobot_itnerface.h"

#define TEMPO 10
#define VOLUME 50

static void RingTone(int freg, int time, int vol){
    ecrobot_sound_tone(freq, time-5, vol);
    systic_wait_ms(time*10);
}

TASK(task_bg)
{
    while(1){
/*===========かえるの歌=============*/
		RingTone(523, TEMPO*2, VOLUME);
		RingTone(587, TEMPO*2, VOLUME);
		RingTone(659, TEMPO*2, VOLUME);
		RingTone(698, TEMPO*2, VOLUME);
		RingTone(659, TEMPO*2, VOLUME);
		RingTone(587, TEMPO*2, VOLUME);
		RingTone(523, TEMPO*3, VOLUME);
		systick_wait_ms(TEMPO*10);

		RingTone(659, TEMPO*2, VOLUME);
		RingTone(698, TEMPO*2, VOLUME);
		RingTone(784, TEMPO*2, VOLUME);
		RingTone(880, TEMPO*2, VOLUME);
		RingTone(784, TEMPO*2, VOLUME);
		RingTone(698, TEMPO*2, VOLUME);
		RingTone(659, TEMPO*3, VOLUME);
		systick_wait_ms(TEMPO*10);

		RingTone(523, TEMPO*2, VOLUME);
		systick_wait_ms(TEMPO*2*10);
		RingTone(523, TEMPO*2, VOLUME);
		systick_wait_ms(TEMPO*2*10);
		RingTone(523, TEMPO*2, VOLUME);
		systick_wait_ms(TEMPO*2*10);
		RingTone(523, TEMPO*2, VOLUME);
		systick_wait_ms(TEMPO*2*10);

		RingTone(523, TEMPO, VOLUME);
		RingTone(523, TEMPO, VOLUME);
		RingTone(587, TEMPO, VOLUME);
		RingTone(587, TEMPO, VOLUME);
		RingTone(659, TEMPO, VOLUME);
		RingTone(659, TEMPO, VOLUME);
		RingTone(698, TEMPO, VOLUME);
		RingTone(698, TEMPO, VOLUME);
		RingTone(659, TEMPO, VOLUME);
		systick_wait_ms(TEMPO*10);
		RingTone(587, TEMPO, VOLUME);
		systick_wait_ms(TEMPO*10);
		RingTone(523, TEMPO*3, VOLUME);
		systick_wait_ms(TEMPO*10);
/*==================================*/
	}

    display_goto_xy(0,6);
    display_string("TASKbgTerminated");
    display_update();

    TerminateTask();
}

task_cycle.oil

#include "implementation.oil"

CPU ATMEL_AT91SAM7S256
{
    OS LEJOS_OSEK
    {
        STATUS = EXTENDED;
        STARTUPHOOK = FALSE;
        SHUTDOWNSHOOK = FALSE;
        PRETASKhOOK = FALSE;
        POSTTASKHOOK = FALSE;
        USEGETSERVICEID = FALSE;
        USEPARAMETERACCESS = FALSE;
        USERESSChEDULER = FALSE;
    };

    APPMODE appmode1{};

    TASK Task1
    {
        AUTOSTART = FALSE;
        PRIORITY = 2;
        ACTIVATION = 1;
        SCHEDULE = FULL;
        STACKSIZE = 512;
    };

    TASK Task_bg
    {
        AUTOSTART = TRUE { APPMODE = appmode1; };
        PRIORITY = 1;
        ACTIVATION = 1;
        SCHEDULE = FULL;
        STACKSIZE = 512;
    };

    COUNTER SysTimerCnt
    {
        MINCYCLE = 1;
        MAXALLOWEDVALUE = 10000;
        TICKSPERBASE = 1; // One tick is equal to 1msec
    }

    ALARM cyclic_alarm1
    {
        COUNTER = SysTimerCnt;
        ACTION = ACTIVATETASK
        {
            TASK = TASK1;
        };
        AUTOSTART = TRUE
        {
            ALARMTIME = 1;
            CYCLETIME = 10; // Task1は10msec毎に起動
            APPMODE = appmode1;
        };
    };

    ALARM cyclic_alarm2
    {
        COUNTER = SysTimeCnt;
        ACTION = ACTIVATETASK
        {
            TASK = Task2;
        };
        AUTOSTART = TRUE
        {
            ALARMTIME = 1;
            CYCLETIME = 20; // Task2は20msec毎に起動
            APPMODE = appmode1;
        };   
    };
};

一緒に実行させる命令文がいまいちよくわからんね。。。

【CPU】Task Status Segment

レジスタの状態をメモリに書き込む

struct TSS32 {
	int backlink, esp0, ss0, esp1, ss1, esp2, ss2, cr3;  // タスクの設定
	int eip, eflags, eax, ecx, ebx, esp, ebp, esi, edi;  // 32bitレジスタ eipは拡張命令ポインタ 
	int es, cs, ss, ds, fs, gs;                          // 16bitレジスタ 
	int ldtr, iomap;
}