MPI Notes

These notes collect MPI-related material.


Syntax

  • MPI_Send(msg_buf_p, msg_size, msg_type, dest, tag, communicator)

    The exact behavior of MPI_Send is implementation-defined: typically, messages larger than some threshold cause the call to block, while smaller messages are buffered. If the message is never matched by a receive, it may be lost or the sending process may hang.
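
    A minimal sketch of a matching blocking send/receive pair (assumes MPI_Init has been called and at least two processes are running):

    int rank, value = 42;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rank == 0) {
        /* blocking send of one int to rank 1, tag 0 */
        MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (rank == 1) {
        /* blocking receive from rank 0 with the matching tag */
        MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }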

  • int MPI_Isend(const void* buf, int count, MPI_Datatype datatype, int dest,int tag, MPI_Comm comm, MPI_Request *request)

  • MPI_Recv(msg_buf_p, buf_size, buf_type, source, tag, communicator, status_p)

    Here source can be set to the special constant MPI_ANY_SOURCE to accept a message from any sender, and MPI_ANY_TAG accepts a message with any tag. The call is blocking: if no matching message ever arrives, the process hangs. MPI_STATUS_IGNORE can be passed for status_p when the status is not needed.

  • int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source,int tag, MPI_Comm comm, MPI_Request *request)

  • int MPI_Get_count(status_p, type, count_p)

    Returns the number of elements actually received.

  • int MPI_Reduce(input_data_p, output_data_p,count, datatype, operator, dest_process, comm)

    If count is greater than 1, the reduction is applied element-wise to vectors. operator is the reduction operator. The input and output buffers must not be the same address; aliasing them is illegal.
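
    For example, a global sum collected on rank 0 (a sketch, assuming MPI is already initialized):

    int rank;
    double local_sum, global_sum = 0.0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    local_sum = rank + 1.0;                 /* each process's contribution */
    /* rank 0 receives the sum of every process's local_sum */
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);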

  • int MPI_Allreduce(input_data_p, output_data_p, count, datatype, operator, comm)

    Every process obtains the result of the global operation. Passing MPI_IN_PLACE as input_data_p makes the call take its input from output_data_p and write the result back there in place (see the sketch after the operator list).

    Supported operators:

    • MPI_MAX : maximum
    • MPI_MIN : minimum
    • MPI_SUM : sum
    • MPI_PROD : product
    • MPI_LAND : logical and
    • MPI_BAND : bit-wise and
    • MPI_LOR : logical or
    • MPI_BOR : bit-wise or
    • MPI_LXOR : logical exclusive or (xor)
    • MPI_BXOR : bit-wise exclusive or (xor)
    • MPI_MAXLOC : max value and location
    • MPI_MINLOC : min value and location
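
    A sketch of the MPI_IN_PLACE form (assumes MPI is initialized; x holds each process's local values and is overwritten with the global result):

    double x[4] = {1.0, 2.0, 3.0, 4.0};
    MPI_Allreduce(MPI_IN_PLACE, x, 4, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
    /* every process now holds the element-wise sum across all processes in x */
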
  • int MPI_Bcast(data_p, count, datatype, source_proc, comm)

    Broadcasts data from source_proc to every process in comm.
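
    A minimal sketch (assumes MPI is initialized and rank was obtained with MPI_Comm_rank):

    int params[2] = {0, 0};
    if (rank == 0) {            /* only the root needs meaningful values before the call */
        params[0] = 100;
        params[1] = 7;
    }
    MPI_Bcast(params, 2, MPI_INT, 0, MPI_COMM_WORLD);
    /* after the call every process holds {100, 7} */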

  • int MPI_Scatter(send_buf_p, send_count, send_type, recv_buf_p, recv_count, recv_type, src_proc, comm)

    Splits the data in send_buf_p into comm_sz pieces and sends one piece to each process in rank order. It suits a block distribution in which the number of vector components n is evenly divisible by comm_sz. send_count is the amount of data sent to each individual process; every process must send and receive the same amount of data.
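
    A sketch of the block-distribution case (assumes MPI is initialized, local_n = n / comm_sz, the full array x exists at least on rank 0, and <stdlib.h> is included for malloc):

    double *local_x = (double *) malloc(local_n * sizeof(double));
    /* rank 0 splits x into comm_sz blocks of local_n doubles, one block per process */
    MPI_Scatter(x, local_n, MPI_DOUBLE, local_x, local_n, MPI_DOUBLE, 0, MPI_COMM_WORLD);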

  • MPI_SCATTERV(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root,comm)

    Sends a different amount of data to each process.
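
    A sketch distributing n elements that comm_sz does not divide evenly (assumes MPI is initialized and that n, comm_sz, rank and the root-side array x are already defined):

    int *sendcounts = (int *) malloc(comm_sz * sizeof(int));
    int *displs = (int *) malloc(comm_sz * sizeof(int));
    int offset = 0;
    for (int i = 0; i < comm_sz; i++) {
        sendcounts[i] = n / comm_sz + (i < n % comm_sz ? 1 : 0);   /* spread the remainder */
        displs[i] = offset;
        offset += sendcounts[i];
    }
    double *local_x = (double *) malloc(sendcounts[rank] * sizeof(double));
    MPI_Scatterv(x, sendcounts, displs, MPI_DOUBLE,
                 local_x, sendcounts[rank], MPI_DOUBLE, 0, MPI_COMM_WORLD);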

  • int MPI_Gather(send_buf_p, send_count, send_type, recv_buf_p, recv_count, recv_type, dest_proc, comm)

    Gathers all components of a vector onto the destination process. Every process must send and receive the same amount of data.

  • int MPI_Allgather(send_buf_p, send_count, send_type, recv_buf_p, recv_count, recv_type, comm)

    Concatenates the contents of every process's send_buf_p and stores the result in every process's recv_buf_p; this is equivalent to an MPI_Gather followed by an MPI_Bcast.

  • int MPI_Type_create_struct(int count, int array_of_blocklengths[], MPI_Aint array_of_displacements[], MPI_Datatype array_of_types[], MPI_Datatype *new_type_p)

    Builds a derived datatype whose elements may have different basic datatypes.

  • int MPI_Get_address(location_p, address_p)

    Returns the address of the memory location that location_p points to.

  • int MPI_Type_commit(new_mpi_t_p)

    Commits the newly created datatype so the MPI implementation can use it in communication functions.

  • int MPI_Type_free(old_mpi_t_p)

    Once the new datatype is no longer needed, this call frees the extra storage it occupies.
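
    A sketch tying the calls above together: describe two doubles (a, b) and an int (n) held in separate variables as one derived type, commit it, broadcast all three in a single message, and free the type (assumes MPI is initialized):

    double a = 0.0, b = 1.0;
    int n = 100;
    int blocklengths[3] = {1, 1, 1};
    MPI_Datatype types[3] = {MPI_DOUBLE, MPI_DOUBLE, MPI_INT};
    MPI_Aint displs[3], base;
    MPI_Datatype input_mpi_t;

    MPI_Get_address(&a, &base);
    MPI_Get_address(&a, &displs[0]);
    MPI_Get_address(&b, &displs[1]);
    MPI_Get_address(&n, &displs[2]);
    for (int i = 0; i < 3; i++)
        displs[i] -= base;                          /* make offsets relative to a */

    MPI_Type_create_struct(3, blocklengths, displs, types, &input_mpi_t);
    MPI_Type_commit(&input_mpi_t);
    MPI_Bcast(&a, 1, input_mpi_t, 0, MPI_COMM_WORLD);   /* sends a, b, n together */
    MPI_Type_free(&input_mpi_t);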

  • double MPI_Wtime(void)

    Returns the number of seconds elapsed since some moment in the past.

  • int MPI_Barrier(MPI_Comm comm)

    No process in the communicator returns from this call until every process in the communicator has entered it.
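
    A common timing pattern that combines MPI_Barrier and MPI_Wtime (a sketch; assumes MPI is initialized):

    double start, finish, local_elapsed, elapsed;
    MPI_Barrier(MPI_COMM_WORLD);            /* line the processes up before timing */
    start = MPI_Wtime();
    /* ... code being timed ... */
    finish = MPI_Wtime();
    local_elapsed = finish - start;
    /* report the slowest process's time on rank 0 */
    MPI_Reduce(&local_elapsed, &elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);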

  • int MPI_Wait(MPI_Request *request, MPI_Status *status)

  • int MPI_Waitall(int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[])

  • MPI_Waitany(count, requests, index, status)

  • MPI_Waitsome(count, requests, done, index, status)

  • int MPI_Comm_create(MPI_Comm comm, MPI_Group group, MPI_Comm *newcomm)

    Creates a new communicator containing the processes in group.

  • int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm)

    Duplicates a communicator.

  • int MPI_Group_translate_ranks(MPI_Group group1, int n, const int ranks1[], MPI_Group group2, int ranks2[])

    Given a process's rank in one group, returns its rank in another group.

  • int MPI_Comm_free(MPI_Comm *comm)

    Frees the resources held by a communicator.

  • int MPI_Win_create(void *base, MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, MPI_Win *win)

    Creates a memory window ("window") for RMA operations.

    int main(int argc, char ** argv)
    {
        int *a;
        MPI_Win win;
        MPI_Init(&argc, &argv);
        /* create private memory */
        MPI_Alloc_mem(1000*sizeof(int), MPI_INFO_NULL, &a);
        /* use private memory like you normally would */
        a[0] = 1; a[1] = 2;
        /* collectively declare memory as remotely accessible */
        MPI_Win_create(a, 1000*sizeof(int), sizeof(int),
        MPI_INFO_NULL, MPI_COMM_WORLD, &win);
    /* Array 'a' is now accessible by all processes in
        * MPI_COMM_WORLD */
        MPI_Win_free(&win);
        MPI_Free_mem(a);
        MPI_Finalize(); return 0;
    }
    
  • int MPI_Win_allocate(MPI_Aint size, int disp_unit,MPI_Info info,MPI_Comm comm, void *baseptr, MPI_Win *win)

    Allocates memory and creates an RMA window over it.

    int main(int argc, char ** argv)
    {
        int *a;
        MPI_Win win;
        MPI_Init(&argc, &argv);
        /* collectively create remote accessible memory in a window */
        MPI_Win_allocate(1000*sizeof(int), sizeof(int), MPI_INFO_NULL,
        MPI_COMM_WORLD, &a, &win);
        /* Array ‘a’ is now accessible from all processes in
        * MPI_COMM_WORLD */
        MPI_Win_free(&win);
        MPI_Finalize(); return 0;
    }
    
  • int MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win *win)

    Creates an RMA window to which memory will be attached later.

    int main(int argc, char ** argv)
    {
        int *a;
        MPI_Win win;
        MPI_Init(&argc, &argv);
        MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);
        /* create private memory */
        a = (int *) malloc(1000 * sizeof(int));
        /* use private memory like you normally would */
        a[0] = 1; a[1] = 2;
        /* locally declare memory as remotely accessible */
        MPI_Win_attach(win, a, 1000*sizeof(int));
        /* Array ‘a’ is now accessible from all processes */
        /* undeclare remotely accessible memory */
        MPI_Win_detach(win, a); free(a);
        MPI_Win_free(&win);
        MPI_Finalize(); return 0;
    }
    
  • MPI_Put(void *origin_addr, int origin_count, MPI_Datatype origin_dtype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_dtype, MPI_Win win)

    Moves data from the origin process into the target process's window.

  • MPI_Get(void *origin_addr, int origin_count,MPI_Datatype origin_dtype, int target_rank,MPI_Aint target_disp, int target_count,MPI_Datatype target_dtype, MPI_Win win)

    Fetches data from the target process into the origin process.

  • MPI_Accumulate(void *origin_addr, int origin_count,MPI_Datatype origin_dtype, int target_rank,MPI_Aint target_disp, int target_count,MPI_Datatype target_dtype, MPI_Op op, MPI_Win win)

    Performs an accumulation; the result is stored at the target process.

  • MPI_Get_accumulate(void *origin_addr, int origin_count,MPI_Datatype origin_dtype, void *result_addr,int result_count, MPI_Datatype result_dtype,int target_rank, MPI_Aint target_disp,int target_count, MPI_Datatype target_dtype,MPI_Op op, MPI_Win win)

    Performs the accumulation at the target and returns the target's previous contents to the origin process.

  • MPI_Fetch_and_op(void *origin_addr, void *result_addr,MPI_Datatype dtype, int target_rank,MPI_Aint target_disp, MPI_Op op, MPI_Win win)
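
    MPI_Fetch_and_op atomically combines one element with the value at the target and returns the target's previous value. A sketch of a shared counter (assumes win was created over an int counter located at displacement 0 on rank 0):

    int one = 1, old_value = 0;
    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
    MPI_Fetch_and_op(&one, &old_value, MPI_INT, 0, 0, MPI_SUM, win);
    MPI_Win_unlock(0, win);                 /* completes the operation at the target */
    /* old_value holds the counter value seen just before this process's increment */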

  • MPI_Compare_and_swap(void *origin_addr, void *compare_addr,void *result_addr, MPI_Datatype dtype, int target_rank,MPI_Aint target_disp, MPI_Win win)

  • int MPI_Init_thread(int *argc, char ***argv, int required, int *provided)

    • MPI_THREAD_SINGLE Only one thread will execute.
    • MPI_THREAD_FUNNELED The process may be multi-threaded, but the application must ensure that only the main thread makes MPI calls (for the definition of main thread, see MPI_IS_THREAD_MAIN).
    • MPI_THREAD_SERIALIZED The process may be multi-threaded, and multiple threads may make MPI calls, but only one at a time: MPI calls are not made concurrently from two distinct threads (all MPI calls are “serialized”).
    • MPI_THREAD_MULTIPLE Multiple threads may call MPI, with no restrictions.
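
    A usage sketch that requests full thread support and checks what the library actually provides (assumes argc/argv come from main and <stdio.h> is included):

    int provided = 0;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided < MPI_THREAD_MULTIPLE) {
        /* the implementation cannot give full thread support; fall back or abort */
        fprintf(stderr, "got thread level %d\n", provided);
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
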
  • int MPI_Get_processor_name(char *name, int *resultlen)

int rlen = 0;
char pname[MPI_MAX_PROCESSOR_NAME+1] = {0};
MPI_Get_processor_name(pname, &rlen);

This routine returns the name of the processor on which it was called at the moment of the call.

  • int MPI_Win_allocate_shared(MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win)

  • called collectively by processes in comm
  • processes in comm must be those that can access shared memory (e.g., processes on the same compute node)
  • by default, a contiguous region of memory is allocated and shared (noncontiguous allocation is also possible, and may be more efficient as each contributed region could be page aligned)
  • each process contributes size bytes to the contiguous region; size can be different for each process and can be zero
  • the contribution to the shared region is in order by rank
  • baseptr is the pointer to a process’s contributed memory (not the beginning of the shared region) in the address space of the process

How it works:

  • Process with rank 0 in comm allocates the entire shared memory region for all processes
  • Other processes attach to this shared memory region
  • The entire memory region may reside in a single locality domain, which may not be desirable
  • Therefore, using noncontiguous allocation may be advantageous

This is a collective call executed by all processes in the group of comm. On each process i, it allocates memory of at least size bytes that is shared among all processes in comm, and returns a pointer to the locally allocated segment in baseptr that can be used for load/store accesses on the calling process. The locally allocated memory can be the target of load/store accesses by remote processes; the base pointers for other processes can be queried using the function MPI_WIN_SHARED_QUERY.

  • int MPI_Win_shared_query(MPI_Win win, int rank, MPI_Aint *size, int *disp_unit, void *baseptr)

  • baseptr returns the address (in the local address space) of the beginning of the shared memory segment contributed by another process, the target rank
  • also returns the size of the segment and the displacement unit
  • if rank is MPI_PROC_NULL, then the address of the beginning of the first memory segment is returned
  • this function could be useful if processes contribute segments of different sizes (so addresses cannot be computed locally), or if noncontiguous allocation is used
  • in many programs, knowing the “owner” of each segment may not be necessary
  • MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status)

    You can think of MPI_Probe as an MPI_Recv that does everything but receive the message.

int number_amount;
if (world_rank == 0) {
    const int MAX_NUMBERS = 100;
    int numbers[MAX_NUMBERS];
    // Pick a random amount of integers to send to process one
    srand(time(NULL));
    number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;

    // Send the random amount of integers to process one
    MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
    printf("0 sent %d numbers to 1\n", number_amount);
} else if (world_rank == 1) {
    MPI_Status status;
    // Probe for an incoming message from process zero
    MPI_Probe(0, 0, MPI_COMM_WORLD, &status);

    // When probe returns, the status object has the size and other
    // attributes of the incoming message. Get the message size
    MPI_Get_count(&status, MPI_INT, &number_amount);

    // Allocate a buffer to hold the incoming numbers
    int* number_buf = (int*)malloc(sizeof(int) * number_amount);

    // Now receive the message with the allocated buffer
    MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("1 dynamically received %d numbers from 0.\n",
           number_amount);
    free(number_buf);
}

Aggregating multiple messages

  1. The count argument of the various communication functions

  2. Derived datatypes

    A derived datatype consists of a sequence of basic MPI datatypes together with a displacement for each of them.

    {(MPI_DOUBLE, 0), (MPI_DOUBLE, 16), (MPI_INT, 24)}

    The first element of each pair gives the datatype, and the second gives that item's displacement from the start of the structure.

  3. MPI_Pack / MPI_Unpack (a sketch follows the datatype list below)

  4. Basic datatypes

typedef enum _MPI_Datatype {
  MPI_DATATYPE_NULL          = 0x0c000000,
  MPI_CHAR                   = 0x4c000101,
  MPI_UNSIGNED_CHAR          = 0x4c000102,
  MPI_SHORT                  = 0x4c000203,
  MPI_UNSIGNED_SHORT         = 0x4c000204,
  MPI_INT                    = 0x4c000405,
  MPI_UNSIGNED               = 0x4c000406,
  MPI_LONG                   = 0x4c000407,
  MPI_UNSIGNED_LONG          = 0x4c000408,
  MPI_LONG_LONG_INT          = 0x4c000809,
  MPI_LONG_LONG              = MPI_LONG_LONG_INT,
  MPI_FLOAT                  = 0x4c00040a,
  MPI_DOUBLE                 = 0x4c00080b,
  MPI_LONG_DOUBLE            = 0x4c00080c,
  MPI_BYTE                   = 0x4c00010d,
  MPI_WCHAR                  = 0x4c00020e,
  MPI_PACKED                 = 0x4c00010f,
  MPI_LB                     = 0x4c000010,
  MPI_UB                     = 0x4c000011,
  MPI_C_COMPLEX              = 0x4c000812,
  MPI_C_FLOAT_COMPLEX        = 0x4c000813,
  MPI_C_DOUBLE_COMPLEX       = 0x4c001614,
  MPI_C_LONG_DOUBLE_COMPLEX  = 0x4c001615,
  MPI_2INT                   = 0x4c000816,
  MPI_C_BOOL                 = 0x4c000117,
  MPI_SIGNED_CHAR            = 0x4c000118,
  MPI_UNSIGNED_LONG_LONG     = 0x4c000819,
  MPI_CHARACTER              = 0x4c00011a,
  MPI_INTEGER                = 0x4c00041b,
  MPI_REAL                   = 0x4c00041c,
  MPI_LOGICAL                = 0x4c00041d,
  MPI_COMPLEX                = 0x4c00081e,
  MPI_DOUBLE_PRECISION       = 0x4c00081f,
  MPI_2INTEGER               = 0x4c000820,
  MPI_2REAL                  = 0x4c000821,
  MPI_DOUBLE_COMPLEX         = 0x4c001022,
  MPI_2DOUBLE_PRECISION      = 0x4c001023,
  MPI_2COMPLEX               = 0x4c001024,
  MPI_2DOUBLE_COMPLEX        = 0x4c002025,
  MPI_REAL2                  = MPI_DATATYPE_NULL,
  MPI_REAL4                  = 0x4c000427,
  MPI_COMPLEX8               = 0x4c000828,
  MPI_REAL8                  = 0x4c000829,
  MPI_COMPLEX16              = 0x4c00102a,
  MPI_REAL16                 = MPI_DATATYPE_NULL,
  MPI_COMPLEX32              = MPI_DATATYPE_NULL,
  MPI_INTEGER1               = 0x4c00012d,
  MPI_COMPLEX4               = MPI_DATATYPE_NULL,
  MPI_INTEGER2               = 0x4c00022f,
  MPI_INTEGER4               = 0x4c000430,
  MPI_INTEGER8               = 0x4c000831,
  MPI_INTEGER16              = MPI_DATATYPE_NULL,
  MPI_INT8_T                 = 0x4c000133,
  MPI_INT16_T                = 0x4c000234,
  MPI_INT32_T                = 0x4c000435,
  MPI_INT64_T                = 0x4c000836,
  MPI_UINT8_T                = 0x4c000137,
  MPI_UINT16_T               = 0x4c000238,
  MPI_UINT32_T               = 0x4c000439,
  MPI_UINT64_T               = 0x4c00083a,
  MPI_AINT                   = 0x4c00083b (_WIN64), 0x4c00043b,
  MPI_OFFSET                 = 0x4c00083c,
  MPI_FLOAT_INT              = 0x8c000000,
  MPI_DOUBLE_INT             = 0x8c000001,
  MPI_LONG_INT               = 0x8c000002,
  MPI_SHORT_INT              = 0x8c000003,
  MPI_LONG_DOUBLE_INT        = 0x8c000004
} MPI_Datatype;
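
A sketch for item 3 above (MPI_Pack / MPI_Unpack): rank 0 packs a double and an int into one buffer, broadcasts it as MPI_PACKED, and the other processes unpack it (assumes MPI is initialized):

char buffer[100];
double a = 3.14;
int n = 42, position = 0, rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
    /* append a and n to buffer; position advances past the packed bytes */
    MPI_Pack(&a, 1, MPI_DOUBLE, buffer, 100, &position, MPI_COMM_WORLD);
    MPI_Pack(&n, 1, MPI_INT, buffer, 100, &position, MPI_COMM_WORLD);
}
MPI_Bcast(buffer, 100, MPI_PACKED, 0, MPI_COMM_WORLD);
if (rank != 0) {
    position = 0;
    MPI_Unpack(buffer, 100, &position, &a, 1, MPI_DOUBLE, MPI_COMM_WORLD);
    MPI_Unpack(buffer, 100, &position, &n, 1, MPI_INT, MPI_COMM_WORLD);
}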

Implementing a tree reduce

int nodeval[P];
forall (index in (0..P-1)) {
    int tally;
    stride = 1;
    ...                        //Compute tally here
    while(stride < P) {            //Begin logic for tree
        if (index % (2*stride) == 0) {
            tally = tally + nodeval[index+stride];
            stride = 2*stride;
        } else {
            nodeval[index] = tally;   //Send initially to tree node
            break;      // Exit, if no longer a parent
        }
    }
}
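
A point-to-point MPI version of the same binomial tree (a sketch; assumes MPI is initialized and each process starts with its own local tally):

int rank, size, other;
int tally = 1;                               /* local contribution */
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
for (int stride = 1; stride < size; stride *= 2) {
    if (rank % (2 * stride) == 0) {
        if (rank + stride < size) {          /* a partner exists at this level */
            MPI_Recv(&other, 1, MPI_INT, rank + stride, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            tally += other;
        }
    } else {
        MPI_Send(&tally, 1, MPI_INT, rank - stride, 0, MPI_COMM_WORLD);
        break;                               /* done once the partial sum is sent upward */
    }
}
/* rank 0 now holds the global sum in tally */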

Useful functions

  • MPI_Wtime() returns the number of seconds elapsed since some moment in the past

Useful code snippets

Creating a new communicator

#include "mpi.h"
#include <cstdio>
#include <cstdlib>
#include <iostream>

#define LEN 5
using namespace std;

void func(MPI_Comm comm) {
  int rank;
  MPI_Comm_rank(comm, &rank);
  cout << "func rank: " << rank << endl;;
}

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    int global_rank, global_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &global_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &global_size);

    MPI_Group world_group;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);

    int n = 3;
    const int ranks[3] = {1,3,5};

    // Build two process groups out of world_group
    MPI_Group group1, group2;
    MPI_Group_incl(world_group, n, ranks, &group1);
    MPI_Group_excl(world_group, n, ranks, &group2);
    // Build a communicator from each of group1 and group2
    MPI_Comm comm1, comm2;
    MPI_Comm_create(MPI_COMM_WORLD, group1, &comm1);
    MPI_Comm_create(MPI_COMM_WORLD, group2, &comm2);
    // MPI_Comm_create builds a new communicator on top of an existing group. newcomm has two
    // possible values: if the calling process is a member of group, newcomm is the newly created
    // communicator; if the calling process is not in group, newcomm is MPI_COMM_NULL.
    int ori = 1, after;
    MPI_Group_translate_ranks(world_group, 1, &ori, group1, &after);
    if (after != MPI_UNDEFINED) {  // MPI_UNDEFINED means rank ori is not in group1
      cout << "original rank: " << ori << " after rank: " << after << endl;
    }

    int local_rank, local_size;
    if (comm1 != MPI_COMM_NULL) {
      func(comm1);
      MPI_Comm_rank(comm1, &local_rank);
      cout << "global rank: " << global_rank << " local rank: " << local_rank << endl;
    }
    MPI_Finalize();
}

Intra-node communicator

// Map the processor (node) name to an integer colour and use it to split
// the input communicator into per-node communicators.
char procname[MPI_MAX_PROCESSOR_NAME];
int len, node_key, lrank;
MPI_Comm local, cross;
MPI_Get_processor_name(procname, &len);
node_key = name_to_colour(procname);  // user-supplied hash: hostname -> int (not shown)
MPI_Comm_split(input, node_key, 0, &local);
// local is now a communicator for the local node
// Now we can make communicators across nodes
MPI_Comm_rank(local, &lrank);
MPI_Comm_split(input, lrank, 0, &cross);

MPI_Comm comm_shared = MPI_COMM_NULL;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &comm_shared);

int localnp, localme;
MPI_Comm_size(comm_shared,&localnp);
MPI_Comm_rank(comm_shared,&localme);

Asynchronous communication

#include <stdio.h>
#include "mpi.h"

int main(int argc,char** argv)
{
  int rank, size;
  int tag, destination, count;
  int buffer; //value to send

  tag = 1234;
  destination = 2; //destination process
  count = 1; //number of elements in buffer

  MPI_Status status;
  MPI_Request request = MPI_REQUEST_NULL;

  MPI_Init(&argc, &argv);

  MPI_Comm_size(MPI_COMM_WORLD, &size); //number of processes
  MPI_Comm_rank(MPI_COMM_WORLD, &rank); //rank of current process

  if (destination >= size) {
    MPI_Abort(MPI_COMM_WORLD, 1); // destination process must be under the number of processes created, otherwise abort
  }

  if (rank == 0) {
    printf("Enter a value to send to processor %d:\n", destination);
    scanf("%d", &buffer);
    MPI_Isend(&buffer, count, MPI_INT, destination, tag, MPI_COMM_WORLD, &request); //non blocking send to destination process
  }

  if (rank == destination) {
    MPI_Irecv(&buffer, count, MPI_INT, 0, tag, MPI_COMM_WORLD, &request); //destination process receives
  }

  MPI_Wait(&request, &status); //blocks until the non-blocking operation tied to request completes

  if (rank == 0) {
    printf("processor %d sent %d\n", rank, buffer);
  }
  if (rank == destination) {
    printf("processor %d got %d\n", rank, buffer);
  }

  MPI_Finalize();

  return 0;
}

#include <iostream>
#include "mpi.h"

using namespace std;

void Sender() {
  int process_id;
  int master_id = 0;
  int process_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
  MPI_Comm_size(MPI_COMM_WORLD, &process_size);
  MPI_Status status;
  MPI_Request request = MPI_REQUEST_NULL;
  int tag = 8 * process_id;
  // Every non-master process sends its own rank to the master.
  if (process_id != master_id) {
    MPI_Isend(&process_id, 1, MPI_INT, master_id, tag, MPI_COMM_WORLD, &request);
    // Complete the request before the send buffer (process_id) goes out of scope.
    MPI_Wait(&request, &status);
  }
}

void Receiver() {
  int process_id;
  int master_id = 0;
  int process_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
  MPI_Comm_size(MPI_COMM_WORLD, &process_size);
  MPI_Status status;
  MPI_Request request = MPI_REQUEST_NULL;
  int tag;
  int msg;
  for (int i = 0; i < process_size - 1; i++) {
    if (process_id == master_id) {
      MPI_Irecv(&msg, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &request);
      MPI_Wait(&request, &status);
      cout << "msg = " << msg << " tag = "  << status.MPI_TAG << " src = " << status.MPI_SOURCE << endl;
    }
  }

}

int main(int argc, char ** argv) {
  MPI_Init(&argc, &argv);
  Sender();
  Receiver();
  MPI_Finalize();
  return 0;
}

Send with dynamic length.

#include <iostream>
#include "mpi.h"

using namespace std;

void Sender() {
  int process_id;
  int master_id = 0;
  int process_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
  MPI_Comm_size(MPI_COMM_WORLD, &process_size);
  MPI_Status status;
  MPI_Request request = MPI_REQUEST_NULL;

  int count = process_id + 5;
  int* msg = new int[count];
  for (int i = 0; i < count; i++)
    msg[i] = 0;
  msg[process_id] = 1;

  int tag = 8 * process_id;
  if (process_id != master_id) {
    MPI_Isend(msg, count, MPI_INT, master_id, tag, MPI_COMM_WORLD, &request);
    // Complete the send before freeing the buffer it uses.
    MPI_Wait(&request, &status);
  }
  delete[] msg;
}

void Receiver() {
  int process_id;
  int master_id = 0;
  int process_size;
  MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
  MPI_Comm_size(MPI_COMM_WORLD, &process_size);
  MPI_Status status;
  MPI_Request request = MPI_REQUEST_NULL;

  int tag;
  for (int i = 0; i < process_size - 1; i++) {
    if (process_id == master_id) {
      MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      int count;
      MPI_Get_count(&status, MPI_INT, &count);
      int* msg = new int[count];
      cout << "src = " << status.MPI_SOURCE << "count = " << count << endl;
      MPI_Recv(msg, count, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      for (int i = 0; i < count - 1; i++) {
        cout << msg[i];
      }
      cout << endl;
      delete[] msg;
    }
  }

}

int main(int argc, char ** argv) {
  MPI_Init(&argc, &argv);
  Sender();
  Receiver();
  MPI_Finalize();
  return 0;
}

MPI RMA

The order in which Put/Get operations complete is not guaranteed.

MPI provides three synchronization models:

  1. Fence: MPI_Win_fence(int assert, MPI_Win win)

    All RMA operations (Put/Get, etc.) issued by the processes complete before the second fence call returns. The interval between two fence calls is called an epoch.

//Start up MPI....
MPI_Win win;

if (rank == 0) {
    /* Everyone will retrieve from a buffer on root */
    int soi = sizeof(int);
    MPI_Win_create(buf,soi*20,soi,MPI_INFO_NULL,comm,&win); }
else {
    /* Others only retrieve, so these windows can be size 0 */
    MPI_Win_create(NULL,0,sizeof(int),MPI_INFO_NULL,comm,&win);
}

/* No local operations prior to this epoch, so give an assertion */
MPI_Win_fence(MPI_MODE_NOPRECEDE,win);

if (rank != 0) {
    /* Inside the fence, make RMA calls to GET from rank 0 */
    MPI_Get(buf,20,MPI_INT,0,0,20,MPI_INT,win);
}

/* Complete the epoch - this will block until MPI_Get is complete */
MPI_Win_fence(0,win);

/* All done with the window - tell MPI there are no more epochs */
MPI_Win_fence(MPI_MODE_NOSUCCEED,win);

/* Free up our window */
MPI_Win_free(&win);

//shut down...

  2. Post-start-complete-wait (generalized active target)

    MPI_Win_post/start(MPI_Group grp, int assert, MPI_Win win) MPI_Win_complete/wait(MPI_Win win)

        //Start up MPI...
    MPI_Group comm_group, group;

    for (i=0;i<3;i++) {
        ranks[i] = i;     //For forming groups, later
    }
    MPI_Comm_group(MPI_COMM_WORLD,&comm_group);

    /* Create new window for this comm */
    if (rank == 0) {
        MPI_Win_create(buf,sizeof(int)*3,sizeof(int),
            MPI_INFO_NULL,MPI_COMM_WORLD,&win);
    }
    else {
        /* Rank 1 or 2 */
        MPI_Win_create(NULL,0,sizeof(int),
            MPI_INFO_NULL,MPI_COMM_WORLD,&win);
    }
    /* Now do the communication epochs */
    if (rank == 0) {
        /* Origin group consists of ranks 1 and 2 */
        MPI_Group_incl(comm_group,2,ranks+1,&group);
        /* Begin the exposure epoch */
        MPI_Win_post(group,0,win);
        /* Wait for epoch to end */
        MPI_Win_wait(win);
    }
    else {
        /* Target group consists of rank 0 */
        MPI_Group_incl(comm_group,1,ranks,&group);
        /* Begin the access epoch */
        MPI_Win_start(group,0,win);
        /* Put into rank==0 according to my rank */
        MPI_Put(buf,1,MPI_INT,0,rank,1,MPI_INT,win);
        /* Terminate the access epoch */
        MPI_Win_complete(win);
    }

    /* Free window and groups */
    MPI_Win_free(&win);
    MPI_Group_free(&group);
    MPI_Group_free(&comm_group);

    //Shut down...

  3. Lock/Unlock (passive target)

    //Start up MPI...
    MPI_Win win;

    if (rank == 0) {
        /* Rank 0 will be the caller, so null window */
        MPI_Win_create(NULL,0,1,
            MPI_INFO_NULL,MPI_COMM_WORLD,&win);
        /* Request lock of process 1 */
        MPI_Win_lock(MPI_LOCK_SHARED,1,0,win);
        MPI_Put(buf,1,MPI_INT,1,0,1,MPI_INT,win);
        /* Block until put succeeds */
        MPI_Win_unlock(1,win);
        /* Free the window */
        MPI_Win_free(&win);
    }
    else {
        /* Rank 1 is the target process */
        MPI_Win_create(buf,2*sizeof(int),sizeof(int),
            MPI_INFO_NULL, MPI_COMM_WORLD, &win);
        /* No sync calls on the target process! */
        MPI_Win_free(&win);
    }

Examples

Eigen matrix

#include <iostream>
#include <Include/Base.h>
#include "mpi.h"

using namespace std;

int main(int argc, char* argv[]){
  MPI_Init(&argc, &argv);
  MatrixD m;
  Matrix<double, Dynamic, Dynamic, ColMajor> col_m;
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  MPI_Win win;
  if (rank == 0) {
    /* Everyone will retrieve from a buffer on root */
    m.resize(6, 6);
    int soi = sizeof(double);
    for (int i = 0; i < 36; i++) {
      m(i / 6, i % 6) = i;
    }
    col_m = m;
    MPI_Win_create(col_m.data(),soi*36,soi,MPI_INFO_NULL,MPI_COMM_WORLD,&win);
  }
  else {
    /* Others only retrieve, so these windows can be size 0 */
    MPI_Win_create(NULL,0,sizeof(double),MPI_INFO_NULL,MPI_COMM_WORLD,&win);
  }

  /* No local operations prior to this epoch, so give an assertion */
  MPI_Win_fence(MPI_MODE_NOPRECEDE,win);

  if (rank != 0) {
    /* Inside the fence, make RMA calls to GET from rank 0 */
    // Matrix<double, 6, 2, ColMajor> col_m;
    col_m.resize(6, 2);
    // m.resize(6, 2);
    MPI_Get(col_m.data(), 12, MPI_DOUBLE, 0, rank * 6, 12, MPI_DOUBLE, win);
    // m = col_m;
  }

  /* Complete the epoch - this will block until MPI_Get is complete */
  MPI_Win_fence(0,win);
  if (rank != 0)
    m = col_m;
  if (rank == 0)
    cout << m << endl;
  MPI_Barrier(MPI_COMM_WORLD);
  if (rank == 1)
    cout << m << endl;
  MPI_Barrier(MPI_COMM_WORLD);
  if (rank == 2)
    cout << m << endl;
  /* All done with the window - tell MPI there are no more epochs */
  MPI_Win_fence(MPI_MODE_NOSUCCEED,win);

  /* Free up our window */
  MPI_Win_free(&win);

    //shut down...


    MPI_Finalize();
    return 0;
}

RMA example

/*
 * @Brief: file description
 * @Author: liudy
 * @Email: deyin.liu@nscc-gz.cn
 * @Date: 2021-07-25 20:35:28
 * @LastEditors: liudy
 * @LastEditTime: 2021-07-25 21:11:53
 */
#include <iostream>
#include "mpi.h"

using namespace std;

void RMA() {
  MPI_Win win;
  int buf[20];
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm comm = MPI_COMM_WORLD;
  /* Everyone will retrieve from a buffer on root */
  MPI_Win_create(buf, sizeof(int) * 20, sizeof(int), MPI_INFO_NULL, comm, &win);
  for (size_t i = 0; i < 20; i++) {
    buf[i] = rank * i;
  }

  /* No local operations prior to this epoch, so give an assertion */
  MPI_Win_fence(MPI_MODE_NOPRECEDE, win);

  int msg[20];
  if (rank == 1) {
    /* Inside the fence, make RMA calls to GET from rank 0 */
    MPI_Get(msg, 10, MPI_INT, 2, 10, 10, MPI_INT, win);
  }

  /* Complete the epoch - this will block until MPI_Get is complete */
  MPI_Win_fence(0, win);

  if (rank == 1) {
    for (size_t i = 0; i < 10; i++) {
      cout << msg[i] << endl;
    }
  }

  /* All done with the window - tell MPI there are no more epochs */
  MPI_Win_fence(MPI_MODE_NOSUCCEED, win);

  /* Free up our window */
  MPI_Win_free(&win);
}

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  RMA();
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
  return 0;
}

Passive-target example

/*
 * @Brief: file description
 * @Author: liudy
 * @Email: deyin.liu@nscc-gz.cn
 * @Date: 2021-07-25 20:35:28
 * @LastEditors: liudy
 * @LastEditTime: 2021-07-27 22:51:42
 */
#include <iostream>
#include "mpi.h"

using namespace std;

class RMA {
 public:
  void AccessDate(MPI_Win& win) {
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    // int msg[20];

    if (rank == 0) {
      MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
      MPI_Get(msg, 10, MPI_INT, 1, 0, 10, MPI_INT, win);
      MPI_Win_flush(1, win);
      for (size_t i = 0; i < 10; i++) {
        cout << msg[i] << endl;
      }

      MPI_Get(msg, 10, MPI_INT, 1, 10, 10, MPI_INT, win);
      MPI_Win_unlock(1, win);
      cout << "=============================" << endl;
      for (size_t i = 0; i < 10; i++) {
        cout << msg[i] << endl;
      }
    }

    if (rank == 2) {
      MPI_Win_lock(MPI_LOCK_SHARED, 1, 0, win);
      MPI_Get(msg, 10, MPI_INT, 1, 10, 10, MPI_INT, win);
      MPI_Win_unlock(1, win);

      for (size_t i = 0; i < 10; i++) {
        cout << msg[i] << endl;
      }
    }
  }
  void rma() {
    MPI_Win win;
    // int buf[20];
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm comm = MPI_COMM_WORLD;
    /* Everyone will retrieve from a buffer on root */
    MPI_Win_create(buf, sizeof(int) * 20, sizeof(int), MPI_INFO_NULL, comm,
                   &win);
    for (size_t i = 0; i < 20; i++) {
      buf[i] = rank * i;
    }
    AccessDate(win);
    /* Free up our window */
    MPI_Win_free(&win);
  }
  private:
  int buf[20];
  int msg[20];
};

int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  RMA rma;
  rma.rma();
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
  return 0;
}

Shared-memory

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <mpi.h>

using namespace std;

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);

    int np, me;
    MPI_Comm_size(MPI_COMM_WORLD,&np);
    MPI_Comm_rank(MPI_COMM_WORLD,&me);

    int rlen = 0;
    char pname[MPI_MAX_PROCESSOR_NAME+1] = {0};
    MPI_Get_processor_name(pname, &rlen);

    printf("Hello from %d of %d processors (name=%s)\n", me, np, pname);

    /* create the shared-memory (per-node) communicator */
    MPI_Comm comm_shared = MPI_COMM_NULL;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &comm_shared);

    int localnp, localme;
    MPI_Comm_size(comm_shared,&localnp);
    MPI_Comm_rank(comm_shared,&localme);

    /* allocate the shared-memory window */
    MPI_Win win_shared = MPI_WIN_NULL;
    int* baseptr = NULL;
    MPI_Win_allocate_shared(sizeof(int) * 10, sizeof(int), MPI_INFO_NULL, comm_shared, &baseptr, &win_shared);
    for (int i = 0; i < 10; i++)
      baseptr[i] = me;

    if (localme == 1) {
      int rank = localme;
      MPI_Aint lsize = 0;
      int ldisp = 0;
      int* lbase = NULL;
      MPI_Win_shared_query(win_shared, rank, &lsize, &ldisp, &lbase);
      // printf("global %d of %d, local %d of %d, size=%zu, disp=%d, base=%p\n",
      //         me, np, localme, localnp, lsize, ldisp, lbase);
      for(int j = 0; j < 20; j++)
        cout << lbase[j] << " ";
      cout << endl;
    }

    MPI_Win_free(&win_shared);

    MPI_Finalize();
    return 0;
}

Using MPI_MAXLOC

struct {
  double val;
  int rank;
} in[2], out[2];
int i, myrank, root = 0;

MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

if (myrank == 0) {
  in[0].val = 10;
  in[0].rank= 0;
  in[1].val = 1;
  in[1].rank = 0;
}

if (myrank == 1) {
  in[0].val = 1;
  in[0].rank= 1;
  in[1].val = 5;
  in[1].rank = 1;
}

if (myrank == 2) {
  in[0].val = 2;
  in[0].rank= 2;
  in[1].val = 2;
  in[1].rank = 2;
}

MPI_Reduce(in, out, 2, MPI_DOUBLE_INT, MPI_MAXLOC, root, MPI_COMM_WORLD);
if (myrank == 0) {
  for (i = 0; i < 2; ++i) {
    cout << out[i].val << " " << out[i].rank << endl;
  }
}

If two processes hold equal val values, MPI breaks the tie by comparing the accompanying rank values and choosing the smaller one.

Pair types supported by MPI for MPI_MAXLOC / MPI_MINLOC:

  • MPI_FLOAT_INT : float and int
  • MPI_DOUBLE_INT : double and int
  • MPI_LONG_INT : long and int
  • MPI_2INT : pair of int
  • MPI_SHORT_INT : short and int
  • MPI_LONG_DOUBLE_INT : long double and int

MPI-IO

#include <iostream>
#include "mpi.h"

using namespace std;

void TestWrite() {
  int size, rank;
  int n, m;
  float *array;
  MPI_File fh;
  MPI_Status status;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  MPI_File_open(MPI_COMM_WORLD, "data", MPI_MODE_CREATE | MPI_MODE_WRONLY,
                MPI_INFO_NULL, &fh);
  unsigned long c = (unsigned long)rank;
  MPI_File_write_at(fh, rank * sizeof(unsigned long), &c, 1, MPI_UNSIGNED_LONG, &status);
  cout << "write " << c << endl;
  MPI_File_close(&fh);
}

void TestRead() {
  int size, rank;
  int n, m;
  float *array;
  MPI_File fh;
  MPI_Status status;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  MPI_File_open(MPI_COMM_WORLD, "data", MPI_MODE_RDONLY,
                MPI_INFO_NULL, &fh);
  unsigned long c;
  MPI_File_read_at(fh, rank * sizeof(unsigned long), &c, 1, MPI_UNSIGNED_LONG, &status);
  cout << "read " << c << endl;
  MPI_File_close(&fh);
}
int main(int argc, char* argv[]) {
  MPI_Init(&argc, &argv);
  TestWrite();
  TestRead();
  MPI_Finalize();
  return 0;
}

Note that MPI_File_open and MPI_File_close are collective operations: every process in the communicator must call them.

Useful functions

  1. Function for determining which ranks are common to a compute node:
MPI_Comm_split_type (comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm);
  2. Function for mapping group ranks to global ranks:
MPI_Group_translate_ranks

Notes

  • Most MPI implementations only allow process 0 of MPI_COMM_WORLD to read standard input (stdin).

  • The point-to-point functions MPI_Send, MPI_Isend, MPI_Recv, and MPI_Irecv can be mixed freely; for example, MPI_Recv can receive data sent with MPI_Isend.

  • With nonblocking communication, MPI_Barrier only guarantees that every process has reached that point in the program; it says nothing about whether the data transfers have completed. To guarantee or check the "completion" of nonblocking communication, use MPI_Wait or MPI_Test (a polling sketch follows this list). For MPI_Isend, "completion" does not mean the whole transfer has finished (the data may merely have been copied into a system buffer); it only means the send buffer can safely be reused. For MPI_Irecv, "completion" means the receive buffer has received the message and can be read.
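
    A polling sketch with MPI_Test (assumes MPI is initialized and some process will eventually send a matching message):

    int flag = 0, value = 0;
    MPI_Request req;
    MPI_Irecv(&value, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &req);
    while (!flag) {
        MPI_Test(&req, &flag, MPI_STATUS_IGNORE);
        /* overlap useful computation here while the message is in flight */
    }
    /* the receive has completed and value can now be read */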

Memory-leak detection

Install the tools

sudo apt install valgrind
sudo apt install valgrind-mpi

Usage

mpic++ -g *.cpp
mpirun -n 2 valgrind --tool=memcheck --leak-check=full ./a.out

References

  1. Parallel Computation One-sided communication

  2. MPI Remote Memory Access Programming (MPI3-RMA)and Advanced MPI Programming

  3. Lecture 35: More on One Sided Communication

  4. Remote Memory Acccess
