Pthread Note

本文记录pthread 相关笔记

Functions

1
2
3
4
    int pthread_create(pthread_t * thread,
                       const pthread_attr_t * attr,
                       void * (*start_routine)(void *),
                       void *arg);

Arguments:

  • thread - returns the thread id. (unsigned long int defined in bits/pthreadtypes.h)
  • attr - Set to NULL if default thread attributes are used. (else define members of the struct pthread_attr_t defined in bits/pthreadtypes.h) Attributes include:
    • detached state (joinable? Default: PTHREAD_CREATE_JOINABLE. Other option: PTHREAD_CREATE_DETACHED)
    • scheduling policy (real-time? PTHREAD_INHERIT_SCHED,PTHREAD_EXPLICIT_SCHED,SCHED_OTHER)
    • scheduling parameter
    • inheritsched attribute (Default: PTHREAD_EXPLICIT_SCHED Inherit from parent thread: PTHREAD_INHERIT_SCHED)
    • scope (Kernel threads: PTHREAD_SCOPE_SYSTEM User threads: PTHREAD_SCOPE_PROCESS Pick one or the other not both.)
    • guard size
    • stack address (See unistd.h and bits/posix_opt.h _POSIX_THREAD_ATTR_STACKADDR)
    • stack size (default minimum PTHREAD_STACK_SIZE set in pthread.h),
  • void * (*start_routine) - pointer to the function to be threaded. Function has a single argument: pointer to void.
  • *arg - pointer to argument of function. To pass multiple arguments, send a pointer to a structure.
1
void pthread_exit(void *retval);

Arguments:

  • retval - Return value of thread.

This routine kills the thread. The pthread_exit function never returns. If the thread is not detached, the thread id and return value may be examined from another thread by using pthread_join.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include<stdio.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>

pthread_t tid[2];
int ret1,ret2;

void* doSomeThing(void *arg)
{
    unsigned long i = 0;
    pthread_t id = pthread_self();

    for(i=0; i<(0xFFFFFFFF);i++);

    if(pthread_equal(id,tid[0]))
    {
        printf("\n First thread processing done\n");
        ret1  = 100;
        pthread_exit(&ret1);
    }
    else
    {
        printf("\n Second thread processing done\n");
        ret2  = 200;
        pthread_exit(&ret2);
    }

    return NULL;
}

int main(void)
{
    int i = 0;
    int err;
    int *ptr[2];

    while(i < 2)
    {
        err = pthread_create(&(tid[i]), NULL, &doSomeThing, NULL);
        if (err != 0)
            printf("\ncan't create thread :[%s]", strerror(err));
        else
            printf("\n Thread created successfully\n");

        i++;
    }

    pthread_join(tid[0], (void**)&(ptr[0]));
    pthread_join(tid[1], (void**)&(ptr[1]));

    printf("\n return value from first thread is [%d]\n", *ptr[0]);
    printf("\n return value from second thread is [%d]\n", *ptr[1]);

    return 0;
}

