Основы технологии MPI на примерах

Параллельное программирование — очень актуальное направление, т.к. большинство современных вычислительных устройств (включая телефоны) являются многоядерными или многопроцессорными. В предыдущей записи я публиковал учебник по OpenMP, однако OpenMP позволяет программировать только системы с общей памятью — в большей части, многоядерные компьютеры. Основной проблемой таких систем является плохая масштабируемость — не так легко и дешево увеличить число ядер в компьютере.

Другой класс параллельных ЭВМ — системы с распределенной памятью, к которым, в частности, относятся кластеры. Они представляют собой набор соединенных сетью компьютеров. Такая система гораздо лучше мастабируется — не составит особого труда купить и подключить в сеть дополнительную сотню компьютеров. Число вычислительных узлов в кластерах измеряется тысячами.

claster_mpi
Кластерная архитектура

Взаимодействуют узлы кластера с помощью передачи сообщений по сети, поэтому стандартом программирования таких систем является MPI (Message Passing Interface). Библиотека MPI предоставляет программисту огромный выбор способов передать сообщение между узлами, в этой статье я постараюсь описать основные способы на простых примерах.

Ключевые особенности MPI

Допустим, есть у нас кластер. Чтобы программа начала на нем выполняться, ее необходимо скопировать на каждый узел, запустить и установить связь между процессами. Эту работу берет на себя утилита mpirun (под Linux) или mpiexec (под Windows), так например, чтобы запустить 5 процессов достаточно написать:

mpirun -np 5 path/your_mpi_program

Однако программа должна быть написана определенным образом. Вообще, технология MPI позволяет как использовать модель SPMD (Single Process, Multiple Data), так и MPMD [1], в этой статье я рассматривают только первый вариант. Далее по тексту узел и процесс будут означать одно и тоже, хотя на одном узле может быть создано несколько процессов (именно так я делаю при запуске примеров статьи, т.к. отлаживаю их на персональном компьютере). Это вводная статья, поэтому тут не пойдет речь о коммуникаторах, в которые могут группироваться процессы.

Суть SPMD заключается в том, что для решения задачи запускается множество одинаковых процессов. На приведенном выше рисунке видно, что пользователь (который вводит данные и хочет получить результат) взаимодействует только с одним узлом кластера. Такой узел называется root и логика его работы должна отличаться, ведь он должен не только взаимодействовать с пользователем, но и, получив исходные данные, выполнить их рассылку остальным процессам. Каждый процесс имеет свой номер (в MPI принят термин «ранг«) в рамках каждого коммуникатора, при этом у root ранг обычно равен нулю.

Процессы обмениваются сообщениями, каждое из которых помимо номеров процесса отправителя и получателя имеет тег, а также тип и количество передаваемых элементов. Тег представляет собой целое число, с помощью которого программа может отличать одни сообщения от других. В силу того, что сегодня мы рассмотрим очень простые примеры, тег нам не особо пригодится (можно было бы везде использовать, скажем, «ноль» вместо тега). Все поступающие процессу сообщения помещаются в очередь, из которой могут быть извлечены в соответствии с запрашиваемыми параметрами (тегом, отправителем и т.п.).

В связи с тем, что MPI-программа обладает множеством особенностей, компилироваться она должна специальным компилятором. Под Linux для этого используется mpic++, а под Windows можно применять расширение для Microsoft Visual Studio. Для сборки примеров статьи под Linux я использовал примерно следующую команду:

mpic++ main.cpp -o main

Разобравшись с особенностями технологии, перейдем к примерам. В этой статье мы будем пытаться посчитать сумму элементов массива — несмотря на простоту, эта задача позволяет продемонстрировать самые различные способы передачи данных между узлами.

Операции точка-точка MPI

Блокирующие операции — MPI_Send, MPI_Recv, MPI_Probe

