void *producer (void *param){
int i;
for (i = 1; i <= 20; i++){
pthread_mutex_lock (&m);
if(num > BUF_SIZE){ /* overflow */
exit(1);
}
while (num == BUF_SIZE){
pthread_cond_wait (&_prod, &m)
}
buffer[add] = i;
add = (add + 1) % BUF_SIZE;
num++;
pthread_mutex_unlock (&m);
pthread_cond_signal (&c_cons);
printf ("producer: inserted %d\n", i); fflush(stdout);
}
printf ("producer quiting\n"); fflush(stdout);
return 0;
}
Variable Safety Tips
– Do not forget to notify waiting threads!
– predicate change =>
signal/broadcast correct condition variable
– When in doubt, broadcast
– but performance loss
– You do not need a mutex to signal/broadcast
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #define BUF_SIZE 3 /* size of shared buffer */ int buffer[BUF_SIZE]; int add = 0; int rem = 0; int num = 0; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t c_cons = PTHREAD_COND_INITIALIZER; pthread_cond_t c_prod = PTHREAD_COND_INITIALIZER; void *producer(void *param); void *consumer(void *param);
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define BUF_SIZE 3 /* size of shared buffer */
/* Shared buffer; the producer stores each item at index `add`. */
int buffer[BUF_SIZE];
/* Index of the next slot the producer writes; advanced modulo BUF_SIZE. */
int add = 0;
/* NOTE(review): presumably the index the consumer removes from -- the
 * consumer is not visible here; confirm. */
int rem = 0;
/* Number of items currently stored; the producer waits while it equals BUF_SIZE. */
int num = 0;
/* Mutex the producer holds while it touches buffer, add and num. */
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
/* Condition variable the producer signals after each insertion. */
pthread_cond_t c_cons = PTHREAD_COND_INITIALIZER;
/* NOTE(review): presumably the condition the producer waits on while the
 * buffer is full; confirm against the producer and consumer bodies. */
pthread_cond_t c_prod = PTHREAD_COND_INITIALIZER;
/* Thread start routines handed to pthread_create() by main(). */
void *producer(void *param);
void *consumer(void *param);
/*
 * Starts one producer thread and one consumer thread, waits for both to
 * finish, then prints a closing message.
 *
 * argc, argv: unused.
 * Returns 0 on success; exits with status 1 if a thread cannot be
 * created or joined.
 */
int main(int argc, char *argv[]){
    pthread_t tid1, tid2;

    (void) argc; /* unused */
    (void) argv; /* unused */

    if (pthread_create(&tid1, NULL, producer, NULL) != 0){
        fprintf(stderr, "Unable to create producer thread\n");
        exit(1);
    }
    if (pthread_create(&tid2, NULL, consumer, NULL) != 0){
        fprintf(stderr, "Unable to create consumer thread\n");
        exit(1);
    }
    if (pthread_join(tid1, NULL) != 0){ /* wait for producer to exit */
        fprintf(stderr, "Unable to join producer thread\n");
        exit(1);
    }
    if (pthread_join(tid2, NULL) != 0){ /* wait for consumer to exit */
        fprintf(stderr, "Unable to join consumer thread\n");
        exit(1);
    }
    printf("Parent quiting\n");
    return 0;
}
Pthread Mutexes
“to solve mutual exclusion problems among concurrent threads”
Birrell’s mechanisms: Pthreads:
-mutex
pthread_mutex_t aMutex; // mutex type
-Lock(mutex)
// explicit lock
int pthread_mutex_lock(pthread_mutex_t
*mutex);
// explicit unlock
int pthread_mutex_unlock(pthread_mutex_t
*mutex);
Other mutex operation
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr); // mutex attributes == specifies mutex behavior when // a mutex is shared among processes
int pthread_mutex_trylock(pthread_mutex_t *mutex);
-shared data should always be accessed through a single mutex
-mutex scope must be visible to all
-globally order lock
for all threads, lock mutexes in order
-always unlock a mutex
always unlock the correct mutex
Pthread Condition Variables
-condition
pthread_cond_t aCond; // type of cond variable
-wait
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
-signal
int pthread_cond_signal(pthread_cond_t *cond);
-broadcast
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr); // attributes -- e.g., if it's shared
int pthread_cond_destroy(pthread_cond_t *cond);
PThread Creation example2
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 4

/*
 * Thread start routine: prints the thread number it was given.
 *
 * arg: pointer to an int holding this thread's number; the int must stay
 *      valid and unmodified until the thread has read it.
 * Returns a null pointer.
 */
void *threadFunc (void *arg){
    int *p = (int*)arg;
    int myNum = *p;
    printf("Thread number %d\n", myNum);
    return 0;
}

/*
 * Creates NUM_THREADS threads, each printing its own number, and joins
 * them all. Returns 0 on success, 1 if a thread cannot be created.
 */
int main(void){
    int i;
    /* One slot per thread: passing &i to every thread would race with the
     * loop's own updates of i and let threads print wrong or repeated
     * numbers. */
    int tNum[NUM_THREADS];
    pthread_t tid[NUM_THREADS];

    for (i=0; i < NUM_THREADS; i++){ /* create/fork threads */
        tNum[i] = i;
        if (pthread_create(&tid[i], NULL, threadFunc, &tNum[i]) != 0){
            fprintf(stderr, "Unable to create thread %d\n", i);
            return 1;
        }
    }
    for (i=0; i < NUM_THREADS; i++){
        pthread_join(tid[i], NULL);
    }
    return 0;
}
PThread Creation
#include <stdio.h>
#include <pthread.h>
/*
 * Thread start routine: writes "Foobar!" followed by a newline to stdout,
 * then terminates the calling thread with a null exit value.
 *
 * arg: unused.
 */