The threads library provides three synchronization mechanisms:

  • mutexes - Mutual exclusion lock: Block access to variables by other threads. This enforces exclusive access by a thread to a variable or set of variables.
  • joins - Make a thread wait till others are complete (terminated**.
  • condition variables - data type pthread_cond_t

Muteses:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/* Note scope of variable and mutex are the same */
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int counter=0;

/* Function C */
void functionC()
{
   pthread_mutex_lock( &mutex1 );
   counter++
   pthread_mutex_unlock( &mutex1 );
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *functionC(void*);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int  counter = 0;

main() {
  int rc1, rc2;
  pthread_t thread1, thread2;

  /* Create independent threads each of which will execute functionC */

  if ((rc1 = pthread_create(&thread1, NULL, functionC, NULL))) {
    printf("Thread creation failed: %d\n", rc1);
  }

  if ((rc2 = pthread_create(&thread2, NULL, functionC, NULL))) {
    printf("Thread creation failed: %d\n", rc2);
  }

  /* Wait till threads are complete before main continues. Unless we  */
  /* wait we run the risk of executing an exit which will terminate   */
  /* the process and all threads before the threads have completed.   */

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);

  exit(0);
}

void *functionC(void*) {
  pthread_mutex_lock(&mutex1);
  counter++;
  printf("Counter value: %d\n", counter);
  pthread_mutex_unlock(&mutex1);
}

Joins:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <pthread.h>
#include <stdio.h>

#define NTHREADS 10
void *thread_function(void *);
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
int counter = 0;

int main() {
  pthread_t thread_id[NTHREADS];
  int i, j;

  for (i = 0; i < NTHREADS; i++) {
    pthread_create(&thread_id[i], NULL, thread_function, NULL);
  }

  for (j = 0; j < NTHREADS; j++) {
    pthread_join(thread_id[j], NULL);
  }

  /* Now that all threads are complete I can print the final result.     */
  /* Without the join I could be printing a value before all the threads */
  /* have been completed.                                                */

  printf("Final counter value: %d\n", counter);
  return 0;
}

void *thread_function(void *dummyPtr) {
  printf("Thread number %ld\n", pthread_self());
  pthread_mutex_lock(&mutex1);
  counter++;
  pthread_mutex_unlock(&mutex1);
}

Condition Variables:

Functions used in conjunction with the condition variable:

  • Creating/Destroying:
    • pthread_cond_init
    • pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    • pthread_cond_destroy
  • Waiting on condition:
    • pthread_cond_wait
    • pthread_cond_timedwait - place limit on how long it will block.
  • Waking thread based on condition:
    • pthread_cond_signal
    • pthread_cond_broadcast - wake up all threads blocked by the specified condition variable.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t count_mutex     = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t condition_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  condition_cond  = PTHREAD_COND_INITIALIZER;

void *functionCount1(void*);
void *functionCount2(void*);
int  count = 0;
#define COUNT_DONE  10
#define COUNT_HALT1  3
#define COUNT_HALT2  6

int main()
{
   pthread_t thread1, thread2;

   pthread_create( &thread1, NULL, functionCount1, NULL);
   pthread_create( &thread2, NULL, functionCount2, NULL);
   pthread_join( thread1, NULL);
   pthread_join( thread2, NULL);

   exit(0);
   return 0;
}

void *functionCount1(void*)
{
   for(;;)
   {
      pthread_mutex_lock( &condition_mutex );
      while( count >= COUNT_HALT1 && count <= COUNT_HALT2 )
      {
         pthread_cond_wait( &condition_cond, &condition_mutex );
      }
      pthread_mutex_unlock( &condition_mutex );

      pthread_mutex_lock( &count_mutex );
      count++;
      printf("Counter value functionCount1: %d\n",count);
      pthread_mutex_unlock( &count_mutex );

      if(count >= COUNT_DONE) return(NULL);
    }
}

void *functionCount2(void*)
{
    for(;;)
    {
       pthread_mutex_lock( &condition_mutex );
       if( count < COUNT_HALT1 || count > COUNT_HALT2 )
       {
          pthread_cond_signal( &condition_cond );
       }
       pthread_mutex_unlock( &condition_mutex );

       pthread_mutex_lock( &count_mutex );
       count++;
       printf("Counter value functionCount2: %d\n",count);
       pthread_mutex_unlock( &count_mutex );

       if(count >= COUNT_DONE) return(NULL);
    }
}

Example

Thread Creation and Termination

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

void *print_message_function(void *ptr);

int main() {
  pthread_t thread1, thread2;
  char *message1 = "Thread 1";
  char *message2 = "Thread 2";
  int iret1, iret2;

  /* Create independent threads each of which will execute function */

  iret1 =
      pthread_create(&thread1, NULL, print_message_function, (void *)message1);
  iret2 =
      pthread_create(&thread2, NULL, print_message_function, (void *)message2);

  /* Wait till threads are complete before main continues. Unless we  */
  /* wait we run the risk of executing an exit which will terminate   */
  /* the process and all threads before the threads have completed.   */

  pthread_join(thread1, NULL);
  pthread_join(thread2, NULL);

  printf("Thread 1 returns: %d\n", iret1);
  printf("Thread 2 returns: %d\n", iret2);
  exit(0);
  return 0;
}

void *print_message_function(void *ptr) {
  char *message;
  message = (char *)ptr;
  printf("%s \n", message);
}

Single parameter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <pthread.h>

using namespace std;

void* say_hello(void* args) {
  int tid = *((int*)args);
  cout << "thread_id #" << tid << " Hello world! " << endl;
  pthread_exit(NULL);
}

int main() {
  pthread_t tids[5];
  int id[5];
  for (int i = 0; i < 5; i++) {
    id[i] = i;
    int ret = pthread_create(&tids[i], NULL, say_hello, (void*)&(id[i]));
    if (ret != 0) {
      cout << "pthread_create error" << ret << endl;
    }
  }
  pthread_exit(NULL);
  return 0;
}

multiple parameters

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <cstdlib>
#include <pthread.h>

using namespace std;

#define NUM_THREADS     5

struct thread_data{
  int  thread_id;
  char *message;
};

void *PrintHello(void *threadarg) {
  struct thread_data *my_data;

  my_data = (thread_data*) threadarg;

  cout << "Thread ID : " << my_data->thread_id ;
  cout << " Message : " << my_data->message << endl;

  pthread_exit(NULL);
}

int main() {
  pthread_t threads[NUM_THREADS];
  struct thread_data td[NUM_THREADS];
  int rc;
  int i;

  for (i = 0; i < NUM_THREADS; i++) {
    cout << "main() : creating thread, " << i << endl;
    td[i].thread_id = i;
    td[i].message = (char *)"This is message";
    rc = pthread_create(&threads[i], NULL, PrintHello, (void *)&td[i]);
    if (rc) {
      cout << "Error:unable to create thread," << rc << endl;
      exit(-1);
    }
  }
  pthread_exit(NULL);
}

Returning result

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

struct thread_args
{
  int a;
  double b;
};


struct thread_result
{
  long x;
  double y;
};

void *thread_func(void *args_void)
{
  thread_args *args = (thread_args*)args_void;
  /* The thread cannot return a pointer to a local variable */
  thread_result *res = (thread_result*)malloc(sizeof *res);

  res->x  = 10 + args->a;
  res->y = args->a * args->b;
  return res;
}

int main()
{
  pthread_t threadL;
  thread_args in = { .a = 10, .b = 3.141592653 };
  void *out_void;
  thread_result *out;

  pthread_create(&threadL, NULL, thread_func, &in);
  pthread_join(threadL, &out_void);
  out = (thread_result*)out_void;
  printf("out -> x = %ld\tout -> b = %f\n", out->x, out->y);
  free(out);

  return 0;
}

MPI+Pthread

在同一个进程内,不同的thread可以进行MPI的通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>

using namespace std;


void worker(void* data) {
  int* array = (int*)data;
  int receive_data[50];
  // MPI_Recv(receive_data, 50, MPI_INT, );
  MPI_Recv(receive_data, 50, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  for (int i = 0; i < 50; i++)
    cout << receive_data[i] << " ";
  cout << endl;
}

int main(int argc, char *argv[]) {
  int provided;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  int array[100];
  for (int i = 0; i < 100; i++) {
    array[i] = i + 2;
  }
  pthread_t thread1;
  pthread_create(&thread1, NULL, worker, NULL);
  MPI_Send(array, 50, MPI_INT, 0, 0, MPI_COMM_WORLD);
  MPI_Finalize();
  return 0;
}
updatedupdated2022-01-122022-01-12