Операции точка-точка позволяют передавать данные между парой узлов. Самые простые функции этого вида — MPI_Send и MPI_Recv, выполняющие передачу и прием сообщения, соответственно:

MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
MPI_Recv(void* message, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status* status)

Функция MPI_Send выполняет передачу count элементов типа datatype, начиная с адреса определенного buf, процессу с номером dest в коммуникаторе comm. При этом сообщению присваивается некоторый тег.

Функция MPI_Recv выполняет прием, при этом она ожидает появления во входном буфере сообщения с заданными параметрами (выполнение программы не продолжится до того, как такое сообщение поступит). Функция также возвращает статус, который может содержать код ошибки.

Рассмотрим первый пример:

#include "mpi.h"
#include <iostream>

using namespace std;

const int Tag = 0;
const int root = 0;

double sum_array(double *array, int n) {
  double sum = 0;
  for (int i = 0; i < n; ++i) {
    sum += array[i];
  }
  return sum;
}

int main() {
  int rank, commSize;
  
  MPI_Init(NULL, NULL);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &commSize);
  
  double *arr, sum = 0, buffer;
  int n;
  MPI_Status status;
  
  if (root == rank) {
    cout << "n : ";
    cin >> n;
    
    arr = new double[n];
    for (int i = 0; i < n; ++i) 
      cin >> arr[i];
    
    int partSize = n/commSize;
    
    int shift = n%commSize;
    for (int i = root+1; i < commSize; ++i) {
      MPI_Send(arr + shift + partSize*i, partSize, MPI_DOUBLE, i, Tag, MPI_COMM_WORLD);
    }
    
    sum = sum_array(arr, shift + partSize);
    
    for (int i = root+1; i < commSize; ++i) {
      MPI_Recv(&buffer, 1, MPI_DOUBLE, i, Tag, MPI_COMM_WORLD, &status);
      sum += buffer;
    }
  }
  else {
    MPI_Probe(root, Tag, MPI_COMM_WORLD, &status);
    MPI_Get_count(&status, MPI_DOUBLE, &n);
    
    arr = new double[n];
    MPI_Recv(arr, n, MPI_DOUBLE, root, Tag, MPI_COMM_WORLD, &status);
    
    sum = sum_array(arr, n);
    
    MPI_Send(&sum, 1, MPI_DOUBLE, root, Tag, MPI_COMM_WORLD);
  }
  
  delete[] arr;
  
  cout << rank << " : " << sum << endl;

  MPI_Finalize();
}

Все обращения к функциям MPI должны размещаться между MPI_Init и MPI_Finalize. В качестве коммуникатора в этом примере используется MPI_COMM_WORLD, который включает в себя все процессы, запущенные для текущей задачи с помощью mpirun. Каждый процесс получает свой ранг с помощью вызова MPI_Comm_rank, количество процессов в коммуникаторе возвращает функция MPI_Comm_size.

В теле программы явно выделяется два блока кода — один выполняется главным процессом, а другой — остальными. Главный процесс занимается вводом/выводом данных, и их распределением между остальными узлами. Все остальные процессы ожидают от главного часть массива, вычисляют сумму элементов и передают результат назад. Чтобы принять массив, дочерние процессы должны сначала выделить память, но для этого необходимо знать сколько именно элементов им будет передано — сделать это можно с помощью функции MPI_Probe, которая работает также как MPI_Send (блокирует процесс до поступления в очередь сообщения с заданными параметрами), но не удаляет сообщение из очереди, а лишь возвращает его статус. Структура типа MPI_Status содержит тип элементов и их количество.

Операции MPI_Send, MPI_Probe и MPI_Recv являются синхронными (блокирующими), т.к. останавливают выполнение основного потока процесса до тех пор, пока не выполнится какое-либо действие (данные не будут записаны в сокет или не поступит сообщение с требуемыми параметрами).

Асинхронные операции — MPI_Isend

