Представляю вашему вниманию вторую часть перевода статьи A guide to inter-process communication in Linux.
Первая часть перевода была посвящена общему введению в курс дела и механизму разделяемого хранилища (shared storage). В этой части будут рассмотрены механизмы каналов (именованных и неименованных) и очереди сообщений.
Приятного чтения!
Использование каналов и очередей сообщений
В этом разделе описываются каналы, которые используются для соединения и взаимодействия процессов между собой. У канала есть вход (write end) для записи байтов и выход (read end) для чтения этих байтов в порядке FIFO (first in, first out). При обычном использовании один процесс пишет в канал, а другой процесс читает из того же канала. Байты сами по себе могут представлять что угодно: номера, записи о сотрудниках, цифровые фильмы и т.д.
Существует две разновидности каналов: именованные и неименованные. Ими можно пользоваться как через командную строку, так и в программах: это будет показано в примерах. В данном разделе также будут рассмотрены очереди памяти (memory queue), которые (незаслуженно!) вышли из моды.
В примерах кода из первого раздела была освещена проблема состояния гонки (race condition), как при работе с файлами (file-based), так и при работе с памятью (memory-based). Этот вопрос, разумеется, актуален и для канального IPC - в данном разделе это будет рассмотрено. В примерах кода для каналов и очередей памяти используются API, одобренные POSIX, а основной целью стандартов POSIX является потокобезопасность (thread-safety).
Рассмотрим man страницу функции mq_open , которая относится к API очередей памяти. Эта страница включает в себя раздел Атрибуты, в котором есть небольшая таблица:
Interface | Attribute | Value |
---|---|---|
mq_open() | Thread safety | MT-Safe |
Значение MT-Safe (где MT расшифровывается как multi-threaded) означает, что функция mq_open потокобезопасна (thread-safe), и, в свою очередь, процессобезопасна (process-safe): под выполнением процесса подразумевается выполнение одного из его потоков, и, поскольку состояние гонки не может возникнуть среди потоков одного и того же процесса, оно не произойдет и между потоками разных процессов. Атрибут MT-Safe гарантирует, что при вызове mq_open состояние гонки не возникнет. В общем, канальный IPC является concurrent-safe (хотя и в дальнейших примерах кода будет указываться предостережение).
Неименованные каналы
Начнем с вымышленного примера - команды, показывающей, как работают неименованные каналы. На всех современных системах в командной строке вертикальная черта |
представляет собой неименованный канал. Предположим, что %
- символ приглашения командной строки (command line prompt), и рассмотрим данную команду:
% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right
Утилиты sleep
и echo
выполняются как отдельные процессы, а неименованный канал позволяет им "общаться" друг с другом. Однако, "вымышленность" примера заключается в том, что никакой коммуникации не происходит. На экране появляется приветствие Hello, world!;
затем, через 5 секунд, появляется символ приглашения командной строки, означающее, что и sleep
, и echo
завершены. В чём дело?
В соответствии с синтаксисом вертикальной черты, процесс слева (sleep
) пишет, а процесс справа (echo
) - читает. По умолчанию, читающий процесс (reader) блокируется до тех пор, пока в канале не появятся байты, которые можно прочитать, а пишущий (writer) - после записи всех байтов - завершает работу отправкой маркера остановки потока (end-of-stream marker). (Даже если writer преждевременно завершает работу, маркер остановки потока будет отправлен reader-у). Неименованный канал существует, пока writer и reader не завершат работу.
В примере выше процесс sleep
не пишет ни одного байта в канал, но завершает работу через 5 секунд, что вызывает отправку маркера остановки потока в канал. В это время процесс echo
немедленно выводит Hello, world!
в стандартный вывод (на экран), т.к. процесс не прочитал ни одного байта из канала, поэтому и ждать ему нечего. Как только процессы sleep
и echo
завершают работу, неименованный канал - так и не использованный для коммуникации - закрывается и выводится символ приглашения командной строки.
Ниже представлен более полезный пример, использующий два неименованных канала. Предположим, содержимое файла test.dat
выглядит следующим образом:
this
is
the
way
the
world
ends
Команда
% cat test.dat | sort | uniq
перенаправляет вывод из процесса cat
(concatenate) в процесс sort
для сортировки данных; затем отсортированные данные направляются в процесс uniq
для удаления дубликатов (в данном случае, сокращает два the
до одного):
ends
is
the
this
way
world
Рассмотрим программу, состоящую из двух процессов, взаимодействующих друг с другом через неименованный канал (см. Пример 1).
Пример 1. Программа pipeUN: два процесса, взаимодействующих друг с другом через неименованный канал
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h> /* exit functions */
#include <unistd.h> /* read, write, pipe, _exit */
#include <string.h>
#define ReadEnd 0
#define WriteEnd 1
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /** failure **/
}
int main() {
int pipeFDs[2]; /* two file descriptors */
char buf; /* 1-byte buffer */
const char* msg = "Nature's first green is gold\n"; /* bytes to write */
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
pid_t cpid = fork(); /* fork a child process */
if (cpid < 0) report_and_exit("fork"); /* check for failure */
if (0 == cpid) { /*** child ***/ /* child process */
close(pipeFDs[WriteEnd]); /* child reads, doesn't write */
while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */
close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
_exit(0); /* exit and notify parent at once */
}
else { /*** parent ***/
close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */
write(pipeFDs[WriteEnd], msg, strlen(msg)); /* write the bytes to the pipe */
close(pipeFDs[WriteEnd]); /* done writing: generate eof */
wait(NULL); /* wait for child to exit */
exit(0); /* exit normally */
}
return 0;
}
Программа выше использует системную функцию fork
для создания процесса. Хоть программа представляет собой всего один файл с исходным кодом, в процессе ее выполнения (успешного!) присутствует многопроцессность (multi-processing).
Ниже кратко представлено о том, как работает библиотечная функция fork
.
Функция fork
, вызванная в родительском процессе, возращает -1 родителю в случае ошибки. В pipeUN вызов выглядит так:
pid_t cpid = fork();
В этом примере возвращаемое значение записывается в переменную cpid
целочисленного типа pid_t
. (Каждый процесс имеет собственный process ID, положительное целое число, идентифицирующее этот процесс). Создание нового процесса через fork
может завершится с ошибкой по нескольким причинам, одна из них - переполнение таблицы процессов - структуры, которую использует система для отслеживания процессов. Процессы-зомби (о них будет рассказано позже), могут вызвать переполнение таблицы процессов, если их не "собирать" (harvest).
Если вызов fork
завершается успешно, создаётся новый дочерний (child) процесс и возвращается одно значение родительскому процессу и другое значение дочернему процессу. И родительский, и дочерний процессы выполняют один и тот же код, который следует после вызова fork
. (Дочерний процесс наследует копии всех переменных, объявленных к этому моменту родительским). В частности, успешный вызов fork
возвращает:
Ноль - в дочерний процесс;
ID дочернего процесса - в родительский процесс.
Для отделения кода, предназначенного для родительского процесса, от кода для дочернего процесса, используется if/else или эквивалентная конструкция. В данном примере:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
}
Если создание форка процесса завершается успешно, pipeUN продолжает выполнение следующим образом. Для хранения двух файловых дескрипторов (один - для записи в канал, второй - для чтения из канала) есть массив целых чисел:
int pipeFDs[2]; /* two file descriptors */
где pipeFDs[0]
- дескриптор для чтения, pipeFDs[1]
- дексриптор для записи. Успешный вызов функции pipe
, выполненный непосредственно перед вызовом fork
, заполняет массив этими файловыми дескрипторами.
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
У родительского и дочернего процессов есть копии этих двух файловых дескрипторов, однако, в соответствии с паттерном разделения ответственности (separation of concerns), каждому из процессов требуется только один дескриптор. В нашем примере родительский процесс пишет, а дочерний - читает, хотя роли могут быть и обратными. Поэтому первый оператор в коде дочернего if-блока закрывает канальный вход для записи (write end):
close(pipeFDs[WriteEnd]); /* called in child code */
а первый оператор в коде родительского else-блока закрывает канальный выход для чтения (read end):
close(pipeFDs[ReadEnd]); /* called in parent code */
Затем родительский процесс записывает несколько байтов (коды ASCII) в неименованный канал, а дочерний процесс считывает их и перенаправляет их в стандартный вывод.
Необходимо пояснить про вызов функции wait
в коде для родительского процесса. Как только дочерний процесс был создан, он становится максимально независим от своего родителя. Дочерний процесс может выполнять различный код, который не имеет ничего общего с родительским. Однако, с помощью механизма сигналов система уведомляет родителя, когда дочерний процесс завершит свою работу.
Что будет, если родительский процесс завершится раньше дочернего? В данном случае, если меры предосторожности не будут приняты, дочерний процесс становится процессом-зомби и о нём остаётся запись в таблице процессов. Меры предосторожности бывают двух типов. Один подход - когда родительский процесс информирует систему, что для него не имеет значения, когда завершится работа дочернего процесса:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
Второй подход - когда родительский процесс выполняет wait
вплоть до завершения работы дочернего процесса, тем самым обеспечивая, что он переживёт дочерний процесс. В pipeUN используется второй подход - при вызове
wait(NULL); /* called in parent */
в родительском коде.
Вызов wait
подразумевает, что родительский процесс будет ожидать завершения работы дочернего процесса, и в pipeUN будет только один дочерний процесс. (Аргумент NULL
может быть заменен адрес целочисленной переменной, в которую запишется статус завершения (exit status) дочернего процесса). Для более гибкого управления существует функция waitpid
, используемая, например, для выбора конкретного дочернего процесса среди нескольких.
В pipeUN предприняты еще одни меры предосторожности. Когда в родительском процессе заканчивает ждать, он завершает работу обычным вызовом функции exit
. При этом дочерний процесс завершается вызовом __exit
для ускорения процесса оповещения о своем завершении. По сути, дочерний процесс сообщает системе - как можно скорее сообщите родительскому процессу, что я завершил работу.
Если два процесса пишут в один и тот же неименованный канал, могут ли байты перемешаться (interleave) между собой? Например, если процесс П1 пишет:
foo bar
в канал, а процесс П2 параллельно пишет:
baz baz
в тот же самый канал, кажется, что в канале в этот момент может быть что угодно, например:
baz foo baz bar
Стандарт POSIX гарантирует, что перемешивания байтов не пройзодет, пока в канал не будет записано больше, чем PIPE_BUF
байтов. На системах Linux, размер PIPE_BUF
соответствует 4096 байтам. Для того, чтобы обойти эту проблему, я предпочитаю в каналах использовать только один writer и только один reader.
Именованные каналы
У неименованного канала нет файла поддержки (backing file): система поддерживает в памяти буфер, используемый для передачи байтов от writer-а к reader-у. Как только writer и reader завершает работу, буфер освобождается, и неименованный канал закрывается. У именованного канала, наоборот, имеется файл поддержи и отдельный API.
Взглянем на еще один пример с командной строкой, чтобы понять суть работы именованных каналов. Шаги представлены ниже:
1) Открыть два терминала. Рабочий каталог у них должен быть один и тот же.
2) В одном из терминалов введите следующие команды (символ приглашения командной строки - %
, мои комментарии начинаются с ##
):
% mkfifo tester ## creates a backing file named tester
% cat tester ## type the pipe's content to stdout
Сначала в терминале ничего не должно отобразиться, поскольку в именованный канал еще ничего не было записано.
3) Во втором терминале введите команду:
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
<Control-C> ## terminate session with a
## Control-C
Что бы не было введено в этот терминал, оно будет отображаться в первом. Как только нажато Ctrl+C, в обоих терминалах появляется символ приглашения командной строки: канал закрылся.
4) "Прибираемся" - удаляем файл, который реализует именованный канал:
% unlink tester
Как следует из названия утилиты mkfifo
, именованный канал еще называют FIFO, поскольку "первый байт на вход = первый байт на выход" (first byte in is the first byte out). Существует библиотечная функция с именем mkfifo
, создающая именованный канал в программах, и она будет использована в следующем примере, который состоит из двух процессов: один пишет в именованный канал, а другой из него читает.
Пример 2. Программа fifoWriter
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#define MaxLoops 12000 /* outer loop */
#define ChunkSize 16 /* how many written at a time */
#define IntsPerChunk 4 /* four 4-byte ints per chunk */
#define MaxZs 250 /* max microseconds to sleep */
int main() {
const char* pipeName = "./fifoChannel";
mkfifo(pipeName, 0666); /* read/write for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
if (fd < 0) return -1; /** error **/
int i;
for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
int j;
for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
int k;
int chunk[IntsPerChunk];
for (k = 0; k < IntsPerChunk; k++)
chunk[k] = rand();
write(fd, chunk, sizeof(chunk));
}
usleep((rand() % MaxZs) + 1); /* pause a bit for realism */
}
close(fd); /* close pipe: generates an end-of-file */
unlink(pipeName); /* unlink from the implementing file */
printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);
return 0;
}
Программу выше можно свести к следующему:
1) Программа создаёт именованный канал для записи:
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);
где pipeName
- имя файла поддержки, передаваемое в mkfifo
как первый аргумент. Затем именованный канал открывается с помощью уже известного вызова функции open, возвращающей файловый дескриптор.
2) Для большего реализма fifoWriter
пишет не все данные за раз, а пишет фрагментами (chunk), между отправкой которых засыпает на рандомное количество микросекунд. Суммарно, в именованный канал запишется 768000 4-байтных целых чисел.
3) После закрытия именованного канала, fifoWriter отсоединяет (unlink) файл поддержки:
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
Система освобождает файл поддержки, как только все процессы, подключенный к каналу, выполнят операцию отсоединения. В этом примере два процесса (fifoWriter и fifoReader) отсоединяют файл.
Две программы должны выполняться в разных терминалах с общим рабочим каталогом. Однако, fifoWriter должен стартовать перед fifoReader, поскольку именно первый процесс создаёт канал. Затем fifoReader обращается к уже созданному именованному каналу (см. пример 3).
Пример 3. Программа fifoReader
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */
if (n <= 3) return n > 1;
if (0 == (n % 2) || 0 == (n % 3)) return 0;
unsigned i;
for (i = 5; (i * i) <= n; i += 6)
if (0 == (n % i) || 0 == (n % (i + 2))) return 0;
return 1; /* found a prime! */
}
int main() {
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
if (fd < 0) return -1; /* no point in continuing */
unsigned count = 0, total = 0, primes_count = 0;
while (1) {
int next;
int i;
ssize_t count = read(fd, &next, sizeof(int));
if (0 == count) break; /* end of stream */
else if (count == sizeof(int)) { /* read a 4-byte int value */
total++;
if (is_prime(next)) primes_count++;
}
}
close(fd); /* close pipe from read end */
unlink(file); /* unlink from the underlying file */
printf("Received ints: %u, primes: %u\n", total, primes_count);
return 0;
}
Программу выше можно свести к следующему:
1) Поскольку fifoWriter создаёт именованный канал, fifoReader-у требуется только вызвать open
для обращения к каналу через файл поддержки:
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
Файл открывается только для чтения.
2) Затем программа попадает в (потенциально) бесконечный цикл, в каждой итерации которого она пытается считать 4-байтный фрагмент. Вызов read
:
ssize_t count = read(fd, &next, sizeof(int));
возращает 0 в конце потока (end-of-stream); в этом случае fifoReader выходит из цикла, закрывает именованный канал, и отсоединяет файл поддержки перед завершением работы.
3) После чтения 4-байтного целого числа, fifoReader проверяет, является ли число простым (prime).
В пробном запуске программы среди полученых 768,000 целых чисел было насчитано 37,682 простых. На повторных пробных запусках, fifoReader успешно считал все байты, записанные fifoWriter. В этом нет ничего удивительного - два процесса выполняются на одном и том же компьютере, что исключает проблемы с сетью. Именованные каналы - надёжный, эффективный и широко используемый механизм IPC.
Ниже представлен вывод двух программ; каждая запущена из разных терминалов, но в одном и том же рабочем каталоге:
% ./fifoWriter
768000 ints sent to the pipe.
% ./fifoReader
Received ints: 768000, primes: 37682
Очереди сообщений
Каналы строго соответствуют поведению FIFO: первый записанный байт - первый считанный байт, второй записанный байт - второй считанный байт, и так далее. Очереди сообщений могут как вести себя таким же образом, так и более гибко - фрагменты данных могут быть получены и не в порядке FIFO.
Очередеь сообщений - последовательность сообщений, состоящих из двух частей:
полезная нагрузка (payload) - массив байтов (`char` в С);
тип, представленный положительным целым числом; тип категоризирует сообщения для возможности более гибкого получения.
Рассмотрим очередь сообщений, где каждое из сообщений отмечено целым числом:
+-+ +-+ +-+ +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+
Из этих четырёх сообщений, то, что обозначено 1, находится в передней части, т.е. ближе всего к получателю. Следом идут два сообщения с отметкой 2, и сообщение с отметкой 3 в конце. Если строгое FIFO поведение соблюдается, тогда сообщения должны быть получены в порядке 1-2-2-3. Однако, очередь сообщений поддерживает и другой порядок получения. К примеру, сообщения могут быть получены в порядке 3-2-1-2.
Пример mqueue состоит из двух программ: sender - пишущая в очередь сообщений и receiver - читающая из очереди сообщений. Обе программы включают заголовочный файл queue.h, показанный в Примере 4 ниже.
Пример 4. Заголовочный файл queue.h
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6
typedef struct {
long type; /* must be of type long */
char payload[MsgLen + 1]; /* bytes in the message */
} queuedMessage;
В заголовочном файле определена структура queuedMessage
с полями payload
(массив байтов) и type
(целое число). В файле также определяются символьные константы (директивы #define
), первые две из которых используются для генерации ключа, который, в свою очередь, используется для получения ID очереди сообщений. ProjectID
может быть любым положительным целым числом, а PathName
должен быть существующим и доступным файлом - в нашем случае это файл queue.h
.
Операторы, осуществляющие настройку sender и receiver:
key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
ID qid
по факту является аналогом файлового дескриптора для очереди сообщений.
Пример 5. Программа sender
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key = ftok(PathName, ProjectId);
if (key < 0) report_and_exit("couldn't get key...");
int qid = msgget(key, 0666 | IPC_CREAT);
if (qid < 0) report_and_exit("couldn't get queue id...");
char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
int i;
for (i = 0; i < MsgCount; i++) {
/* build the message */
queuedMessage msg;
msg.type = types[i];
strcpy(msg.payload, payloads[i]);
/* send the message */
msgsnd(qid, &msg, MsgLen + 1, IPC_NOWAIT); /* don't block */
printf("%s sent as type %i\n", msg.payload, (int) msg.type);
}
return 0;
}
Программа выше отправляет 6 сообщений - по два сообщения каждого типа: первые сообщения с типом 1, следующие два - с типом 2, два последних - с типом 3. Отправка сообщений:
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
выполняется как неблокирующая (флаг IPC_NOWAIT
), поскольку размер сообщений небольшой. В этом случае единственно возможная опасность - переполненная очередь (что в случае нашего примера - маловероятно) - может привести к ошибке отправки. Программа ниже тоже получает сообщения с использованием флага IPC_NOWAIT
.
Пример 6. Программа receiver
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"
void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}
int main() {
key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
if (key < 0) report_and_exit("key not gotten...");
int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
if (qid < 0) report_and_exit("no access to queue...");
int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
int i;
for (i = 0; i < MsgCount; i++) {
queuedMessage msg; /* defined in queue.h */
if (msgrcv(qid, &msg, MsgLen + 1, types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
puts("msgrcv trouble...");
printf("%s received as type %i\n", msg.payload, (int) msg.type);
}
/** remove the queue **/
if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
report_and_exit("trouble removing queue...");
return 0;
}
Программа receiver не создаёт очередь сообщений, хотя API предоставляет такую возможность. В receiver-е вызов:
int qid = msgget(key, 0666 | IPC_CREAT);
вводит в заблуждение флагом IPC_CREAT
; однако этот флаг подразумевает "создать при необходимости, иначе - получить доступ". Программа sender вызывает msgsnd
для отправки сообщений, тогда как receiver вызывает msgrcv
для их получения. В нашем примере sender отправляет сообщения в порядке 1-1-2-2-3-3, однако receiver будет их получать в порядке 3-1-2-3-2 для демонстрации, что очереди сообщений не ограничены только порядком FIFO.
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
Вывод выше показывает, что sender и receiver могут быть запущены в одном и том же терминале. Также в выводе показано, что очередь сообщений сохраняется даже после того, как процесс sender завершает работу. Очередь удаляется только после того, как процесс receiver явным образом удалит её вызовом msgctl
:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
Подведение итогов
API каналов и очередей сообщений являются однонаправленными: один процесс пишет, другой - читает. Существует реализация двунаправленных именованных каналов, но, по моему мнению - чем механизм IPC проще, тем лучше. Как отмечалось ранее, популярность очередей сообщений упала, но без веской причины; очереди являются еще одним инструментов из набора инструментов IPC.