void *foo (void *arg){
    (void) arg; /* unused */
    fputs("Foobar!\n", stdout);
    pthread_exit(NULL);
}
/*
 * Creates one detached, system-scope thread running foo().
 *
 * Returns 1 if the attribute object or the thread cannot be created.
 * Otherwise the main thread ends via pthread_exit(), so the process stays
 * alive until the detached thread has finished.
 */
int main(void){
    pthread_t tid;
    pthread_attr_t attr;

    if (pthread_attr_init(&attr) != 0){
        fprintf(stderr, "Unable to initialize thread attributes\n");
        return 1;
    }
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);

    /* pthread_create() stores the new thread's ID through its first
     * argument, so it must point to a real pthread_t, not NULL. */
    if (pthread_create(&tid, &attr, foo, NULL) != 0){
        fprintf(stderr, "Unable to create thread\n");
        pthread_attr_destroy(&attr);
        return 1;
    }
    pthread_attr_destroy(&attr);

    /* A detached thread cannot be joined, and returning from main() would
     * terminate the whole process, possibly before foo() runs. Exiting
     * only the main thread lets the detached thread finish. */
    pthread_exit(NULL);
}
Compiling PThreads
1. #include <pthread.h>
2. compile source with -lpthread or -pthread
Intro to OS ~ ==> gcc -o main main.c -lpthread
3. check return values of common functions
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 4

/*
 * Thread start routine: prints a greeting.
 *
 * arg: unused.
 * Returns a null pointer.
 */
void *hello (void *arg){
    (void) arg; /* unused */
    printf("Hello Thread\n");
    return 0;
}

/*
 * Creates NUM_THREADS threads running hello() and joins them all.
 * Returns 0 on success, 1 if a thread cannot be created.
 */
int main(void){
    int i;
    pthread_t tid[NUM_THREADS];

    for (i=0; i < NUM_THREADS; i++){ /* create/fork threads */
        /* Check the result: joining a thread that was never created
         * would use an uninitialized pthread_t. */
        if (pthread_create(&tid[i], NULL, hello, NULL) != 0){
            fprintf(stderr, "Unable to create thread %d\n", i);
            return 1;
        }
    }
    for (i=0; i < NUM_THREADS; i++){
        pthread_join(tid[i], NULL);
    }
    return 0;
}
PThreads
PThread == POSIX Threads
POSIX == Portable Operating System Interface
POSIX Thread
– POSIX versions of Birrell’s API
– specifies syntax and semantics of the operations
Thread, Fork(proc, args), Join(thread)
pthread_t aThread;
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
int pthread_join(pthread_t thread, void **status);
stack size, inheritance, joinable, scheduling policy, priority, system/process scope
Condition Variable API
Condition type
Wait(mutex, cond)
– mutex is automatically released & re-acquired on wait
Signal(cond)
– notify only one thread waiting on condition
Broadcast(cond)
– notify all waiting threads
Mutexes
Can be accessed by reading shared variable
create new list element e; set e.value = X; read list and list.p_next; set e.pointer = list.p_next; set list.p_next = e
Mutex, Lock(mutex), blocked_threads
List<int> my_list;
Mutex m;
void safe_insert(int i){
Lock(m){
my_list.insert(i);
}
}
for i=0..10
producers[i] = fork(safe_insert, NULL)
consumer = fork(print_and_clear, my_list)
Lock(m){
list->insert(my_thread_id)
} // unlock
Lock(m){
if my_list.full -> print; clear up to limit of elements of list
else -> release lock and try again (later)
}
Condition Variable
Lock(m){
while (my_list.not_full())
Wait(m, list_full);
my_list.print_and_remove_all();
}
Lock(m){
my_list.insert(my_thread_id);
if my_list.full()
Signal(list_full);
}
Basic Thread Mechanisms
Thread data structure
– identify threads, keep track of resource usage…
mechanisms to create and manage threads
mechanisms to safely coordinate among threads
running concurrently in the same address space
Processes, Threads
VA_p1, VA_p2
Concurrency Control & Coordination
-Mutual Exclusion
exclusive access to only one thread at a time
-Mutex
waiting on other threads
– specific condition before proceeding
Threads and Thread creation
Thread type
– thread data structure
Fork(proc, args)
– create a thread
– not unix fork
Join(thread)
– wait for a thread to terminate
Thread thread1; Shared_list list; thread1 = fork(safe_insert, 4); safe_insert(6); join(thread1); // Optional
What about I/O?
ready queue -> cpu
I/O <- I/O queue <- I/O request
time slice expired, fork a child, wait for an interrupt
Scheduler Responsibility
- maintaining the ready queue
- decision on when to context switch
P1(web server) P2(Database)
mm, cpu
Inter Process communication
IPC mechanisms
-transfer data/info between address spaces
-maintain protection and isolation
Threads to the rescue!
threads -> multithreaded process -> core1, core2, core3 …
parallelization => speed up
specialization => hot cache!
efficiency => lower mm requirement & cheaper IPC
Are threads useful on a single CPU?
or when (#of Threads) > (# of CPUs)?
t_ctx_switch -> t_idle -> t_ctx_switch