Приведенный выше пример работает прекрасно, но бутылочным горлышком является нулевой процесс, т.к. перед тем как приступить к обработке своей части массива root должен передать данные всем остальным процессами. Немного улучшить картину можно с помощью неблокирующих (асинхронных) операций — одной из них является MPI_Isend.

Функции неблокирующего обмена имеют такие же аргументы как и их блокирующие аналоги, но в качестве дополнительного параметра принимают параметр типа MPI_Request. Чтобы нулевой процесс в приведенном выше примере работал асинхронно достаточно изменить лишь этот фрагмент кода:

for (int i = root+1; i < commSize; ++i) {
  MPI_Isend(arr + shift + partSize*i, partSize, 
            MPI_DOUBLE, i, Array, MPI_COMM_WORLD, &request);
  MPI_Request_free(&request);
}

Теперь функция передачи возвращает управление немедленно, сама же передача происходит параллельно с выполнением других команд процесса. Узнать о ходе выполнения асинхронной операции мы сможем с помощью специальных функций:

int MPI_Wait(MPI_Request *request, MPI_Status *status);
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status);

Функция MPI_Wait является блокирующей — она останавливает выполнение процесса до тех пор, пока передача, связанная с request не будет завершена. Функция MPI_Test является неблокирующей (не дожидается окончания выполнения операции) — о том была завершена операция или нет, она сигнализирует с помощью флага (true — завершена).

В нашем примере в использовании функций MPI_Wait и MPI_Test нет необходимости, т.к. синхронно выполняется сбор результатов вычислений процессов. Главный процесс инициирует передачу данных и приступает к обработке своей части массива, после этого синхронно ожидает поступления данных от каждого по очереди процесса. Кстати, асинхронной может быть не только передача, но и прием данных — функция MPI_Irecv.

Чтобы избежать ошибок, необходимо представлять как именно может быть реализована работа неблокирующих функций. Например, MPI_Isend инициирует передачу данных, которая выполняется в отдельном потоке параллельно. По окончанию передачи этот поток должен изменить переменную request. Это значит, что такой код вполне может привести к ошибкам:

for (int i = root+1; i < commSize; ++i) {
  MPI_Request request;
  MPI_Isend(arr + shift + partSize*i, partSize, MPI_DOUBLE, 
            i, Array, MPI_COMM_WORLD, &request);
}

Тут на каждой итерации цикла создается экземпляр переменной типа MPI_Request, инициируется передача и объект разрушается (память освобождается). Это значит, что поток, выполняющий передачу обратится к памяти, которая будет освобождена (и возможно уже повторно распределена для других целей), а значит — программа будет вести себя непредсказуемо. Вывод — объект request должен существовать до завершения асинхронной передачи.

Вызов функции MPI_Request_free необходим в случаях если не требуется ждать окончания асинхронной передачи и при этом хочется использовать один экземпляр MPI_Request — в нашем случае один и тот же объект используется для передачи данных всем дочерним процессам. За более детальной информацией о работе асинхронных операций MPI предлагаю обратиться к стандарту (раздел 3.7 Nonblocking Communication) [3].

Буферизованная передача — MPI_Bsend

В стандарте MPI сказано, что функция MPI_Send вернет управление после того, как сообщение будет записано в выходной буфер, я же выше писал что это сокет (пусть это не совсем точно) — стандарт в этом месте дает некоторую свободу, возможны различные реализации. Однако, «выходной буфер» в стандарте — это однозначно не область в оперативной памяти, т.к. для работы с буфером в ОЗУ предусмотрена другая функция — MPI_Bsend.

Функция MPI_Send записывает данные в сокет и только после этого возвращает управление. Функция MPI_Isend вернет управление сразу, но нам все равно нельзя будет изменять передаваемые данные, пока они не будут записаны в сокет. При этом, работа с сокетом выполняется медленнее, чем с оперативной памятью — поэтому ускорить работу программы в ряде случаев можно путем копирования данных в некоторый буфер в оперативной памяти и передачей данных из этого буфера. Изменять отправляемые данные мы сможем сразу после того, как они будут скопированы. Примерно так и работает функция MPI_Bsend. Сложность реализации такой операции вручную заключается также в том, что после отправки данных память из под буфера нужно освободить.

