Как стать автором
Обновить

Часть 3. MPI — Как процессы общаются? Сообщения типа точка-точка

Время на прочтение11 мин
Количество просмотров23K

В этом цикле статей речь идет о параллельном программировании с использованием MPI.

В прошлой статье мы обсудили как распределяется работа между процессами, зачем нужно знать какой процесс выполняется на конкретном потоке и как фиксировать время выполнение работы программы. Что дальше?
Иногда нам требуется остановить один/все процессы чтобы они подождали какого-либо действия системы, пользователя, пересылать одни локальные данные с процесса в другой процесс, ведь, напомню, они работают с независимой памятью и изменение переменной в одном потоке не приведет к изменению переменной с тем же именем в другом процессе, более формально процессы работают в непересекающихся множествах адресов памяти, а изменение данных в одном из них никак не повлияет на другое множество(собственно из-за того, что они непересекающиеся).


Предисловие

Практически все программы с применением технологии MPI используют не только средства для порождения процессов и их завершения, но и одну из самых важных частей, заложенных в названии самой технологии (Message Passing Interface), конечно же явная посылка сообщений между процессами. Описание этих процедур начнем, пожалуй, с операций типа точка-точка.

Давайте представим себе ту самую картинку где много компьютеров соединены линиями, стандартная иллюстрация сети(локальной), только на месте узлов будут стоять отдельные процессы, процессоры, системы и т.п. Собственно отсюда становится понятно к чему клонит название "операции типа точка-точка", два процесса общаются друг с другом, пересылают какие-либо данные между собой.

Работает это так: один из процессов, назовем его P1, какого-то коммуникатора C должен указать явный номер процесса P2, который также должен быть под коммуникатором С, и с помощью одной из процедур передать ему данные D, но на самом деле не обязательно нужно знать номер процесса, но это мы обсудим далее.

Процедуры в свою очередь разделены на 2 типа: с блокировкой и без блокировки.
1. Процедуры обмена с блокировкой просто останавливают процесс который должен принять сообщение до момента выполнения какого-либо условия.
2. Процедуры без блокировки, как их ещё называют асинхронные, они возвращаются сразу после осуществления коммуникации, немедленно.

Оба типа процедур мы затронем подробно, а пока начнем с процедур с блокировкой.

Передаем и принимаем сообщения

Наконец приступим к практической части. Для передачи сообщений используется процедура MPI_Send. Эта процедура осуществляет передачу сообщения с блокировкой. Синтаксис у нее следующий:

int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, 
						 int msgtag, MPI_Comm comm);

Что тут есть что:
buf - ссылка на адрес по которому лежат данные, которые мы пересылаем. В случае массивов ссылка на первый элемент.
count - количество элементов в этом массиве, если отправляем просто переменную, то пишем 1.
datatype - тут уже чутка посложнее, у MPI есть свои переопределенные типы данных которые существуют в С++. Их таблицу я приведу чуть дальше.
dest - номер процесса кому отправляем сообщения.
msgtag - ID сообщения (любое целое число)
comm - Коммуникатор в котором находится процесс которому мы отправляем сообщение.

А вот как называются основные стандартные типы данных С++ определенные в MPI_Datatype:

Название в MPI

Тип даных в С++

MPI_CHAR

char

MPI_SHORT

signed short

MPI_INT

signed int

MPI_LONG

signed long int

MPI_LONG_LONG

signed long long int

MPI_UNSIGNED_*** (Вместо *** int и т.п.)

unsigned ...

MPI_FLOAT

float

MPI_DOUBLE

double

MPI_LONG_DOUBLE

long double

MPI_INT8_T

int8_t

MPI_INT16_T

int16_t

MPI_C_COMPLEX

float _Complex

Аналогичным способом указываются и другие типы данных определенные в стандартной библиотеке С/С++ - MPI_[Через _ в верхнем регистре пишем тип так как он назван в С]. Еще один пример для закрепления понимания, есть тип беззнаковых 32 битных целых чисел, назван он uint32_t, чтобы получить этот тип данных переопределенным в MPI необходимо написать следующую конструкцию: MPI_UINT32_T. То есть все вполне логично и легко, верхний регистр, вместо пробелов знаки андерскора и в начале пишем MPI.

Блокировка защитит пересылаемые данные от изменений поэтому не стоит опасаться за корретность отправленных данных, после того как коммуникация завершится и выполнится какое либо условие, то процесс спокойно продолжит заниматься своими делами.

Теперь поговорим о приеме этих сообщений. Для этого в MPI определена процедура MPI_Recv. Она осуществляет, соответственно, блокирующий прием данных. Синтаксис выглядит вот так:

int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source,
						 int tag, MPI_Comm comm, MPI_Status* status);

Что тут есть что:
buf - ссылка на адрес по которому будут сохранены передаваемые данные.
count - максимальное количество принимаемых элементов.
datatype - тип данных переопределенный в MPI(по аналогии с Send).
source - номер процесса который отправил сообщение.
tag - ID сообщения которое мы принимаем (любое целое число)
comm - Коммуникатор в котором находится процесс от которого получаем сообщение.
status - структура, определенная в MPI которая хранит информацию о пересылке и статус ее завершения.

Тут все практически идентично процедуре Send, только появился аргумент статуса пересылки. Зачем он нужен? Не всегда нужно явно указывать от какого процесса приходит сообщение, какой тег сообщения мы принимаем, чтобы избавиться от неопределенности MPI сохраняет информацию которая не указана в процессе-преемнике явно и мы можем к ней обратиться. Например чтобы узнать процесс который отправил сообщение и тэг этого сообщения:

MPI_Status status;
MPI_Recv(&buffer, 1, MPI_Float, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status)

tag = status.MPI_SOURCE
source = status.MPI_SOURCE

Здесь представлен кусочек возможного кода в процессе который принимает данные не более одного float элемента от любого процесса с любым тегом сообщения. Чтобы узнать какой процесс прислал это сообщение и с каким тэгом нужно собственно воспользоваться структурой MPI_Status.

Заметим появление констант MPI_ANY_SOURCE и MPI_ANY_TAG, они явно указывают, что можно принимать сообщения от любого процесса с любым тэгом.

Дабы закрепить знания об этих двух процедурах приведу пример как это выглядит на практике, данная программа определяет простые числа на заданном интервале:

#include <stdio.h>
#include <time.h>
#include "mpi.h"

#define RETURN return 0
#define FIRST_THREAD 0

int* get_interval(int, int, int*);
inline void print_simple_range(int, int);
void wait(int);

int main(int argc, char **argv)
{
	// инициализируем необходимые переменные
	int thread, thread_size, processor_name_length;
	int* thread_range, interval;
	double cpu_time_start, cpu_time_fini;
	char* processor_name = new char[MPI_MAX_PROCESSOR_NAME * sizeof(char)];

	MPI_Status status;
	interval = new int[2];
	
	// Инициализируем работу MPI
	MPI_Init(&argc, &argv);
	
	// Получаем имя физического процессора
	MPI_Get_processor_name(processor_name, &processor_name_length);
	
	// Получаем номер конкретного процесса на котором запущена программа
	MPI_Comm_rank(MPI_COMM_WORLD, &thread);
	
	// Получаем количество запущенных процессов
	MPI_Comm_size(MPI_COMM_WORLD, &thread_size);
	
	// Если это первый процесс, то выполняем следующий участок кода
	if(thread == FIRST_THREAD)
	{
		// Выводим информацию о запуске
		printf("----- Programm information -----\n");
		printf(">>> Processor: %s\n", processor_name);
		printf(">>> Num threads: %d\n", thread_size);
		printf(">>> Input the interval: ");

		// Просим пользователья ввести интервал на котором будут вычисления
		scanf("%d %d", &interval[0], &interval[1]);

		// Каждому процессу отправляем полученный интервал с тегом сообщения 0. 
		for (int to_thread = 1; to_thread < thread_size; to_thread++) 
      MPI_Send(&interval, 2, MPI_INT, to_thread, 0, MPI_COMM_WORLD);

		// Начинаем считать время выполнения
		cpu_time_start = MPI_Wtime();
	}
	// Если процесс не первый, тогда ожидаем получения данных
	else 
    MPI_Recv(&interval, 2, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

	// Все процессы запрашивают свой интервал
	range = get_interval(thread, thread_size, interval);

	// После чего отправляют полученный интервал в функцию которая производит вычисления
	print_simple_range(range[0], range[1]);

	// Последний процесс фиксирует время завершения, ожидает 1 секунду и выводит результат
	if(thread == thread_size - 1)
	{
		cpu_time_fini = MPI_Wtime();
		wait(1);
		printf("CPU Time: %lf ms\n", (cpu_time_fini - cpu_time_start) * 1000);
	}

	MPI_Finalize();
	RETURN;
}

int* get_interval(int proc, int size, int interval)
{
	// Функция для рассчета интервала каждого процесса
	int* range = new int[2];
	int interval_size = (interval[1] - interval[0]) / size;

	range[0] = interval[0] + interval_size * proc;
	range[1] = interval[0] + interval_size * (proc + 1);
	range[1] = range[1] == interval[1] - 1 ? interval[1] : range[1];
	return range;
}

inline void print_simple_range(int ibeg, int iend)
{
	// Прострейшая реализация определения простого числа
	bool res;
	for(int i = ibeg; i <= iend; i++)
	{
		res = true;
		while(res)
		{
			res = false;
			for(int j = 2; j < i; j++) if(i % j == 0) res = true;
			if(res) break;
		}
		res = not res;
		if(res) printf("Simple value ---> %d\n", i);

	}
}
void wait(int seconds)
 {
 	// Функция ожидающая в течение seconds секунд
	clock_t endwait;
	endwait = clock () + seconds * CLOCKS_PER_SEC ;
	while (clock() < endwait) {};
}

Поясню основные моменты и идею. Здесь передача сообщений задействована дабы остановить все процессы кроме первого до того момента пока пользователь не введет необходимые данные. Как только мы вводим требуемый интервал для вычисления, то процессы получают эту информацию от того потока, который занимался ее сбором и начинают свою работу независимо. В конце последний процесс фиксирует время вычислений, ожидает одну секунду(дабы остальные уж точно завершили за это время свою работу) и выводит результат расчета времени. Почему же именно последний?
Нужно учитывать тот факт, что интервал который приходит от пользователя разбивается на N равных частей, а значит последнему процессу достанется часть с самыми большими числами, вследствие вычисления займут наибольшее время именно в нем, а значит и программа в худшем случае отработает именно за это время.

Делаем прием данных более гибким

Не всегда мы имеем представление для конкретного процесса о том какой длины придут данные, для определения существует как раз выше упомянутая структура MPI_Status и некоторые процедуры помогающие эту информацию оттуда извлечь.

Первая процедура которую мы обсудим следующая:

int MPI_Get_count(MPI_Status* status, MPI_Datatype datatype, int* count);

По структуре status процедура определяет сколько данных типа datatype передано соответствующим сообщением и записывает результат по адресу count.

То есть буквально, если мы получаем сообщение от какого-либо процесса и не знаем сколько точно там передано данных, то можем вызвать процедуру MPI_Get_count и узнать какое количество ячеек памяти мы можем считать заполненными корректными данными(если конечно их отправляющий процесс корректно формирует).

Также есть еще одна процедура MPI_Get_elements. По синтаксису они отличаются лишь названиями, но назначение слегка разное. Если в сообщении передаются данные не базового типа, а типа который является производным от базовых(то есть составлен с помощью базовых типов), то нам вернет не количество этих данных, а именно количество данных базового типа. Однако в случае если передаются данные базового типа, то функции вернут одинаковые значения.

Также иногда случается так, что нам надо просто пропустить отправку сообщения, но саму процедуру из кода исключать не хочется, либо делать лишние условия в коде не рационально. В таких случаях процесс может отправить сообщение не существующему процессу, номер такого процесса определен константой MPI_PROC_NULL. В случае если мы передаем сообщение такому процессу, то процедура сразу завершается с кодом возврата SUCCESS.

Хорошо, мы можем принимать на вход какие либо данные и не знать сколько их точно поступает. В таком случае нужно рационально выделять какой-то объем памяти для их сохранения(буферизации). Возникает логичный вопрос о том какой объем выделять, в этом нам поможет процедура MPI_Probe. Она позволяет получить информацию о сообщении которое ожидает в очереди на прием не получая самого сообщения. Синтаксис ее выглядит следующим образом:

int MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status);

Тут мы также определяем от какого процесса получаемое сообщение, с каким тэгом, какой коммуникатор связывает эти процессы и передаем структуру которая запишет необходимую информацию.

Теперь на очень простом примере соединим эти процедуры вместе:

#include <iostream>
#include "mpi.h"

using namespace std;

void show_arr(int* arr, int size)
{
	for(int i=0; i < size; i++) cout << arr[i] << " ";
	cout << endl;
}

int main(int argc, char **argv)
{
	int size, rank;
	MPI_Init(&argc, &argv);
	MPI_Comm_size(MPI_COMM_WORLD, &size);
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);

	if(rank == 0)
	{
		int* arr = new int[size];
		for(int i=0; i < size; i++) arr[i] = i;
		for(int i=1; i < size; i++) MPI_Send(arr, i, MPI_INT, i, 5, MPI_COMM_WORLD);
	}
	else
	{
		int count;
		MPI_Status status;

		MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		MPI_Get_count(&status, MPI_INT, &count);
		int* buf = new int[count];

		MPI_Recv(buf, count, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
		
		cout << "Process:" << rank << " || Count: " << count << " || Array: ";
		show_arr(buf, count);
	}
	MPI_Finalize();
	return 0;
}

Что тут происходит?
В данной программе первый процесс создает массив размером равным количеству процессов и заполняет его номерами процессов по очереди. Потом соответствующему процессу он отправляет такое число элементов этого массива, какой номер у этого процесса. Напрмер: процесс 1 получит 1 элемент, процесс 2 получит 2 элемента этого массива и так далее.

Следующие же процессы должны принять это сообщение и для этого смотрят в очередь сообщений, видят это сообщение, собирают информацию в структуру status, после чего получают длину переданного сообщения и создают буфер для его сохранения, после чего просто выводят то, что получили. Для 5 запущенных процессов результат будет вот таким:

Process:1 || Count: 1 || Array: 0 
Process:2 || Count: 2 || Array: 0 1 
Process:4 || Count: 4 || Array: 0 1 2 3 
Process:3 || Count: 3 || Array: 0 1 2 

Собственно 4 результата потому что нулевой процесс занимается отправкой этих сообщений.

И еще несколько типов процедур посылки

Отметим, что вызов и возврат из процедуры MPI_Send далеко не гарантирует того, что сообщение покинуло данный процесс, было получено процессом, которому оно отправлено. Гарантия дается только на то, что мы после этой процедуры можем использовать передаваемые данные в своих целях не опасаясь того, что они как-то изменятся в отправленном сообщении.

Для того чтобы повысить степень определенности существуют еще несколько процедур которые никак не отличаются по синтаксису от MPI_Send, но отличаются по типу взаимодействия процессов и в способе отправки.

Первая из них это процедура MPI_Bsend. Такая процедура осуществляет передачу сообщения с буферизацией. Если процесс которому мы отправляем сообщение еще не запросил его получения, то информация будет записана в специальный буфер и процесс продолжит работу. Сообщение об ошибке возможно в случае, если места в буфере не хватит для сообщения, однако об этом размере может позаботиться и сам программист.

Вторая процедура это MPI_Ssend. Эта процедура синхронизирует потоки в процессе передачи сообщений. Возврат из этой процедуры произойдет ровно тогда, когда прием этого сообщения будет инициализирован процессом-получателем. То есть такая процедура заставляет процесс-отправитель ожидать приема сообщения, поэтому оба процесса участвующих в коммуникации продолжат работу после приема сообщения одновременно, а значит синхронизируются.

И еще одна процедура - MPI_Rsend. Она осуществляет передачу сообщения по готовности. Такой процедурой нужно пользоваться аккуратно, так как она требует чтобы процесс-получатель мыл уже готов принять это сообщение. Для того чтобы она выполнилась корректно необходимо заранее позаботиться о синхронизации процессов, либо явно знать, что этот процесс будет находиться в стадии ожидании получения сообщения к моменту вызова Rsend.


Резюме

Ну вот мы и ознакомились(а для опытных освежили в памяти) основные процедуры передачи сообщений типа точка-точка с блокировкой. В следующей статье я постараюсь показать на практике все изложенные ранее принципы и объяснить как написать программу которая будет выяснять одни из основополагающих характеристик техники используемой при параллелизации вычислений - латнетность и пропускная способность между процессами. А пока вот краткая сводка того что я здесь изложил:

Процедура/Константа/Структура

Назначение

MPI_Send

Отправка сообщения

MPI_Recv

Прием сообщения

MPI_Status

Структура статуса сообщения

MPI_ANY_SOURCE

Константа "Любому процессу"

MPI_ANY_TAG

Константа "Любой тег сообщения"

MPI_Get_count

Получить количество данных по статусу

MPI_Get_elements

Получить количество базовых элементов по статусу

MPI_Probe

Получить данные о сообщении без его приема

MPI_PROC_NULL

Константа-идентификатор не существующего процесса

MPI_Ssend

Отправка сообщения которая осуществляет синхронизацию процессов

MPI_Bsend

Отправка сообщения которая осуществляет буферизацию

MPI_Rsend

Отправка сообщения по готовности. Требует инициализации приема у процесса-получателя.

На этом пока все, приятного отдыха, хабравчане.

Теги:
Хабы:
Всего голосов 4: ↑4 и ↓0+4
Комментарии3

Публикации

Истории

Работа

Ближайшие события