Как стать автором
Обновить

IBM i: системные очереди данных

Уровень сложностиПростой
Время на прочтение17 мин
Количество просмотров1.5K

Еще одна статья из жизни под IBM i


IBM i (AS/400) является "объектной" системой, построенной на концепции "все есть объект". У каждого объекта есть имя (может меняться в течении его жизненного цикла), тип (устанавливается единожды при создании и далее не меняется) и, в некоторых случаях, атрибуты ("подтип", аналогично типу, устанавливаются при создании и не могут быть изменены). Также объект может иметь текстовое описание (необязательное) - строка до 50-ти символов, которое может быть задано как при создании, так и после.

Действия, разрешенные над данным конкретным объектом, определяются его типом. Простейший пример - тут нельзя открыть программу в hex редакторе и поправить пару байтиков - такая операция просто не предусмотрена для объекта типа *PGM.

Существует очень большое количество типов системных объектов, предоставляющих большие возможности для разработчика.

Цитата из предисловия к книге Френка Солтиса "Основы AS/400":

Задолго до того, как термин «объектно-ориентированный» широко распространился, машинный интерфейс высокого уровня AS/400, появившийся в предшествовавшей ей System/38, предоставил разработчикам приложений набор объектов (таких как очереди, индексы и файлы базы данных), которые при создании программы можно было использовать в качестве строительных блоков. Объекты придали высоким функциональным возможностям системы согласованную и простую в работе форму. Унифицированный объектный интерфейс скрывает от программистов AS/400 детали, благодаря чему они могут игнорировать сложные управляющие блоки и системные процедуры, относящиеся к внутренним процессам операционной системы (ОС). Более того, с помощью набора заранее определенных операций ОС ограничивает доступ к объектам. Это обеспечивает приложениям дополнительную защиту при исполнении.

Не так давно пришлось плотно столкнуться с такими типами объектов, как очереди данных. Здесь их два - Data Queue (*DTAQ) и User Queue (*USRQ). Ниже речь пойдет по большей части про *USRQ т.к. задача из исследовательской "что эффективнее в наших сценариях использования - *DTAQ или *USRQ" переросла в задачу по разработке удобного и лаконичного USRQ API.

Исследования очередей

Общие свойства очередей

Оба типа очередей функционально схожи между собой. И та и другая могут быть трех типов -

FIFO - классическая очередь

"Первым зашел - первым вышел". Добавление элемента происходит в конец очереди, извлечение - из начала списка.

LIFO - стек

"Последим зашел - первым вышел". И добавление и извлечение элемента происходит из/в начала списка.

KEYED - FIFO список пар ключ-значение.

Для этого типа очереди каждый элемент при добавлении снабжается ключом. При этом доступно извлечение как первого в очереди элемента, так и первого элемента, удовлетворяющего заданному условию (равно, не равно, больше, меньше, больше или равно, меньше или равно) для заданного значения ключа.

Тип очереди задается при ее создании и не может быть изменен. При создании таже указывается максимальный размер элемента - от 1 до 64512 байт для *DTAQ или до 64000 для *USRQ. Для KEYED очереди также указывается размер ключа - от 1 до 256 байт. Как реальный размер очередного элемента, так и реальный размер ключа при добавлении в очередь могут быть меньше максимально указанного при создании.

Атомарной единицей в очереди является элемент - каждая операция записи или чтения запишет или прочитает ровно один элемент, не больше и не меньше.

Также при создании указываются опции аллокации - начальное количество элементов, размер приращения и максимальное количество элементов (для *DTAQ) или максимальное количество приращений (для *USRQ). При этом действует общее для обоих очередей ограничение - максимальный размер очереди не может превышать 2Гб.

Еще одно интересное свойство очередей - они позволяют передавать указатели между заданиями таким образом, что область памяти по этому указателю принадлежащая одному заданию, становится доступной в другом (т.е. такое "расшаривание" памяти). Естественно, что это работает только для локальных очередей (в рамках одной машины) и требует явного указания при создании очереди.