Чтобы выделенная область памяти использовалась в качестве буфера при передачи в буферизованном режиме — необходимо использовать функцию MPI_Buffer_attach. Буфер может состоять из нескольких «присоединенных» областей. В разделе 3.6.1 стандарта [3] говорится о том, что буфер может быть устроен как циклическая очередь сообщений, отправленные сообщения из буфера удаляются, позже на их место могут быть записаны другие сообщения.

При использовании буферизованного режима исходный код будет не сильно отличаться от приведенного выше, ниже приведен только фрагмент, которого коснулись изменения:

int message_buffer_size = n*sizeof(double) + MPI_BSEND_OVERHEAD;
double* message_buffer = (double*)malloc(message_buffer_size);
MPI_Buffer_attach(message_buffer, message_buffer_size);
    
int shift = n%commSize;
for (int i = root+1; i < commSize; ++i) {
  MPI_Bsend(arr + shift + partSize*i, partSize, MPI_DOUBLE, i, Tag, MPI_COMM_WORLD);
}
    
sum = sum_array(arr, shift + partSize);
    
for (int i = root+1; i < commSize; ++i) {
  MPI_Recv(&buffer, 1, MPI_DOUBLE, i, Tag, MPI_COMM_WORLD, &status);
  sum += buffer;
}
    
MPI_Buffer_detach(message_buffer, &message_buffer_size);
free(message_buffer);

Перед использованием функции выделяется память под буфер, т.к. в этой памяти размещается очередь — следовательно есть некоторые издержки (размер которых зависит от конкретной реализации стандарта), поэтому необходимо выделять чуть больше памяти (MPI_BSEND_OVERHEAD). Выделенная область прикрепляется к буферу, затем используется MPI_Bsend (аргументы функции полностью совпадают с MPI_Send, буфер в качестве аргумента не передается, т.к. он является глобальным для процесса). После того, как все сообщения будут переданы — буфер можно отсоединить, а память освободить.

Коллективные операции. Пример использования MPI_Reduce

Коллективные операции выполняются всеми процессами указанного коммуникатора. Ниже приведена картинка из стандарта [3], на которой показана суть некоторых операций:

MPI_collective_communication
Коллективные операции MPI

Верхняя часть схемы иллюстрирует операцию MPI_Bcast, которая позволяет передать некоторые данные с одного узла кластера на все остальные. Нижняя — соответствует операциям MPI_Scatter и MPI_Gather. Если у нас на узле U есть массив из N элементов и его части необходимо передать на P узлов кластера — можно использовать функцию MPI_Scatter. Проблем не возникнет если N делится нацело на P, т.к. при выполнении MPI_Scatter все узлы получат одинаковое количество элементов. Обратную операцию выполняет MPI_Gather, т.е. собирает данные со всех P узлов на узел U.

Эти операции являются синхронными и используют MPI_Send (это закреплено стандартом), однако существуют асинхронные аналоги — MPI_Ibcast, MPI_Igather и MPI_Iscatter.

Операция MPI_Bcast теоретически (зависит от реализации библиотеки) может работать более эффективно и выполняться за \(O(log(n))\) операций вместо \(O(n)\).

MPI_Reduce_and_Bcast_efficiency
Эффективная реализация MPI_Reduce и MPI_Bcast

На приведенной схеме цветом выделен узел, на котором находятся передаваемые данные. В начале работы такой узел один. После первой передачи данные есть уже на двух узлах, оба они могут участвовать в передачи. При реализации такой схемы для передачи данных на 1000 узлов будет достаточно 10 операций. Таким же образом может работать операция MPI_Reduce:

