Accelerating Technology

This post collects notes on performance tuning and techniques for speeding up programs.

Deep learning

AllReduce on large data

Collective communication is known to perform poorly on small messages: issuing many small collectives is worse than packing the small pieces into one large buffer and performing a single collective operation.

Gradient Bucketing

Gradient bucketing is motivated by the observation that collective communications are more efficient on large tensors.

Instead of launching a dedicated AllReduce immediately when each gradient tensor becomes available, we can achieve higher throughput and lower latency by waiting for a short period of time and bucketing multiple gradients into one AllReduce operation.

The optimal bucket size needs to be measured for each use case.
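A minimal sketch of this idea using MPI directly (not any framework's actual implementation): several gradient tensors, here plain std::vector<double>, are packed into one contiguous bucket, a single MPI_Allreduce is issued for the whole bucket, and the averaged values are copied back. The vector-of-vectors layout and the averaging by world size are illustrative assumptions.

#include <mpi.h>
#include <cstddef>
#include <vector>

// Pack several gradients into one buffer, run a single AllReduce,
// then copy the averaged values back into the individual gradients.
void allreduce_bucket(std::vector<std::vector<double>>& grads, MPI_Comm comm) {
  std::size_t total = 0;
  for (const auto& g : grads) total += g.size();

  std::vector<double> bucket;
  bucket.reserve(total);
  for (const auto& g : grads)
    bucket.insert(bucket.end(), g.begin(), g.end());

  // One collective call instead of one call per gradient tensor.
  MPI_Allreduce(MPI_IN_PLACE, bucket.data(), static_cast<int>(bucket.size()),
                MPI_DOUBLE, MPI_SUM, comm);

  int np;
  MPI_Comm_size(comm, &np);
  std::size_t offset = 0;
  for (auto& g : grads) {
    for (std::size_t i = 0; i < g.size(); i++)
      g[i] = bucket[offset + i] / np;   // average across workers (assumed convention)
    offset += g.size();
  }
}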

Overlap computation with communication

Once the backward pass for a layer (or a bucket of layers) has finished, its AllReduce can be launched immediately, so communication overlaps with the remaining backward computation.
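A hedged sketch of this overlap using non-blocking collectives (MPI_Iallreduce, MPI-3): each layer's AllReduce is launched as soon as its gradient is ready, and the code only waits for all requests once the backward pass is done. compute_backward_for_layer is a placeholder name, not a real API.

#include <mpi.h>
#include <vector>

void backward_with_overlap(std::vector<std::vector<double>>& layer_grads, MPI_Comm comm) {
  std::vector<MPI_Request> reqs(layer_grads.size());

  // Walk layers from last to first, as the backward pass does.
  for (int l = static_cast<int>(layer_grads.size()) - 1; l >= 0; --l) {
    // compute_backward_for_layer(l);  // placeholder: produces layer_grads[l]

    // Launch the AllReduce for this layer's gradient without waiting on it,
    // so the next layer's backward computation can proceed in parallel.
    MPI_Iallreduce(MPI_IN_PLACE, layer_grads[l].data(),
                   static_cast<int>(layer_grads[l].size()),
                   MPI_DOUBLE, MPI_SUM, comm, &reqs[l]);
  }

  // Only block on communication once all backward computation has finished.
  MPI_Waitall(static_cast<int>(reqs.size()), reqs.data(), MPI_STATUSES_IGNORE);
}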

Prioritize gradients

In order to start the next training iteration early, we can prioritize gradient synchronizations and parameter updates based on the forward order instead of the backward order. This means gradient buckets containing the initial layers should receive higher priorities than those in the final layers.
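Assuming the buckets were reduced with non-blocking requests as in the previous sketch, this prioritization can be expressed by having the optimizer wait for bucket 0 (the earliest layers) first, so the next forward pass can start before the later buckets have arrived. The bucket_reqs ordering and the commented optimizer call are illustrative.

#include <mpi.h>
#include <cstddef>
#include <vector>

void step_in_forward_order(std::vector<MPI_Request>& bucket_reqs
                           /*, Optimizer& opt  -- placeholder */) {
  // bucket_reqs[i] corresponds to the i-th bucket in forward (layer) order.
  for (std::size_t i = 0; i < bucket_reqs.size(); ++i) {
    MPI_Wait(&bucket_reqs[i], MPI_STATUS_IGNORE);
    // opt.update_bucket(i);  // placeholder: update these layers first, so their
    //                        // forward computation for the next iteration can begin
  }
}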

MPI

Broadcast

Common Way

#include "core/base.h"
#include "mpi.h"

using namespace std;

// Broadcast a (size*1000) x 1000 matrix of doubles (i.e. 8*size MB) from rank 0
// to every process and time the collective.
void func2(MatrixD& x, int size) {
  double start, finish;
  start = MPI_Wtime();
  int np, me;
  MPI_Comm_size(MPI_COMM_WORLD,&np);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);

  x.resize(size * 1000, 1000);
  if (me == 0) {
    for (int i = 0; i < 20; i++)
      (x.data())[i] = 5;
  }
  MPI_Bcast(x.data(), x.size(), MPI_DOUBLE, 0, MPI_COMM_WORLD);
  finish = MPI_Wtime();
  if (me == 0)
    cout << 8 * size  << " M broadcast cost time: " << finish - start << " seconds" << endl;
}
int main(int argc, char ** argv) {
  MPI_Init(&argc, &argv);
  MatrixD X;
  for (int i = 1; i < 1000; i++) {
    func2(X, i);
  }
  MPI_Finalize();
  return 0;
}

In an experiment on 500 cores, the broadcast latency scales roughly linearly with the message size.

data size   8 M       80 M      200 M     400 M     800 M
time        0.074 s   0.21 s    0.464 s   0.94 s    1.96 s

inter-node

#include "core/base.h"
#include "mpi.h"

using namespace std;

// Broadcast 8*scale MB of doubles between node leaders only (local rank 0 on each
// node), placing the data in an MPI shared-memory window, and time the broadcast.
void func(MatrixD& x, int scale) {

  double start, finish;
  // start = MPI_Wtime();
  int np, me;
  MPI_Comm_size(MPI_COMM_WORLD,&np);
  MPI_Comm_rank(MPI_COMM_WORLD,&me);

  /* create the shared-memory (per-node) communicator */
  MPI_Comm innode_comm = MPI_COMM_NULL;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &innode_comm);

  MPI_Comm internode_comm = MPI_COMM_NULL;
  int lrank;
  MPI_Comm_rank(innode_comm, &lrank);
  MPI_Comm_split(MPI_COMM_WORLD, lrank, 0, &internode_comm);

  MPI_Comm comm_shared = MPI_COMM_NULL;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &comm_shared);
  int localnp, localme;
  MPI_Comm_size(comm_shared,&localnp);
  MPI_Comm_rank(comm_shared,&localme);

  double* shared_ptr = NULL;
  int size = 0;
  if (localme == 0) {
    size = 1000 * 1000 * scale;
  }


  /* allocate the shared-memory window */
  MPI_Win win_shared = MPI_WIN_NULL;
  MPI_Win_allocate_shared(sizeof(double) * size, sizeof(double), MPI_INFO_NULL, comm_shared, &shared_ptr, &win_shared);



  if (lrank == 0) {

    MPI_Group world_group, innode_group, internode_group;
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);
    MPI_Comm_group(internode_comm, &internode_group);
    int global_rank = 0, internode_rank;
    MPI_Group_translate_ranks(world_group, 1, &global_rank, internode_group, &internode_rank);
    for (int i = 0; i < 20; i++)
      shared_ptr[i] = me;

    start = MPI_Wtime();
    MPI_Bcast(shared_ptr, size, MPI_DOUBLE, internode_rank, internode_comm);
    // MPI_Bcast(A, size, MPI_INT, 0, innode_comm);

    finish = MPI_Wtime();
    if (me == 0)
      cout << 8 * scale << " M broadcast cost time: " << finish - start << " seconds" << endl;
  }

  // finish = MPI_Wtime();
  // if (me == 0)
  //   cout << "partial broadcast cost time: " << finish - start << " seconds" << endl;

  // int rank = 0;
  // MPI_Aint lsize = 0;
  // int ldisp = 0;
  // double* lbase = NULL;
  // MPI_Win_shared_query(win_shared, rank, &lsize, &ldisp, &lbase);
  // x.resize(10000, 10000);
  // Map<MatrixD> x_m(lbase, 10000, 10000);
  // x = x_m;

  // MPI_Barrier(MPI_COMM_WORLD);
  MPI_Win_free(&win_shared);
}



int main(int argc, char ** argv) {
  MPI_Init(&argc, &argv);
  MatrixD X;
  for (int i = 1; i <= 100; i++)
    func(X, i);
  MPI_Finalize();
  return 0;
}

In the same 500-core setup, the inter-node broadcast latency again scales roughly linearly with the message size.

data size   8 M       80 M      200 M     400 M     800 M
time        0.02 s    0.14 s    0.358 s   0.761 s   1.55 s

inter-node then in-node

Broadcasting data to all processes:

  • Approach 1: broadcast between nodes first, then broadcast within each node
  • Approach 2: broadcast directly over all processes

Experiments on the supercomputing platform show that when the data is not large the two approaches take roughly the same time, with no obvious difference, but once the data size reaches 400 M, approach 1 becomes slower than approach 2. A further possible speedup is to broadcast the data between nodes only and let the processes within a node read it through shared memory; a sketch of this is shown after the program below.

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <mpi.h>

using namespace std;

int main(int argc, char ** argv)
{
    MPI_Init(&argc, &argv);

    int np, me;
    MPI_Comm_size(MPI_COMM_WORLD,&np);
    MPI_Comm_rank(MPI_COMM_WORLD,&me);

    /* create the shared-memory (per-node) communicator */
    MPI_Comm innode_comm = MPI_COMM_NULL;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &innode_comm);

    MPI_Comm internode_comm = MPI_COMM_NULL;
    int lrank;
    MPI_Comm_rank(innode_comm, &lrank);
    MPI_Comm_split(MPI_COMM_WORLD, lrank, 0, &internode_comm);

    int size = 1000000000;
    int* A = new int [size];
    if (me == 0) {
      for (int i = 0; i < size; i++)
        A[i] = i + 5;
    }

    // double start, finish;
    // start = MPI_Wtime();
    // if (lrank == 0) {
    //   MPI_Group world_group, innode_group, internode_group;
    //   MPI_Comm_group(MPI_COMM_WORLD, &world_group);
    //   MPI_Comm_group(internode_comm, &internode_group);
    //   int global_rank = 0, internode_rank;
    //   MPI_Group_translate_ranks(world_group, 1, &global_rank, internode_group, &internode_rank);
    //   MPI_Bcast(A, size, MPI_INT, internode_rank, internode_comm);
    //   MPI_Bcast(A, size, MPI_INT, 0, innode_comm);
    // } else {
    //   MPI_Bcast(A, size, MPI_INT, 0, innode_comm);
    // }
    // finish = MPI_Wtime();
    // if (me == 0)
    //   cout << "broadcast cost time: " << finish - start << " seconds" << endl;

    double start, finish;
    start = MPI_Wtime();
    MPI_Bcast(A, size, MPI_INT, 0, MPI_COMM_WORLD);
    finish = MPI_Wtime();
    if (me == 0)
      cout << "common broadcast cost time: " << finish - start << " seconds" << endl;
    delete[] A;
    MPI_Finalize();
    return 0;
}
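For completeness, a minimal sketch of the shared-memory variant described above: the data is broadcast only between node leaders, into an MPI-3 shared-memory window that the other ranks on the same node read directly. The message size, the assumption that global rank 0 is a node leader (and hence rank 0 of the leader communicator), and the node barrier used for synchronization are simplifications.

#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int me;
  MPI_Comm_rank(MPI_COMM_WORLD, &me);

  // Per-node (shared-memory) communicator and a communicator of node leaders.
  MPI_Comm node_comm = MPI_COMM_NULL;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &node_comm);
  int node_rank;
  MPI_Comm_rank(node_comm, &node_rank);
  MPI_Comm leader_comm = MPI_COMM_NULL;
  MPI_Comm_split(MPI_COMM_WORLD, node_rank, 0, &leader_comm);

  // Shared-memory window: only the node leader owns storage, the others attach to it.
  const int count = 1000 * 1000;                        // assumed message size
  MPI_Aint local_bytes = (node_rank == 0) ? count * sizeof(double) : 0;
  double* base = NULL;
  MPI_Win win = MPI_WIN_NULL;
  MPI_Win_allocate_shared(local_bytes, sizeof(double), MPI_INFO_NULL, node_comm, &base, &win);
  if (node_rank != 0) {
    MPI_Aint qsize;
    int qdisp;
    MPI_Win_shared_query(win, 0, &qsize, &qdisp, &base); // pointer into the leader's segment
  }

  // Global rank 0 fills the buffer; the broadcast involves only one process per node.
  if (node_rank == 0) {
    if (me == 0)
      for (int i = 0; i < count; i++) base[i] = 1.0;
    MPI_Bcast(base, count, MPI_DOUBLE, 0, leader_comm);
  }
  // Node barrier so in-node readers see the broadcast data (a window synchronization
  // such as MPI_Win_sync may additionally be required depending on the memory model).
  MPI_Barrier(node_comm);

  if (node_rank != 0 && base[0] != 1.0)
    std::cout << "rank " << me << " did not see the broadcast" << std::endl;

  MPI_Win_free(&win);
  MPI_Comm_free(&leader_comm);
  MPI_Comm_free(&node_comm);
  MPI_Finalize();
  return 0;
}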

reference

  1. PyTorch Distributed: Experiences on Accelerating Data Parallel Training

  2. Parity models: Erasure-coded resilience for prediction serving systems

updated 2021-11-06