Оба типа очереди поддерживают одинаковый базовый набор операций - размещение элемента в очередь, извлечение элемента из очереди, "материализация очереди" - получение ее свойств (тип, емкость, текущее количество элементов и т.п.), "материализация элементов очереди" - не извлекающее чтение - позволяет посмотреть содержимое очереди без удаления из нее элементов, очистка очереди.

Различия между *DTAQ и *USRQ

*DTAQ

*USRQ

Домен объекта

Только *SYSTEM

*SYSTEM или *USER, указывается при создании объекта1

Хранение на диске

Описание объекта и содержимое (частота сохранения сообщений на диск указывается при создании - каждое сообщение или по усмотрению системы)

Только описание объекта2

Расположение объекта

Локально или удаленно

Только локально

Размещение информации об отправителе в заголовке сообщения (SenderID)

Да, если было указано при создании очереди

Нет

Журналирование3

Да

Нет

Средства работы с очередью

Создание, удаление, очистка, информация - системные API, команды языка CL
Чтение, запись, информация - системные API, SQL UDF/UDTF

Создание, удаление - системные API
Чтение, запись, информация - MI команды

Примечания:

  1. В IBM i есть пять уровней защиты системы. От 10-го уровня (отсутствие защиты), до 50-го - защита по сертификату С2 правительства США. На уровнях 40 и 50 не все MI доступны пользователю. Есть ограничения. Например, на этих уровнях работа с объектами в домене *SYSTEM через MI запрещена. Только через системные API (более высокий уровень абстракции и интеграции). А через MI можно работать только с объектами в домене *USER. Таким образом, возможно создать объект *USRQ в домене *SYSTEM на любом уровне защиты, но при этом если система работает в уровнях 40 или 50, пользоваться им будет невозможно.

  2. Несмотря на то, что все сообщения хранятся в памяти, это не память того задания, которое их туда поместило, но системная память, содержимое которой сохраняется пока работает система. Т.о., потеря данных *USRQ возможна только при рестарте сервера, что само по себе очень нештатная ситуация.

  3. Журналирование осуществляется системой и заключается в сохранении в специальных системных журналах всей информации об изменениях состояния объекта.

Видно, что *DTAQ обладает большими возможностями по сравнению с *USRQ, но, вместе с тем, проигрывает по производительности и потреблению ресурсов.

Сравнительные характеристики

Сравнение скорости работы: для каждой очереди проводилось 1 000 циклов чтение-запись по 1 000 сообщений в каждом цикле.

Итого – 1 000 000 операций чтение-запись на тест. Размер сообщения – 128 байт.
Тесты проводились как очереди без ключа (FIFO), так и для очереди с ключом (KEYED) с размером ключа 5 байт.

В сумме в каждом тесте в очередь записано 128 000 000 байт и столько же прочитано

*DTAQ

*USRQ

FIFO

11,164 сек

1,793 сек

KEYED

13,182 сек

2,557 сек

Преимущество в скорости *USRQ перед *DATQ можно оценить как 5-6 раз.

Сравнение ресурсофээфетивности проводилось специальным инструментом - Performance EXplorer (PEX) который позволяет определять долю (в рамках задания) утилизации CPU (и времени использования CPU) той или иной функцией. Результат:

QSNDDTAQ - системное API посылки сообщения в *DATQ
QRCVDTAQ - системное API чтения сообщения из *DATQ
enq - MI посылки сообщения в *USRQ
DEQUQUE WAIT - MI чтения сообщения (с таймаутом) из *USRQ. Есть и более "легкая" операция чтения - deq (без таймаута), но здесь не проверялась.

Опять видим преимущество *USRQ перед *DTAQ в плане утилизации CPU - более чем в 12 раз.

Полученные результаты показывают, что в тех сценариях, когда не требуются "расширенные" свойства очереди, целесообразно использовать более "легкую" и быструю *USRQ.

Разработка USRQ API

В связи с тем, что использование *USRQ оказалось востребованным, но работа с ней ведется на достаточно низком уровне и требует достаточно большого объема кода (например, MI чтения с таймаутом DEQWAIT, если очередь пустая, по истечении таймаута просто "выкидывает" системное исключение, которое требуется перехватывать и обрабатывать), было принято решение разработать простое и удобное в использовании USRQ API с высокой степенью интеграции и возможностью его использования из программ как на С/С++, так и на RPG (основной язык для работы с БД и реализации бизнес-логики на этой платформе).

При проектировании API был сформулирован ряд условий, часть из которых является специфическим для наших сценариев использования.

  1. Два уровня конфигурации. Первый - параметры создания объекта-очереди. Второй - параметры работы с очередью (количество попыток посылки сообщения в очередь, интервал времени между попытками, таймаут при чтении сообщения из очереди...).
    Поскольку (специфика) используемые объекты создаются не в рантайме, а на уровне развертывания поставки, первый уровень конфигурации задается в поставке и не хранится. Второй же уровень должен хранится в отдельной таблице настроек, единой для всех пользователей очередей (у каждого пользователя свой идентификатор настроек). Более того, эта конфигурация должна быть "теплой" - перечитываться в процессе работы с заданным интервалом. Это уж для сопровождения - они должны иметь возможность изменять параметры работы с той или иной очередью без перезапуска задания, ее использующего.

  2. Для сопровождения должны быть доступны средства создания/удаления очередей, освобождения ресурсов всех связанных с заданием очередей, информация об очереди... Все это как командами системного языка CL, так и средствами SQL - хранимые процедуры, UDF/UDTF.

  3. Системные исключения должны перехватываться и трансформироваться в "структурированную ошибку" (еще один механизм типичный для IBM i - использование ошибки в формате код + параметры с хранением текста в специальных *MSGF - message file)

  4. Интерфейсы должны быть доступны как для использования из программ на С/С++, так и на RPG (преимущественно на RPG).

  5. Использование динамической памяти должно быть сведено к минимуму в целях повышения эффективности.

  6. Должна быть возможность передавать данные отправителя сообщения (системный номер задания, имя задания, имя пользователя задания)

Для разработки был выбран С со статическим выделением памяти. Единственное место, где использовалась динамическая аллокация - выделение памяти для материализации сообщений очереди (загрузка сообщений без их удаления из очереди). Это связано с особенностью реализации *USRQ - там нет возможности материализовать сообщения выборочно - только все разом. Т.е. нужно иметь возможность выделения буфера размером до 2Гб (из этих же соображений выбрана модель памяти TERASPACE - там как раз можно выделить динамически до 2Гб одним блоком). А поскольку эта операция достаточно редкая, то постоянно держать в статике по 2ГБ на очередь (а их в одном задании может быть несколько) слишком расточительно (да же с учетом физических 12Тб RAM на сервере).

Вся остальная память - статика.

Для хранения параметров "открытой" очереди (буфера, ошибки, настройки работы с очередью, системный указатель на объект) используется таблица на 128 элементов (это на одно задание - с большим запасом). Фактически используется идеология работы с файлами в С - внутри таблица с информацией о файле, при открытии файла возвращается handle - фактически - индекс советующего данному файл элемента таблицы. Здесь ровно тоже самое. Работа с очередью начинается с "подключения" к ней с указанием имени объекта и идентификатор настроек в таблице конфигураций. По имени объекта посредством MI rslvsp (resolve system pointer) получается системный указатель на объект (который потом используется при всех операциях с данной очередью),

  #pragma exception_handler(UsrQExeptHandler, 0, _C1_ALL, _C2_ALL, _CTLA_HANDLE_NO_MSG)
    pSP = rslvsp(_Usrq, pQueItem->UsrQName, pQueItem->UsrQLib, _AUTH_ALL);
  #pragma disable_handler

(поскольку MI не возвращают ошибок, а сразу бросают системное исключение, необходимо быть уверенным что перед вызовом MI установлен exeption handler) дальше получаются данные MI matqat - "материализация" очереди - получение ее свойств (тип, размер сообщения, ключа и т.п.)

  #pragma exception_handler(UsrQExeptHandler, 0, _C1_ALL, _C2_ALL, _CTLA_HANDLE_NO_MSG)        
    matqat(&mat_template, pQueItem->spQueue);
  #pragma disable_handler

где mat_template определена как

    typedef _Packed struct _MQAT_Template_T {
                                        /* Materialize queue attrs   */
      int              Template_Size;   /* Size of this template     */
      int              Bytes_Used;      /* Bytes used                */
      _Gen_Mat_ID_T    Object;          /* object identification     */
      int              Exist      : 1;  /* Permanent queue           */
      int              Spc_Attr   : 1;  /* variable length space     */
      int              In_Context : 1;  /* inserted in a context     */
      int              Acc_Group  : 1;  /* member of access group    */
      int              reserved1  : 9;
      int              Init_Space : 1;  /* initialize space flag     */
      int              reserved2  :18;
      char             reserved3[4];
      int              Space_Size;      /* size of space             */
      char             Init_Value;      /* initial value of space    */
      int              Space_Align: 1;  /* space alignment           */
      int              reserved4  : 2;  /* @B0C                      */
      int              Mach_Align: 1;  /* machine space alignment@B0A*/
      int              reserved8  : 1;  /* @B0C                      */
      int              Main_Pool  : 1; /* main storage pool selection*/
      int              reserved5  : 1;
      int              Block_XFer : 1;  /* block transfer on AS mod  */
      char             reserved6;
      _SYSPTR          Context;         /* context                   */
      _SYSPTR          Access_Group;    /* Access Group              */
      int              Pointers   : 1;  /* Contains pointers         */
      int              Q_Type     : 2;  /* queue type:               */
                                        /*   00 = keyed, 01 = LIFO   */
                                        /*   10 = FIFO               */
      int              Over_Flow  : 1;  /* extend on overflow        */
      int              User_Ext   : 1;  /* user specified max ext@C0A*/
      int              Reclaim    : 1;  /* reclaim storage on mt @C0A*/
      int              reserved7  : 1;  /*                       @C2C*/
      int              Lockable   : 1;  /* lockable              @C2A*/

      int              Max_Msgs;        /* Max # of messages         */
      int              Num_Msgs;        /* Current # of messages     */
      int              Extension;       /* extension value           */
      short            Key_Length;      /* key length                */
      int              Max_Size;        /* Maximum message size      */
      char             reserved9;       /*                       @C0A*/
      int              Max_Extend;      /* Max number of extends @C0A*/
      int              Cur_Extend;      /* Cur number of extends @C0A*/
      int              Init_Msgs;       /* Initial num messages  @C0A*/
      char             Last_Reclaim[8]; /* Time of last storage      */
                                        /* reclaim               @C2A*/
      char             reserved10[8];   /* reserved              @C2A*/
    } _MQAT_Template_T;

и, наконец, из таблицы конфигураций по идентификатору читаются параметры работы с очередью. Все это записывается в соответствующий элемент таблицы и пользователю возвращается handle - индекс элемента.

Выглядит это примерно так. С прототип

extern "C" int USRQ_Connect(char* __ptr128 Name, char* __ptr128 Lib, 
                            char* __ptr128 CfgId, char* __ptr128 pError);

модиифкатор __ptr128 нужен т.к. внутри API используется модель памяти TERASPACE с 64бит указателями, а потребители ее обычно работают в модели памяти SINGLE LEVEL с 128бит указателями.

RPG прототип:

// Подключение к существующей очереди
// Возвращает handle объекта, -1 при ошибке подключения
dcl-pr USRQ_ConnectQueue int(10) extproc(*CWIDEN : 'USRQ_Connect') ;
  Name      char(10)                   const;                                  // Имя очереди
  Lib       char(10)                   const;                                  // Библиотека
  CfgId     char(10)                   const;                                  // Id конфигурации в таблице
  Error     char(37)                   options(*omit);                         // Ошибка
end-pr;

Дальше все просто - есть функции отсылки сообщения (с ключом или без ключа в зависимости от типа очереди), получения сообщения (без ключа или с ключом и условием его применения), функции загрузки сообщений в память без удаления из очереди и затем получения отдельного сообщения из буфера, функции получения параметров очереди (в т.ч. текущего количества сообщений в ней. Все эти функции уже работают с полученным при "подключении" к очереди handle (фактически - готовым системным указателем на объект) что экономит время и ресурсы в отличии от системных API для работы с *DTAQ (QSNDDTAQ/QRCVDTAQ), которые обращаются к объекту по имени (он, конечно, кешируют системный указатель, но только с прошлого вызова - это работает когда постоянно обрушаетесь к одной очереди, а если к 2-3-м по очереди, то системный указатель будет получаться при каждом обращении).


В принципе, с *DTAQ можно работать через те же самые MI (весь код будет на 99.999% тот же самый за исключением того, что при получении системного указателя на объект нужно указать тип объекта не _Usrq, а _Dtaq), что будет несколько быстрее системных API (хотя бы за счет кеширования системного указателя), но проблема в том, что *DTAQ создается только в домене *SYSTEM, следовательно, такое решение будет работать только на низких уровнях защиты (не выше 30-го) - дальше уже система не даст работать через MI с объектами в домене *SYSTEM.


Для обеспечения возможности передачи SenderID сообщение в очереди имеет сигнатуру (нужна чтобы понимать что это сообщение "нашего" формата, с заголовком) и заголовок

typedef _Packed struct tagMessageBuffer {
  char              Signature[MSG_SIGNLEN];
  QP0W_Job_ID_T     SenderJobID;
  char              Buffer[0];
};

где QP0W_Job_ID_T определено как стандартное полное имя задания (из которого было отправлено сообщение)

/*--------------------------------------------------------------------*/
/* Type Name: QP0W_Job_ID_T                                           */
/*                                                                    */
/* Purpose:   Input/output template for returning the qualified job   */
/*            name and internal job identifier for a process          */
/*                                                                    */
/*--------------------------------------------------------------------*/

typedef struct QP0W_Job_ID_T {
    char    jobname[10];            /* Job name                       */
    char    username[10];           /* User name                      */
    char    jobnumber[6];           /* Job number                     */
    char    jobid[16];              /* Internal job identifier        */
} QP0W_Job_ID_T;

Для конечного пользователя API все это не видно - он посылает данные, которые помещаются в поле Buffer, заголовок добавляется автоматически. При приеме - аналогично. Пользователь получит то, что содержится в поле Buffer и, отдельно, если попросит, SenderID. Т.е. вся эта возня с форматами заголовков конечного пользователя не касается, это все внутреннее.

Некоторые проблемы возникли с созданием очереди. В отличии от *DATQ где все просто - указывается начальная емкость (количество сообщений), размер приращения (на сколько сообщений увеличивать емкость при заполнении) и максимальный размер - количество сообщений, или "по умолчанию" - 16Мб, или "максимально возможный" - 2Гб, с *USRQ немного сложнее.
Начальная емкость и размер приращения указывается аналогично *DTAQ, а вот с максимальной все сложно. Там нужно указывать "максимальное количество приращений очереди". По умолчанию - 0 (тогда система выберет сама так, чтобы суммарный размер очереди не вылез за границы 16Мб. Но если хочется больше, то тут уже вступает в силу ограничение суммарного размера очереди в 2Гб. И вот тут в системе где-то явно внутренняя ошибка. Указываем заведомо завышенное значение, вызываем API создания очереди, потом matqat и в поле _MQAT_Template_T.Max_Extend видим что там значение меньше того, что заказывали. Т.е. система якобы сама установила ограничение "под максимальный размер".
Вроде бы все хорошо, но... Считаем максимальное количество сообщений в очереди -
_MQAT_Template_T.Init_Msgs + _MQAT_Template_T.Extension * _MQAT_Template_T.Max_Extend и пытаемся полностью заполнить очередь, поместив в нее полученное количество сообщений. И... В какой-то момент вылетаем по системному исключению "очередь заполнена". Причем, задолго (на сотни сообщений) до обещанного лимита. На сколько точно - в разных случаях по разному. Есть какая-то корреляция с максимальным размером сообщения, но поймать закономерность не удалось.

И это проблема. Потому что пользователь при создании очереди должен иметь возможность указать начальный размер, размер приращения и максимальный размер - по умолчанию, или максимальное количество сообщений в явном виде, или максимальный размер очереди. И последние два варианта должны быть пересчитаны в "максимальное количество приращений" так, чтобы общий размер очереди не вышел за границы 2Гб (системное ограничение на размер объекта).

Увы, но точного решения тут найти не удалось. Пришлось вводить ряд искусственных допущений и ограничений так, чтобы максимальное количество сообщений в очереди, пусть и заниженное относительно реального предела, давало гарантию того, что все они поместятся в очередь.

Прототип функции создания очереди на С:

extern "C" _RPG_ind USRQ_Create(char* __ptr128 Name, char* __ptr128 Lib, eQueueType eSeq, 
                                char* __ptr128 Desc, int nMsgSize, int nKeyLen, int nInitMsgs, 
                                int nExtMsgs, int nMaxMsgs, char* __ptr128 pError);

и на RPG

// Создание новой очереди
// Возвращает индикатор успешности
// Если очередь уже существует - возвращает ошибку
dcl-pr USRQ_CreateQueue ind extproc(*CWIDEN : 'USRQ_Create') ;
  Name      char(10)                   const;                                  // Имя очереди
  Lib       char(10)                   const;                                  // Библиотека
  eSeq      int(10)                    value;                                  // Тип очереди queKeyd/queLIFO/queFIFO
  Desc      char(50)                   const;                                  // Описание
  nMsgSize  int(10)                    value;                                  // Макс. размер сообщения
                                                                               // Максимально допустимое значение - 64000 байт
  nKeyLen   int(10)                    value;                                  // Размер ключа (игнорируется для не queKeyd)
                                                                               // Максимально допустимое значение - 256 байт
  nInitMsgs int(10)                    value;                                  // Начальное количество сообщеий
  nExtMsgs  int(10)                    value;                                  // Колчество сообщений в приращении
  nMaxMsgs  int(10)                    value;                                  // Максимальное количество сообщений
  Error     char(37)                   options(*omit);                         // Ошибка
end-pr;

Так или иначе, но получить удовлетворяющее поставленным условиям USRQ API таки удалось. Все это оформлено в виде "сервисной программы" - *SRVPGM - аналог динамической библиотеки.

Ограничения:

  • Очередь не может содержать указателей (у нас эта функция не используется, но требует дополнительных действий в виде выравнивания сообщений на 16 байт)

  • Очередь всегда создается в домене *USER (мы работаем на уровне защиты 40 - создадим очередь в *SYSTEM и не сможем с ней работать)

  • Public Autority (права доступа) всегда устанавливаются в *ALL (в принципе, это можно поменять для уже существующего объекта командой WRKOBJ)

CL команды для работы с очередью

Следующий этап - минимальный набор команд системного языка CL для работы с очередью.

Команда в CL фактически является интерфейсом к какой-либо программе. Отличие в том, что программа запускается через

CALL <имя программы> [<(параметры)>]

а команда -

<Имя команды> [<Имя параметра>(<значение параметра>) ... ]

Если не указан какой-то из обязательных параметров - выводится интерфейс для ввода. Если не указан необязательный параметр - подставляется его значение по умолчанию.

Минимально необходимый набор команд:

  • CRTUSRQ - создание очереди

  • DLTUSRQ - удаление очереди

  • DSPUSRQ - свойств очереди

  • CLRUSRQ - очистка очереди

  • RCLUSRQRES - освобождение все выделенных заданию ресурсов USRQ API (отключение от всех очередей, освобождение динамических буферов, если таковые выделялись и т.п.)

Дело нехитрое. Пишется соответствующая программа, использующая нужные функции USRQ API и затем "оборачивается" описанием команды.

Например, для создания очереди пишется программа на RPG

      // --------------------------------------------------
      // Prototype for main procedure
      // --------------------------------------------------
      dcl-proc CRTUSRQPGM;
        dcl-pi *n;
          dsQName     likeds(t_dsQName);
          MaxLen      uns(5);
          Descr       char(50);
          Seq         char(1);
          KeyLen      uns(5);
          dsSize      likeds(t_dsSize);
          ReCreate    char(1);
        end-pi ;

Которая потом связывается с описанием команды

             CMD        PROMPT('Create User Queue')
             PARM       KWD(USRQ) TYPE(USRQNAM) MIN(1) +
                          PROMPT('User queue name')
 USRQNAM:    QUAL       TYPE(*NAME) LEN(10)
             QUAL       TYPE(*NAME) LEN(10) +
                          PROMPT('Library')
             PARM       KWD(MAXLEN) TYPE(*UINT2) RANGE(1 63948) +
                          MIN(1) PROMPT('Maximum entry data length')
             PARM       KWD(DESC) TYPE(*CHAR) LEN(50) +
                          PROMPT('Description')
             PARM       KWD(SEQ) TYPE(*CHAR) LEN(6) RSTD(*YES) +
                          DFT(*FIFO) +
                          SPCVAL((*FIFO F) (*LIFO L) (*KEYED K)) +
                          PROMPT('Queue type')
             PARM       KWD(KEYLEN) TYPE(*UINT2) +
                          RANGE(1 256) SPCVAL((*NONE 0)) +
                          DFT(*NONE) PROMPT('Entry key length')
             PARM       KWD(SIZE) TYPE(USRQSZ) +
                          PROMPT('Queue size')
 USRQSZ:     ELEM       TYPE(*INT4) +
                          SPCVAL((*MAX -2)) +
                          DFT(*MAX) +
                          PROMPT('Maximum entries')
             ELEM       TYPE(*INT4) DFT(16) +
                          PROMPT('Initial entries') 
             ELEM       TYPE(*INT4) DFT(16) +
                          PROMPT('Extention size')
             PARM       KWD(RCRT) TYPE(*CHAR) LEN(4) RSTD(*YES) +
                          DFT(*NO) +
                          SPCVAL((*YES Y) (*NO N)) +
                          PROMPT('Delete if exist?')

Также к нее пишется "подсказка" в специальном формате. В результате имеем

Интерфейс создается системой автоматически по описанию команды. Специально ничего рисовать не надо.

При нажатии F1

или на конкретном поле

Ну и аналогично для остальных команд.

SQL процедуры для работы с очередями

Такие интерфейсы больше нужны для сопровождения. И тут, также, минимальный набор:

  • USER_QUEUE_INFO - UDTF, возвращающая строку с параметрами заданной очереди

  • USER_QUEUE_INFO - VIEW, позволяющее выбирать одну или несколько очередей и получать информацию о них. В основе - UDTF USER_QUEUE_INFO

  • USER_QUEUE_ENTRIES - UDTF, выводящее список сообщений в очереди на данный момент (без их удаления из очереди)

  • CLEAR_USER_QUEUE - хранимая процедура для очистки очереди (удаление всех элементов)

В перспективе SQL средства для посылки и получения сообщения в/из очереди.

Тут никаких секретов - создается отдельная *SRVPGM где на каждый SQL объект пишется процедура с соответствующим интерфейсом и затем создается уже сам объект с привязкой к соответствующей процедуре. Все это достаточно неплохо документировано и не представляет каких-то сложностей.

В результате получаем

select * from table(USER_QUEUE_INFO('TSTQUE')) as t;

Аналогично с использованием View

select * from USRQ_INFO where USER_QUEUE_NAME = 'TSTQUE';

Если хотим посмотреть содержимое очереди

select * from table(USER_QUEUE_ENTRIES('TSTQUE')) as t;

Поскольку данная очередь с ключом, можем задать условие для ключа:

select * from table(USER_QUEUE_ENTRIES('TSTQUE', 'Y2KU', 'KEY', '0000000500', 'EQ')) as t;

В данном случае - все сообщения, у которых значение ключа равно ('EQ') '0000000500'

Аналогичные процедуры есть и для *DTAQ (но там они изначально в системе существуют).

Ну и зачем все это нужно?

Начнем с того, что все, что работает на IBM i, работает в каком-то задании (job). Фоновом или интерактивном (терминальная сессия пользователя - это тоже отдельное задание). Каждое задание изолировано от остальных - своя память, свои настройки окружения, свой, ведомый системой, joblog и т.п. Фактически - контейнер. Таких заданий одновременно крутится на сервере тысячи. И часто возникает необходимость в передаче данных из одного задания в другое (или раздача данных от одного задания нескольким). Да, есть механизм общей памяти, но с точки зрения разработки это достаточно дорогое удовольствие - нужно много внимания уделять процессам синхронизации. И в тех ситуациях, когда скорость передачи информации между заданиями не является узким местом, куда выгоднее использовать системные каналы связи.

В качестве такого транспорта можно использовать сокеты (например, socketpair или локальные именованные UNIX sockets). Или пайпы. Или очереди. Плюсы очередей

  • Очередь доступна всем в обоих направлениях

  • Очередь с ключом можно использовать для сообщений, адресованных конкретному получателю

  • Очередь контролируема - всегда можно посмотреть сколько сообщений находится в ней с данный момент времени

  • Очередь работает с сообщениями (пакетами) а не потоком байт

Достаточно типичной для нас является параллельная обработка большого (десятки и сотни миллионов) количества элементов, каждый из которых обрабатывается независимо от других. И стандартным решением таких задач является такой вот подход:

  • Есть два программы - головная и обработчик. Есть "конвейер" - транспорт куда одно задание выкладывает данные, а другие - разбирают.

  • Запускается головное задание. Оно инициализирует конвейер и запускает нужное количество заданий-обработчиков

  • Головное задание осуществляет подготовку набора данных для обработки. Например, это может быть выборка из одной или нескольких таблиц по заданным условиям.

  • Выбранные данные (элементы) группируются в пакеты (10-100 элементов в пакете) и выкладываются на конвейер. Когда данные закончились - выкладываются пустые пакеты-терминаторы. В количестве равном количеству обработчиков

  • Каждый обработчик забирает с конвейера очередной пакет и обрабатывает содержащиеся в нем элементы. Обработал все - берет следующий пакет и так далее пока не получит пустой пакет-терминатор. Как получил - все. Данных больше нет, можно завершать работу.

  • После окончания раздачи головное задание просто ждет пока все обработчики не закончат работу и после этого завершается само.

Здесь скорость транспорта не играет особой роли - основное время - обработка пакета. И вот тут очереди подходят как нельзя лучше. С ними не нужно думать о совместном доступе нескольких заданий - это все решает система. Всегда можно определить текущее количество пакетов в очереди что позволяет оценивать скорость разбора (и, при необходимости, увеличивать или уменьшать количество обработчиков или притормозить раздачу если очередь заполнилась выше заданного уровня).

Вот одно из типичных применений очереди.


Теги:
Хабы:
Всего голосов 5: ↑5 и ↓0+6
Комментарии15

Публикации

Истории

Работа

Программист С
31 вакансия

Ближайшие события