A more efficient implementation is achieved by taking advantage of associativity and using a logarithmic tree reduction. [3]

Операция MPI_Reduce не просто передает данные, но и выполняет над ними заданную операцию. В нашем примере применить ее можно вместо сбора результатов вычисления сумм:

if (root == rank) {
  cout << "n : ";
  cin >> n;
  
  arr = new double[n];
  for (int i = 0; i < n; ++i) 
    cin >> arr[i];
  
  int partSize = n/commSize;
  
  int shift = n%commSize;
  for (int i = root+1; i < commSize; ++i) {
    MPI_Send(arr + shift + partSize*i, partSize, MPI_DOUBLE, i, Tag, MPI_COMM_WORLD);
  }
  
  sum = sum_array(arr, shift + partSize);
}
else {
  MPI_Probe(root, Tag, MPI_COMM_WORLD, &status);
  MPI_Get_count(&status, MPI_DOUBLE, &n);
  
  arr = new double[n];
  MPI_Recv(arr, n, MPI_DOUBLE, root, Tag, MPI_COMM_WORLD, &status);
  
  sum = sum_array(arr, n);
}

double global_sum = 0;
MPI_Reduce(&sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, root, MPI_COMM_WORLD);

if (rank == root) {
  cout << "sum: " << global_sum << endl;
}

Операция MPI_Reduce может выполняться не только над числами, но и над массивами (при этом будет применена к каждому его элементу отдельно).

Заключение

Целью статьи было продемонстрировать ряд функций библиотеки MPI и показать, что реализовать их вручную не так легко — этим привлечь внимание к библиотеке.

Пример с функцией MPI_Isend наглядно демонстрирует насколько сложно реализовать аналогичное поведение вручную. Дело не только в том, что передача выполняется в отдельном потоке. Сложно придумать механизм лучший, чем работа с MPI_Request, однако к объекту request может параллельно обращаться несколько потоков, поэтому все эти операции защищаются семафорами.

Еще более наглядным является пример с MPI_Bsend, т.к. вручную реализовать циклический буфер сообщений (который, кстати, тоже защищается семафорами) — не простая задача. Однако, в библиотеке MPI гораздо больше функций типа точка-точка. Различие между ними заключается в преставлении о «завершенной передаче», т.е. моменте когда блокирующая операция вернет управление или для асинхронной операции завершится MPI_Wait.

Примеры статьи являются полностью искусственными — именно по этой причине я не стал приводить таблицу с временем выполнения программы для различных вариантов реализации. Дело в том, что кластерная архитектура (а следовательно и MPI) подходит не всех типов задач — необходимо учитывать затраты на передачу данных. Так, для вычисления на кластере суммы элементов массива необходимо выполнить, порядка \(O(n)\) операций передачи, но \(O(\frac{n}{p})\) операций сложения может быть выполнено параллельно. Трудоемкость вычислений будет оцениваться \(O(n)\) при любом количестве узлов кластера. Подробнее об оценке трудоемкости параллельных алгоритмов советую прочитать в книге Миллера [4].

Вспомогательная литература

  1. MPMD Launch Mode [Электронный ресурс] – режим доступа: https://software.intel.com/en-us/mpi-developer-guide-linux-mpmd-launch-mode. Дата обращения: 08.02.2018.
  2. Подключение MPI в Visual Studio [Электронный ресурс] – режим доступа: https://pro-prof.com/forums/topic/подключение-mpi-в-visual-studio. Дата обращения: 08.02.2018.
  3. MPI: A Message-Passing Interface Standard Version 3.1 \\ Message Passing Interface Forum, June 4, 2015 [Электронный ресурс] – режим доступа: mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf. Дата обращения: 08.02.2018.
  4. Миллер, Р. Последовательные и параллельные алгоритмы: Общий подход / Р. Миллер, Л. Боксер ; пер. с англ. – М. : БИНОМ. Лаборатория знаний, 2006. – 406 с.

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *