Radix sort — выжать всё



    Приветствую всех любителей алгоритмов. Хочу Вам поведать о своих изысканиях на тему сортировок в целом и углубиться в рассмотрение radix сортировки.

    Будучи разработчиком с многолетним стажем, всё чаще стал сталкиваться со странной тенденцией в разработке программного обеспечения:

    Не смотря на развитие аппаратной части современных вычислителей и усовершенствовании алгоритмов, в целом производительность кода не только не выросла, но и местами изрядно деградировала.

    Полагаю это связано с общей идеей отдать предпочтение быстрому программированию с использованием всё более мощных фреймворков и сверхвысокоуровневых / скриптовых языков программирования. Языки подобные Ruby или Python невероятно удобны для разработчика. Множество 'синтаксического сахара', я бы даже сказал 'Мёда', ускоряют разработку в разы, если не на порядки, но какой ценой. Как пользователя, меня напрягает низкая тепловая эффективность кода, про объёмы потребляемой памяти просто промолчу, однако главный ресурс человечества – время. Оно бесследно исчезает в бесконечных абстракциях, похищается анализаторами кода, вычищается умными сборщиками мусора. Я не призываю вернуться в прошлое, отказавшись от благ современной разработки, писать “дорогой” код, лишь предлагаю задуматься над возможным устранением “бутылочных горлышек” производительности там, где это возможно в типовых задачах. Часто этого можно достичь, оптимизировав высоконагруженные участки кода.

    Как одну из базовых задач оптимизации можно выделить сортировку. Тема настолько исследована, вдоль и поперёк, что, казалось бы, на этом пути трудно что-то обнаружить интересного. Однако мы попробуем.

    Сортировать малые массивы (менее миллиона элементов) мы не станем. Даже, если это делать крайне неэффективно, почувствовать просадки достаточно сложно, так как они нивелируются производительностью современного оборудования. Другое дело большие объёмы данных (миллиарды элементов), от грамотного подбора алгоритма очень сильно варьируется скорость выполнения.

    Все алгоритмы, основанные на сравнениях, в общем случае решают задачу сортировки не лучше чем O(n * Log n). При больших n эффективность быстро снижается и изменить эту ситуацию не представляется возможным. Эту тенденцию можно исправить, отказавшись от методов, основанных на сравнении элементов. Наиболее перспективным мне видится алгоритм Radix sort. Его вычислительная сложность O(k * n), где k количество проходов по массиву. Если n достаточно велико, а k наоборот очень мало, то данный алгоритм выигрывает у O(n * Log n).
    В современной архитектуре процессоров k практически всегда сводится к числу байт сортируемого числа. Так например, для DWord (int) k = 4, что весьма не много. Это некоторая “потенциальная яма” вычислений, которая обусловлена несколькими факторами:

    • Регистры процессора заточены под 8-битные операторы на аппаратном уровне
    • Используемый буфер подсчёта в алгоритме как раз укладывается в одну линию кэша L1 –процессора. (256 * 4-байтных числа)

    В истинности этого утверждения Вы можете попробовать убедиться самостоятельно. Однако на текущий момент времени делить по биту самый оптимальный вариант. Не исключаю, что когда кэш L1 процессоров разрастётся до 256 КБайт, более выгодным станет вариант делить по границе 2-байта.

    Эффективная реализация сортировки это не только алгоритм, но и тонкая инженерная задача по оптимизации кода.

    В данном решении алгоритм состоит из нескольких этапов:

    1. Выделение памяти под временный массив, на фоне общего времени исполнения не самая дешёвая операция, как вариант можно вынести за пределы функции и подавать параметром
    2. Эффективный подсчёт по всем битам сразу за один проход с максимальной эффективностью кодогенерации
    3. Расчёт смещений, также для всех проходов сразу
    4. Собственно пошаговая сортировка по битам от младших к старшим (LSD), в виде отдельной функции

    Алгоритм LSD применяем как более быстрый (по крайней мере, в моём варианте) за счёт более ровной обработки при различных флуктуациях входных данных.
    Исходный полностью отсортированный массив является наихудшим случаем для алгоритма, так как данные всё равно будут полностью сортироваться. Зато на случайных или перемешанных данных Radix sort чрезвычайно эффективен.

    Сортировать простой массив чисел приходится редко, обычно нужен словарь вида: ключ – значение, где значение может быть индексом или указателем.
    Для универсализации применим структуру вида:

    typedef struct TNode {
      //unsigned long long key;
      unsigned int key;
      //unsigned short key;
      //unsigned  char key;
      unsigned int value;
      //unsigned int value1;
      //unsigned int value2;
    } TNode;
    

    Естественно, чем меньше битность ключа, тем быстрее работает алгоритм. Поскольку алгоритм работает не с указателями на структуру, а фактически гоняет её в памяти. При минимуме полей, мы получаем высокую скорость. С увеличением объёма полей данных структуры эффективность сильно падает.

    Раннее я уже писал заметку с реализацией Radix sort на паскаль, однако “сегрегация” языков программирования набирает невиданные ранее темпы среди завсегдатаев данного ресурса. Поэтому код для данной статьи я решил частично переписать на 'си' как более 'правильный' распространённый язык в сообществе программистов (Хотя на выходе ассемблерная кодогенерация с Pascal местами практически идентичная). При сборке рекомендую применять ключ –O2 или выше.

    В отличие от прошлой реализации, применив указатели в адресации цикла вместо операций побитного сдвига, удалось получить ещё большую прибавку в 1-2% к скорости алгоритма.

    C
    #include <stdio.h>
    #include <omp.h>
    
    #include <time.h>
    #include <windows.h>
    #include <algorithm>
    //=============================================================
    typedef struct TNode {
      //unsigned long long key;
      unsigned int key;
      //unsigned short key;
      //unsigned  char key;
      unsigned int value;
      //unsigned int value1;
      //unsigned int value2;
    } TNode;
    //=============================================================
    void RSort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
    {
      unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
      TNode *v = &source[n];
      while (v >= source)
      {
        dest[--offset[*b]] = *v--;
        b -= sizeof(TNode);
      }
    }
    //=============================================================
    void RSort_Node(TNode *m, unsigned int n)
    {
      // Выделяем память под временный массив
      TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
    
      // Заводим массив корзин
      unsigned int s[sizeof(m->key) * 256] = {0};
      // Заполняем массив корзин для всех разрядов
      unsigned char *b = (unsigned char*)&m[n-1].key;
      while (b >= (unsigned char*)&m[0].key)
      {
        for (unsigned int digit=0; digit< sizeof(m->key); digit++)
        {
          s[*(b+digit)+256*digit]++;
        }
        b -= sizeof(TNode);
      }
    
      // Пересчитываем смещения для корзин
      for (unsigned int i = 1; i < 256; i++)
      {
        for (unsigned int digit=0; digit< sizeof(m->key); digit++)
        {
          s[i+256*digit] += s[i-1+256*digit];
        }
      }
    
      // Вызов сортировки по битам от младших к старшим (LSD)
      for (unsigned int digit=0; digit< sizeof(m->key); digit++)
      {
        RSort_step(m, m_temp, n-1, &s[256*digit] ,digit);
        TNode *temp = m;
        m = m_temp;
        m_temp = temp;
      }
    
      // Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
      if (sizeof(m->key)==1)
      {
        TNode *temp = m;
        m = m_temp;
        m_temp = temp;
        memcpy(m, m_temp, n * sizeof(TNode));
      }
    
      free(m_temp);
    }
    //=============================================================
    int main()
    {
      unsigned int n=10000000;
    
      LARGE_INTEGER frequency;
      LARGE_INTEGER t1, t2, t3, t4;
      double elapsedTime;
    
      TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
      TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
      srand(time(NULL));
    
      for (unsigned int i=0; i<n; i++)
      {
          m1[i].key = rand()*RAND_MAX+rand();
          m2[i].key = m1[i].key;
      }
    
      QueryPerformanceFrequency(&frequency);
      QueryPerformanceCounter(&t1);
    
      RSort_Node(m1, n);
    
      QueryPerformanceCounter(&t2);
      elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
      printf("The RSort:          %.5f seconds\n", elapsedTime);
    
      QueryPerformanceFrequency(&frequency);
      QueryPerformanceCounter(&t3);
    
      std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
    
      QueryPerformanceCounter(&t4);
      elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
      printf("The std::sort:      %.5f seconds\n", elapsedTime);
    
      for (unsigned int i=0; i<n; i++) 
      {
        if (m1[i].key!=m2[i].key) 
        {
          printf("\n\n!!!!!\n");
          break;
        }
      }
    
      free(m1);
      free(m2);
      return 0;
    }
    


    Pascal
    program SORT;
    uses
      SysUtils, Windows;
    //=============================================================
    type TNode = record
         key : Longword;
         //value : Longword;
    end;
    type ATNode = array of TNode;
    //=============================================================
    procedure RSort_Node(var m: array of TNode);
    //------------------------------------------------------------------------------
        procedure Sort_step(var source, dest: array of TNode; len : Longword; offset: PLongword; const num: Byte);
            var b : ^Byte;
                v : ^TNode;
            begin
              b:=@source[len];
              v:=@source[len];
              inc(b,num);
              while v >= @source do
              begin
                dec(offset[b^]);
                dest[offset[b^]] := v^;
                dec(b,SizeOf(TNode));
                dec(v);
              end;
            end;
    //------------------------------------------------------------------------------
    var // Объявляем массив корзин первым, для выравнивания на стеке
        s: array[0..1023] of Longword =(
    0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0);
        i : Longword;
        b : ^Byte;
        p : Pointer;
    begin
    
      GetMem(p, SizeOf(TNode)*Length(m));
    
      // Подсчёт корзин
      b:=@m[High(m)];
      while (b >= @m[0]) do
      begin
        Inc(s[(b+3)^+256*3]);
        Inc(s[(b+2)^+256*2]);
        Inc(s[(b+1)^+256*1]);
        Inc(s[(b+0)^+256*0]);
        dec(b,SizeOf(TNode));
      end;
    
      // Пересчёт смещений для корзин
      for i := 1 to 255 do                             
      begin
        Inc(s[i+256*0], s[i-1+256*0]);
        Inc(s[i+256*1], s[i-1+256*1]);
        Inc(s[i+256*2], s[i-1+256*2]);
        Inc(s[i+256*3], s[i-1+256*3]);
      end;
    
      // Вызов сортировки по битам от младших к старшим
      Sort_step(m, ATNode(p), High(m), @s[256*0], 0);                  
      Sort_step(ATNode(p), m, High(m), @s[256*1], 1);
      Sort_step(m, ATNode(p), High(m), @s[256*2], 2);
      Sort_step(ATNode(p), m, High(m), @s[256*3], 3);
    
      FreeMem(p);
    end;
    //=============================================================
     procedure test();
      const
        n = 10000000;
      var
        m1: array of TNode;  
        i,j,k,j1,j2: Longword;
    
        iCounterPerSec: TLargeInteger;
        T1, T2: TLargeInteger; //значение счётчика ДО и ПОСЛЕ операции  
      begin
        SetLength(m1,n);
    
        for i := 0 to n - 1 do
        begin
          m1[i].key := Random(65536 * 65536);
        end;  
    
      QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
      QueryPerformanceCounter(T11); //засекли время начала операции
    
      RSort_Node(m1);
        
      QueryPerformanceCounter(T22);//засекли время окончания
      WRITELN('1='+FormatFloat('0.0000', (T22 - T11)/iCounterPerSec) + ' sec.');//вывели количество секунд на выполнение операции
    
        SetLength(m, 0);
      end;
      //------------------------------------------------------------------------------
    begin
      test();
      Readln();
      exit;
    end.   
    


    Я медитировал на данный код очень долго, но написать лучше мне не удалось. Возможно Вы подскажите, как сделать этот код быстрее.

    А если хочется ещё немного быстрее?

    Дальнейшим логичным шагом, как мне виделось, использовать видеокарту. На просторах интернета я встречал множество рассуждений о том, что Radix sort отлично параллелился на множестве ядер видеокарты (чуть ли не лучший способ сортировки не видеокарте).

    Поддавшись искушению получить дополнительную производительность, реализовал несколько вариантов сортировки на знакомом мне OpenCL. К сожалению у меня нет возможность проверить реализацию на топовых видеокартах, но на моей GEFORCE GTX 750 TI алгоритм проигрывал однопоточной реализации на CPU, за счёт того, что данные нужно отправить на видеокарту, а потом забрать обратно. Если не гонять данные по шине туда-сюда скорость была бы приемлемой, но всё равно не ‘фонтан’. Есть ещё одно замечание. В OpenCL нити выполнения не синхронны (рабочие группы выполняются в произвольном порядке, насколько мне известно, в CUDA это не так, поправьте, кто знает), что мешает написать в данном случае более эффективный код.

    Код из серии: можно ли создать троллейбус… обработку в OpenCL на Дельфи... но зачем?
    Переписывать на 'Си' поленился, выкладываю как есть.
    program project1;
    
    uses Cl, SysUtils, Windows, Math;
    //------------------------------------------------------------------------------
    function OCL_Get_Prog(context: cl_context; pdevice_id: Pcl_device_id; name: PChar; S:AnsiString):cl_kernel;
    var   Tex:PCHAR;
          Len:QWORD;
          PLen:PQWORD;
          Prog:cl_program;
          kernel:cl_kernel;
          Ret:cl_int;
    begin
       Tex:=@S[1];
       Len:=Length(S);
       PLen:=@LEN;
       prog:=nil;
       kernel:=nil;
    
         // создать бинарник из кода программы
         prog:= clCreateProgramWithSource(context, 1, @Tex, @Len, ret);
         if CL_SUCCESS<>ret then writeln('clCreateProgramWithSource Error ',ret);
         // скомпилировать программу
         ret:= clBuildProgram(prog, 1, pdevice_id, nil, nil, nil);
         if CL_SUCCESS<>ret then writeln('clBuildProgram            Error ',ret);
         // создать объект ядра для исполнения программы
         kernel:= clCreateKernel(prog, name, ret);
         if  ret<>CL_SUCCESS then writeln('clCreateKernel            Error ',ret);
         // удаляем программу
         clReleaseProgram(prog);
         OCL_Get_Prog:=kernel;
    end;
    //------------------------------------------------------------------------------
    var
       context:cl_context;
       kernel1, kernel2, kernel3, kernel4 :cl_kernel;
       Ret:cl_int;
    
       valueSize : QWord =0;
       s0 : PChar;
    
       platform_id:cl_platform_id;
       ret_num_platforms:cl_uint;
       ret_num_devices:cl_uint;
       device_id:cl_device_id;
       command_queue:cl_command_queue;
       S1,S2,S3,S4 : AnsiString;
       memobj1, memobj2, memobj3 :cl_mem;
       mem:Array of LongWord;
       size:LongWord;
       g_works, l_works :LongInt;
    
       iCounterPerSec: TLargeInteger;
       T1, T2, T3, T4: TLargeInteger; //значение счётчика ДО и ПОСЛЕ операции
       i,j,step:LongWord;
    
    procedure exchange_memobj(var a,b:cl_mem);
    var c:cl_mem;
    begin
      c:=a;
      a:=b;
      b:=c;
    end;
    
    begin
       //---------------------------------------------------------------------------
       S1 :=
       // Шаг сортировки 1 (Oбнуление массива счётчика)
       '__kernel void sort1(__global uint* m1) {'+
       ' uint g_id = get_global_id(0);'+
       ' m1[g_id] = 0;'+
       '}';
       //---------------------------------------------------------------------------
       S2 :=
       // Шаг сортировки 2 (Подсчёт индексов по блокам)
       '__kernel void sort2(__global uint* m1, __global uint* m2, const uint len, const uint digit) {'+
       ' uint g_id = get_global_id(0);'+
       ' uint size = get_global_size(0);'+
       ' uint a = g_id / len;'+
       ' uchar key = (m1[g_id] >> digit);'+
       ' atomic_inc(&m2[key]);'+
       ' atomic_inc(&m2[256 * a + key + 256]);'+
       '}';
       //---------------------------------------------------------------------------
       S3 :=
       // Шаг сортировки 3 (Расчёт смещения)
       '__kernel void sort3(__global uint* m1, const uint len) {'+
       ' uint l_id = get_global_id(0);'+
    
       ' for (uint i = 0; i < 8; i++) {'+
       '  uint offset = 1 << i;'+
       '  uint add = (l_id>=offset) ? m1[l_id - offset] : 0;'+
       '  barrier(CLK_GLOBAL_MEM_FENCE);'+
       '  m1[l_id] += add;'+
       '  barrier(CLK_GLOBAL_MEM_FENCE);'+
       ' }'+
    
       ' for (uint i = 1; i < 1024; i++) {'+
       '  m1[i*256+l_id + 256] += m1[(i-1)*256+l_id + 256];'+
       ' }'+
    
       '  barrier(CLK_GLOBAL_MEM_FENCE);'+
    
       ' if (l_id>0) {'+
       '  for (uint i = 0; i < 1024; i++) {'+
       '   m1[i*256+l_id + 256] += m1[l_id-1];'+
       '  }'+
       ' }'+
    
       '}';
    
       //---------------------------------------------------------------------------
       S4 :=
       // Шаг сортировки 4
       '__kernel void sort4(__global uint* m1, __global uint* m2, __global uint* m3, const uint digit, const uint len) {'+
       ' uint g_id = get_global_id(0);'+
       ' for (int i = len-1; i >= 0; i--) {'+      // обратить внимание цикл обратный!
       '  uchar key = (m1[g_id*len+i] >> digit);'+
       '  m2[--m3[g_id*256 + key + 256]] = m1[g_id*len+i];'+
       ' }'+
       '}';
       //---------------------------------------------------------------------------
    
       // Получаем доступные платформы
       ret := clGetPlatformIDs(1,@platform_id,@ret_num_platforms);
       if CL_SUCCESS<>ret then writeln('clGetPlatformIDs          Error ',ret);
       // Получаем доступные устройства
       ret := clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_GPU, 1, @device_id, @ret_num_devices);
       if CL_SUCCESS<>ret then writeln('clGetDeviceIDs            Error ',ret);
    
       clGetDeviceInfo(device_id, CL_DEVICE_NAME, 0, nil, valueSize);
       GetMem(s0, valueSize);
       clGetDeviceInfo(device_id, CL_DEVICE_NAME, valueSize, s0, valueSize);
       Writeln('DEVICE_NAME:  '+s0);
       FreeMem(s0);
    
       // Создаём контекст для устройства
       context:= clCreateContext(nil, 1, @device_id, nil, nil, ret);
       if CL_SUCCESS<>ret then writeln('clCreateContext           Error ',ret);
    
       // Создаём очередь команд для контекста и устройства
       command_queue := clCreateCommandQueue(context, device_id, 0, ret);
       if CL_SUCCESS<>ret then writeln('clCreateContext           Error ',ret);
    
       //-------------------------------------------------------------
       kernel1 := OCL_Get_Prog(context, @device_id, 'sort1', S1);
       kernel2 := OCL_Get_Prog(context, @device_id, 'sort2', S2);
       kernel3 := OCL_Get_Prog(context, @device_id, 'sort3', S3);
       kernel4 := OCL_Get_Prog(context, @device_id, 'sort4', S4);
       //-------------------------------------------------------------
    
       size:=256*256*16*10;
    
       g_works := size;
       l_works := 256;
    
       Randomize;
       SetLength(mem, size);
    
       for i:=0 to size-1 do mem[i]:=random(256*256*256*256);
    
       // Создаём буферы для ввода вывода данных
       memobj1 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
       if  ret<>CL_SUCCESS then writeln('clCreateBuffer1            Error ',ret);
       memobj2 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
       if  ret<>CL_SUCCESS then writeln('clCreateBuffer2            Error ',ret);
       memobj3 := clCreateBuffer(context, CL_MEM_READ_WRITE, (256+256*1024) * sizeof(cl_uint), nil, ret);
       if  ret<>CL_SUCCESS then writeln('clCreateBuffer3            Error ',ret);
    
       QueryPerformanceFrequency(iCounterPerSec); //определили частоту счётчика
       QueryPerformanceCounter(T1); //засекли время начала операции
    
       // Записываем данные в буфер
       ret := clEnqueueWriteBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueWriteBuffer      Error ',ret);
    
       QueryPerformanceCounter(T2);//засекли время окончания
       Writeln('write       '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
    
       QueryPerformanceFrequency(iCounterPerSec); //определили частоту счётчика
       QueryPerformanceCounter(T3); //засекли время начала операции
    
    
       for step:=0 to 3 do
       begin
    
       //-------------------------------------------------------------
       QueryPerformanceFrequency(iCounterPerSec); //определили частоту счётчика
       QueryPerformanceCounter(T1); //засекли время начала операции
       //-------------------------------------------------------------
       // 1 ШАГ (очитска массива)
       ret:= clSetKernelArg(kernel1, 0, sizeof(cl_mem), @memobj3 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_1_1            Error ',ret);
    
       i := 256+256*1024;
       ret:= clEnqueueNDRangeKernel(command_queue, kernel1, 1, nil, @i, nil, 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_1    Error ',ret);
       //-------------------------------------------------------------
    
       clFinish(command_queue); // Ожидаем завершения всех команд в очереди
       QueryPerformanceCounter(T2);  //засекли время окончания
       Writeln('step 1      '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
    
       QueryPerformanceFrequency(iCounterPerSec); //определили частоту счётчика
       QueryPerformanceCounter(T1);  //засекли время начала операции
       //-------------------------------------------------------------
       // 2 ШАГ (заполнение счётчиков)
    
       ret:= clSetKernelArg(kernel2, 0, sizeof(cl_mem), @memobj1 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_2_1            Error ',ret);
       ret:= clSetKernelArg(kernel2, 1, sizeof(cl_mem), @memobj3 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_2_2            Error ',ret);
       j := size div (1024);
       ret:= clSetKernelArg(kernel2, 2, sizeof(j), @j );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_2_3            Error ',ret);
    
       j:=step*8;
       ret:= clSetKernelArg(kernel2, 3, sizeof(j), @j );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_2_4            Error ',ret);
    
       i := size;
       ret:= clEnqueueNDRangeKernel(command_queue, kernel2, 1, nil, @i, nil, 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_2    Error ',ret);
    
       //-------------------------------------------------------------
       clFinish(command_queue); // Ожидаем завершения всех команд в очереди
       QueryPerformanceCounter(T2);//засекли время окончания
       Writeln('step 2      '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
       QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
       QueryPerformanceCounter(T1); //засекли время начала операции
       //-------------------------------------------------------------
       // 3 ШАГ (пересчёт смещений)
    
       ret:= clSetKernelArg(kernel3, 0, sizeof(cl_mem), @memobj3 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_1            Error ',ret);
    
       j := size;
       ret:= clSetKernelArg(kernel3, 1, sizeof(j), @j );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_3            Error ',ret);
    
       j := 256;
       ret:= clEnqueueNDRangeKernel(command_queue, kernel3, 1, nil, @j, @j, 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_3    Error ',ret);
    
       //-------------------------------------------------------------
       clFinish(command_queue); // Ожидаем завершения всех команд в очереди
       QueryPerformanceCounter(T2);//засекли время окончания
       Writeln('step 3      '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
       QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
       QueryPerformanceCounter(T1); //засекли время начала операции
       //-------------------------------------------------------------
    
       // 4 ШАГ (сортировка)
       ret:= clSetKernelArg(kernel4, 0, sizeof(cl_mem), @memobj1 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_4_1            Error ',ret);
       ret:= clSetKernelArg(kernel4, 1, sizeof(cl_mem), @memobj2 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_4_2            Error ',ret);
       ret:= clSetKernelArg(kernel4, 2, sizeof(cl_mem), @memobj3 );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_4_3            Error ',ret);
    
       j:=step*8;
       ret:= clSetKernelArg(kernel4, 3, sizeof(j), @j );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_4_4            Error ',ret);
    
       j := size div (1024);
       ret:= clSetKernelArg(kernel4, 4, sizeof(j), @j );
       if  ret<>CL_SUCCESS then writeln('clSetKernelArg_4_5            Error ',ret);
    
       i := (1024);  // глобально ядер
       ret:= clEnqueueNDRangeKernel(command_queue, kernel4, 1, nil, @i, nil, 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_4    Error ',ret);
       clFinish(command_queue);
       // Меняем местами указатели на массивы после каждого шага
       // Результат останется в memobj1
       exchange_memobj(memobj2, memobj1);
       //-------------------------------------------------------------
       clFinish(command_queue); // Ожидаем завершения всех команд в очереди
       QueryPerformanceCounter(T2);//засекли время окончания
       Writeln('step 4      '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
       //-------------------------------------------------------------
       end;
    
       QueryPerformanceCounter(T4);//засекли время окончания
       Writeln('all not R/W '+FormatFloat('0.0000', (T4 - T3)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
    
    
       QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
       QueryPerformanceCounter(T1); //засекли время начала операции
    
       // Считываем данные из буфера
       ret:= clEnqueueReadBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
       if  ret<>CL_SUCCESS then writeln('clEnqueueReadBuffer       Error ',ret);
    
       QueryPerformanceCounter(T2);//засекли время окончания
       Writeln('Read        '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
    
    
        // Освобождаем ресурсы
       clReleaseMemObject(memobj1);           // Высвобождаем память буфера
       clReleaseMemObject(memobj2);
       clReleaseMemObject(memobj3);
       clReleaseKernel(kernel1);              // Высвобождаем объект исполнения
       clReleaseKernel(kernel2);
       clReleaseKernel(kernel3);
       clReleaseKernel(kernel4);
       clReleaseCommandQueue(command_queue);  // Высвобождаем очередь
       clReleaseContext(context);             // Высвобождаем контекст устройства
       //-------------------------------------------------------------
       SetLength(mem, 0);
       readln;
    end.  
    


    Остался ещё один не опробованный вариант – многопоточность. Уже используя только голый C и библиотеку OpenMP, я решил узнать какой эффект даст использование множества ядер CPU.

    Изначально идея заключалась в разбивке исходного массива на равные части, передача их в отдельные потоки, а затем склейка их слиянием (merge sort). Сортировка шла не плохо, а вот слияние сильно замедляло всю конструкцию, каждая склейка равносильна дополнительному проходу по массиву. Эффект был хуже чем работа в один поток. Забраковал реализацию, как не практичную.

    В итоге применил параллельную сортировку очень похожую на ту, что применялась на GPU. Вот с ней уже всё получилось на много лучше.

    Особенности параллельной обработки:
    В текущей реализации максимально возможным образом ушёл от проблем синхронизации потоков (атомарные функции так же не используются в силу того, что разные потоки читают разные, хотя и порой соседние блоки памяти). Потоки вещь не бесплатная, на их создание и синхронизацию процессор тратит драгоценные микросекунды. Сводя барьеры к минимуму можно немного сэкономить. К сожалению, поскольку для работы всех потоков используется одна и та же память кэша L3 и оперативки, общий выигрыш алгоритма не столь значителен в силу закона Амдала, при увеличении количества потоков.

    C
    #include <stdio.h>
    #include <omp.h>
    //=============================================================
    typedef struct TNode {
      //unsigned long long key;
      unsigned int key;
      //unsigned short key;
      //unsigned  char key;
      unsigned int value;
      //unsigned int value1;
      //unsigned int value2;
    } TNode;
    //=============================================================
    void RSort_Parallel(TNode *m, unsigned int n)
    {
      // Количество задействованных потоков
      unsigned char threads = omp_get_num_procs();
      // Выделяем память под временный массив
      TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
      unsigned int *s = (unsigned int*)malloc(sizeof(unsigned int) * 256 * threads);
    
      #pragma omp parallel num_threads(threads)
      {
        TNode *source = m;
        TNode *dest = m_temp;
        unsigned int l = omp_get_thread_num();
        unsigned int div = n / omp_get_num_threads();
        unsigned int mod = n % omp_get_num_threads();
        unsigned int left_index  = l < mod ? (div + (mod == 0 ? 0 : 1)) * l : n - (omp_get_num_threads() - l) * div;
        unsigned int right_index = left_index + div - (mod > l ? 0 : 1);
    
        for (unsigned int digit=0; digit< sizeof(m->key); digit++)
        {
          unsigned int s_sum[256] = {0};
          unsigned int s0[256] = {0};
          unsigned char *b1 = (unsigned char*)&source[right_index].key;
          unsigned char *b2 = (unsigned char*)&source[left_index].key;
          while (b1 >= b2)
          {
            s0[*(b1+digit)]++;
            b1 -= sizeof(TNode);
          }
          for (unsigned int i=0; i<256; i++)
          {
            s[i+256*l] = s0[i];
          }
    
          #pragma omp barrier
          for (unsigned int j=0; j<threads; j++)
          {
            for (unsigned int i=0; i<256; i++)
            {
              s_sum[i] += s[i+256*j];
              if (j<l)
              {
                s0[i] += s[i+256*j];
              }
            }
          }
    
          for (unsigned int i=1; i<256; i++)
          {
            s_sum[i] += s_sum[i-1];
            s0[i] += s_sum[i-1];
          }
          unsigned char *b = (unsigned char*)&source[right_index].key + digit;
          TNode *v1 = &source[right_index];
          TNode *v2 = &source[left_index];
          while (v1 >= v2)
          {
            dest[--s0[*b]] = *v1--;
            b -= sizeof(TNode);
          }
          #pragma omp barrier
          TNode *temp = source;
          source = dest;
          dest = temp;
        }
      }
    
      // Если ключ структуры однобайтовый, просто копируем в исходный массив
      if (sizeof(m->key)==1)
      {
        memcpy(m, m_temp, n * sizeof(TNode));
      }
    
      free(s);
      free(m_temp);
    }
    //=============================================================
    int main()
    {
      unsigned int n=10000000;
    
      TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
      srand(time(NULL));
    
      for (unsigned int i=0; i<n; i++)
      {
        m1[i].key = rand()*RAND_MAX+rand();
      }
    
      RSort_Parallel(m1, n);
    
      free(m1);
      return 0;
    }
    


    Надеюсь было интересно.

    С уважением, Ваш покорный слуга, Rebuilder.

    P.S.

    Сравнение алгоритмов по скорости не привожу сознательно. Результаты сравнений, предложения и критика приветствуются.

    P.P.S.

    Ads
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More

    Comments 42

      +1
      А почему проход по массиву происходит задом наперед?
      Ну и цифры хорошо бы увидеть.
        0
        Для максимального ускорения процедуры расчёта смещения. Мы меняем инструкции на более быстрый пересчёт индексов. Чтоб скомпенсировать данный эффект, проще инвертировать основной цикл прохода. На производительность самого цикла ни как не влияет. Выборка данных из памяти в обоих направления идёт одинаково.
        Данные по скорости хорошие, но не хочу стать заложником ошибки выжившего, выдав свои локальные данные за факты.
          +3
          Странные у Вас данные. Стандартный префетчинг в процессорах работает «наперед», поэтому прямой проход по массиву будет значительно быстрее обратного. К тому же, довольно странно читать ключ побайтово — да, сдвиги/побитовые «И» занимают время, но не так много, как чтение из памяти; если при shuffle'е массивов такая методика еще имеет смысл, то при подсчете это необходимо убрать.
          В общем, я немного переделал Ваш код на С, и получилось чуть по-быстрее.
          Код
          #include <stdio.h>
          #include <omp.h>
          
          #include <time.h>
          #include <windows.h>
          #include <algorithm>
          //=============================================================
          typedef struct TNode {
            //unsigned long long key;
            unsigned int key;
            //unsigned short key;
            //unsigned  char key;
            unsigned int value;
            //unsigned int value1;
            //unsigned int value2;
          } TNode;
          //=============================================================
          void RSort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
          {
            unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
          
            for (unsigned int i = 0; i < n; ++i)
            {
                TNode const& src = source[i];
                unsigned int off = (src.key >> (sortable_bit * 8)) & 0xFF;
                dest[offset[off]++] = src;
            }
          }
          //=============================================================
          void RSort_Node(TNode *m, unsigned int n)
          {
            // Выделяем память под временный массив
            TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
          
            // Заводим массив корзин
            unsigned int s[sizeof(m->key) * 256] = {0};
            // Заполняем массив корзин для всех разрядов
            for (unsigned int i = 0; i < n; ++i)
            {
              unsigned int key = m[i].key;
              for (unsigned int digit=0; digit< sizeof(m->key); digit++)
              {
                s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
              }
            }
          
            // Пересчитываем смещения для корзин
            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
            {
              unsigned int off = 0;
              for (unsigned int i = 0; i < 256; i++)
              {
                unsigned int value = s[i+256*digit];
                s[i+256*digit] = off;
                off += value;
              }
            }
          
            // Вызов сортировки по битам от младших к старшим (LSD)
            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
            {
              RSort_step(m, m_temp, n, &s[256*digit] ,digit);
              TNode *temp = m;
              m = m_temp;
              m_temp = temp;
            }
          
            // Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
            if (sizeof(m->key)==1)
            {
              TNode *temp = m;
              m = m_temp;
              m_temp = temp;
              memcpy(m, m_temp, n * sizeof(TNode));
            }
          
            free(m_temp);
          }
          //=============================================================
          int main()
          {
            unsigned int n=10000000;
          
            LARGE_INTEGER frequency;
            LARGE_INTEGER t1, t2, t3, t4;
            double elapsedTime;
          
            TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
            TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
            srand(time(NULL));
          
            for (unsigned int i=0; i<n; i++)
            {
                m1[i].key = rand()*RAND_MAX+rand();
                m2[i].key = m1[i].key;
            }
          
            QueryPerformanceFrequency(&frequency);
            QueryPerformanceCounter(&t1);
          
            RSort_Node(m1, n);
          
            QueryPerformanceCounter(&t2);
            elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
            printf("The RSort:          %.5f seconds\n", elapsedTime);
          
            QueryPerformanceFrequency(&frequency);
            QueryPerformanceCounter(&t3);
          
            std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
          
            QueryPerformanceCounter(&t4);
            elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
            printf("The std::sort:      %.5f seconds\n", elapsedTime);
          
            for (unsigned int i=0; i<n; i++) 
            {
              if (m1[i].key!=m2[i].key) 
              {
                printf("\n\n!!!!!\n");
                break;
              }
            }
          
            free(m1);
            free(m2);
            return 0;
          }

          Вывод до модификации:
          The RSort: 0.27004 seconds
          The std::sort: 0.80469 seconds

          После:
          The RSort: 0.22043 seconds
          The std::sort: 0.80015 seconds
            0
            Искренне благодарю Вас за интерес. Однако, хотел уточнить, с какими ключами Вы компилировали свой код?
            У меня: GCC WIN 10 -o3 получаются противоположные результаты.
            код
            #include <stdio.h>
            #include <stdlib.h>
            
            #include <time.h>
            #include <windows.h>
            //========================================================
            typedef struct TNode {
                unsigned int key;
                unsigned int value;
            } TNode;
            //========================================================
            void Sort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
            {
              unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
              TNode *v = &source[n];
              while (v >= source)
              {
                dest[--offset[*b]] = *v--;
                b -= sizeof(TNode);
              }
            }
            //========================================================
            void RSort_Node(TNode *m, unsigned int n)
            {
              // Выделяем память под временный массив
              TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
              // Заводим массив корзин
              unsigned int s[sizeof(m->key) * 256] = {0};
              // Заполняем массив корзин для всех разрядов
              unsigned char *b = (unsigned char*)&m[n-1].key;
              while (b >= (unsigned char*)&m[0].key)
              {
                for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                {
                  s[*(b+digit)+256*digit]++;
                }
                b -= sizeof(TNode);
              }
            
              // Пересчитываем смещения для корзин
              for (unsigned int i = 1; i < 256; i++)
              {
                for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                {
                  s[i+256*digit] += s[i-1+256*digit];
                }
              }
            
              // Вызов сортировки по битам от младших к старшим (LSD)
              for (unsigned int digit=0; digit< sizeof(m->key); digit++)
              {
                Sort_step(m, m_temp, n-1, &s[256*digit] ,digit);
                TNode *temp = m;
                m = m_temp;
                m_temp = temp;
              }
            
              // Если ключ структуры однобайтовый копируем отсортированное в исходный массив
              if (sizeof(m->key)==1)
              {
                TNode *temp = m;
                m = m_temp;
                m_temp = temp;
                memcpy(m, m_temp, n * sizeof(TNode));
              }
            
              free(m_temp);
            }
            //========================================================
            void RSort_step2(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
            {
              unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
            
              for (unsigned int i = 0; i < n; ++i)
              {
                  //TNode const& src = source[i]; // <- здесь оперсанд опечатка?
                  TNode const src = source[i];
                  unsigned int off = (src.key >> (sortable_bit * 8)) & 0xFF;
                  dest[offset[off]++] = src;
              }
            }
            //========================================================
            void RSort_Node2(TNode *m, unsigned int n)
            {
              // Выделяем память под временный массив
              TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
            
              // Заводим массив корзин
              unsigned int s[sizeof(m->key) * 256] = {0};
              // Заполняем массив корзин для всех разрядов
              for (unsigned int i = 0; i < n; ++i)
              {
                unsigned int key = m[i].key;
                for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                {
                  s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
                }
              }
            
              // Пересчитываем смещения для корзин
              for (unsigned int digit=0; digit< sizeof(m->key); digit++)
              {
                unsigned int off = 0;
                for (unsigned int i = 0; i < 256; i++)
                {
                  unsigned int value = s[i+256*digit];
                  s[i+256*digit] = off;
                  off += value;
                }
              }
            
              // Вызов сортировки по битам от младших к старшим (LSD)
              for (unsigned int digit=0; digit< sizeof(m->key); digit++)
              {
                RSort_step2(m, m_temp, n, &s[256*digit] ,digit);
                TNode *temp = m;
                m = m_temp;
                m_temp = temp;
              }
            
              // Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
              if (sizeof(m->key)==1)
              {
                TNode *temp = m;
                m = m_temp;
                m_temp = temp;
                memcpy(m, m_temp, n * sizeof(TNode));
              }
            
              free(m_temp);
            }
            //========================================================
            int main()
            {
              unsigned int n=10000000;
            
              LARGE_INTEGER frequency;
              LARGE_INTEGER t1, t2, t3, t4;
              double elapsedTime;
            
              TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
              TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
              srand(time(NULL));
            
              for (unsigned int i=0; i<n; i++)
              {
                  m1[i].key = rand()*RAND_MAX+rand();
                  m2[i].key = m1[i].key;
              }
            
              QueryPerformanceFrequency(&frequency);
              QueryPerformanceCounter(&t1);
            
              RSort_Node(m1, n);
            
              QueryPerformanceCounter(&t2);
              elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
              printf("RSort_Node1:      %.5f seconds\n", elapsedTime);
            
              QueryPerformanceFrequency(&frequency);
              QueryPerformanceCounter(&t3);
            
              RSort_Node2(m2, n);
              //std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
            
              QueryPerformanceCounter(&t4);
              elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
              printf("RSort_Node2:      %.5f seconds\n", elapsedTime);
            
              for (unsigned int i=0; i<n; i++)
              {
                if (m1[i].key!=m2[i].key)
                {
                  printf("\n\n!!!!!\n");
                  break;
                }
              }
            
              free(m1);
              free(m2);
              return 0;
            }
            


            Результат:
            RSort_Node1: 0.19118 seconds
            RSort_Node2: 0.21374 seconds
              0
              Я компилировал в msvc с -O2. Мда, даже не знаю, в чем тут может быть дело. У Вас нет перекоса в производительности железа? Например, медленный процессор/быстрая память?
                0
                На трёх системах с CPU от Intel (core i3-i7) ситуация аналогичная. Проверить на камнях от красно-белых, сейчас нет возможности, а у них насколько иная структура кэш’а.
                А возможно, разница идёт от выбора компилятора.
                Прошу выкладывайте свои результаты, мне бы хотелось получить общую картину.
                  +1
                  Ну так-то да — разные компиляторы, разный результат.
                  Clang, как и GCC, предпочитает вариант «задом наперёд»:
                  RSort_Node1: 0.14152 seconds
                  RSort_Node2: 0.15728 seconds
                  А Паскалевский вариант с прямым направлением есть?
                  Вообще порядок обхода элементов в RSort_step важен? Если нет, то можно в этом месте распараллелить.
                    0
                    Порядок обхода элементов вообще не важен. Можно идти хоть в случайно порядке для каждой из 256 корзин. Правда есть сложность, если параллелить в данном месте, то в зависимости от входных данных нагрузка неравномерно распределиться по ядрам. Для разделения потоков выгоднее использовать параллельный код из статьи.

                    Паскаль тестировал в Lazarus, он базируется так же на GCC, результат аналогичен си’шному.
                      +1
                      Там же цикл не по корзинам, а по массиву данных. Я пробовал менять в Паскале порядок обхода на прямой — начинает сортировать в обратном порядке. Надо что-то править в инициализации корзин?
                      (пардон, в алгоритм не особо вникал — но вы ведь его и не описываете)
                      Паскаль
                          procedure Sort_step(offset: PIntArr; num: Integer; dest, source : PNodeArr; len : Longword);
                              var b : PByte;
                                  i, n : Integer;
                              begin
                      //          b:=@source[len]; // обратный
                                b:=@source[0];  // прямой
                                inc(b,num);
                                For i:=0 to len do begin
                                  n := offset[b^]-1;
                                  dest[n] := PNode(Integer(b)-num)^;
                                  offset[b^] := n;
                      //            Dec(b, SizeOf(TNode)); // обратный
                                  Inc(b, SizeOf(TNode)); // прямой
                                end;
                              end;
                      

                      Посмотрел внимательнее сишный вариант — вы не совсем правильно скопировали код picul, добавили фактически копирование структуры
                      //TNode const& src = source[i]; // < — здесь оперсанд опечатка?
                      TNode const src = source[i];
                      Я сам сишник-любитель и плохо знаю синтаксис ссылок (вот это &), но можно сделать обычный указатель:
                        TNode * src = &source[i];
                        unsigned int off = (src->key >> (sortable_bit * 8)) & 0xFF;
                        dest[offset[off]++] = *src;
                      
                      тогда получаем:
                      RSort_Node1: 0.14094 seconds
                      RSort_Node2: 0.14432 seconds
                      разница уже близка к погрешности измерений
                        0
                        Ваша скрупулезность делает Вам честь. Я обратил внимание на код автора picul, но не стал его сильно править. Через указатель действительно несколько быстрее, но всё же мой вариант его бьёт.

                        Я благодарен проявленному интересу к самому алгоритму, я разложу его соль чуть ниже. Протестируйте данный код, RSort_Node3 – это Ваши правки. Разница в скорости не так велика, но она есть, особенно при нескольких запусках программы, если брать среднее.
                        код
                        #include <stdio.h>
                        #include <stdlib.h>
                        
                        #include <time.h>
                        #include <windows.h>
                        //========================================================
                        typedef struct TNode {
                            unsigned int key;
                            unsigned int value;
                        } TNode;
                        //========================================================
                        void Sort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
                        {
                          unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
                          TNode *v = &source[n];
                          while (v >= source)
                          {
                            dest[--offset[*b]] = *v--;
                            b -= sizeof(TNode);
                          }
                        }
                        //========================================================
                        void RSort_Node(TNode *m, unsigned int n)
                        {
                          // Выделяем память под временный массив
                          TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
                          // Заводим массив корзин
                          unsigned int s[sizeof(m->key) * 256] = {0};
                          // Заполняем массив корзин для всех разрядов
                          unsigned char *b = (unsigned char*)&m[n-1].key;
                          while (b >= (unsigned char*)&m[0].key)
                          {
                            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                            {
                              s[*(b+digit)+256*digit]++;
                            }
                            b -= sizeof(TNode);
                          }
                        
                          // Пересчитываем смещения для корзин
                          for (unsigned int i = 1; i < 256; i++)
                          {
                            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                            {
                              s[i+256*digit] += s[i-1+256*digit];
                            }
                          }
                        
                          // Вызов сортировки по битам от младших к старшим (LSD)
                          for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                          {
                            Sort_step(m, m_temp, n-1, &s[256*digit] ,digit);
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                          }
                        
                          // Если ключ структуры однобайтовый копируем отсортированное в исходный массив
                          if (sizeof(m->key)==1)
                          {
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                            memcpy(m, m_temp, n * sizeof(TNode));
                          }
                        
                          free(m_temp);
                        }
                        //========================================================
                        void RSort_step2(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
                        {
                          unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
                        
                          for (unsigned int i = 0; i < n; ++i)
                          {
                              //TNode const& src = source[i]; // <- здесь оперсанд опечатка?
                              TNode const src = source[i];
                              unsigned int off = (src.key >> (sortable_bit * 8)) & 0xFF;
                              dest[offset[off]++] = src;
                          }
                        }
                        //========================================================
                        void RSort_Node2(TNode *m, unsigned int n)
                        {
                          // Выделяем память под временный массив
                          TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
                        
                          // Заводим массив корзин
                          unsigned int s[sizeof(m->key) * 256] = {0};
                          // Заполняем массив корзин для всех разрядов
                          for (unsigned int i = 0; i < n; ++i)
                          {
                            unsigned int key = m[i].key;
                            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                            {
                              s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
                            }
                          }
                        
                          // Пересчитываем смещения для корзин
                          for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                          {
                            unsigned int off = 0;
                            for (unsigned int i = 0; i < 256; i++)
                            {
                              unsigned int value = s[i+256*digit];
                              s[i+256*digit] = off;
                              off += value;
                            }
                          }
                        
                          // Вызов сортировки по битам от младших к старшим (LSD)
                          for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                          {
                            RSort_step2(m, m_temp, n, &s[256*digit] ,digit);
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                          }
                        
                          // Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
                          if (sizeof(m->key)==1)
                          {
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                            memcpy(m, m_temp, n * sizeof(TNode));
                          }
                        
                          free(m_temp);
                        }
                        //========================================================
                        void RSort_step3(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
                        {
                          unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
                        
                          for (unsigned int i = 0; i < n; ++i)
                          {
                              TNode * src = &source[i];
                              unsigned int off = (src->key >> (sortable_bit * 8)) & 0xFF;
                              dest[offset[off]++] = *src;
                          }
                        }
                        //========================================================
                        void RSort_Node3(TNode *m, unsigned int n)
                        {
                          // Выделяем память под временный массив
                          TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
                        
                          // Заводим массив корзин
                          unsigned int s[sizeof(m->key) * 256] = {0};
                          // Заполняем массив корзин для всех разрядов
                          for (unsigned int i = 0; i < n; ++i)
                          {
                            unsigned int key = m[i].key;
                            for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                            {
                              s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
                            }
                          }
                        
                          // Пересчитываем смещения для корзин
                          for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                          {
                            unsigned int off = 0;
                            for (unsigned int i = 0; i < 256; i++)
                            {
                              unsigned int value = s[i+256*digit];
                              s[i+256*digit] = off;
                              off += value;
                            }
                          }
                        
                          // Вызов сортировки по битам от младших к старшим (LSD)
                          for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                          {
                            RSort_step3(m, m_temp, n, &s[256*digit] ,digit);
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                          }
                        
                          // Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
                          if (sizeof(m->key)==1)
                          {
                            TNode *temp = m;
                            m = m_temp;
                            m_temp = temp;
                            memcpy(m, m_temp, n * sizeof(TNode));
                          }
                        
                          free(m_temp);
                        }
                        //========================================================
                        int main()
                        {
                          unsigned int n=10000000;
                        
                          LARGE_INTEGER frequency;
                          LARGE_INTEGER t1, t2, t3, t4, t5, t6;
                          double elapsedTime;
                        
                          TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
                          TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
                          TNode *m3 = (TNode*)malloc(sizeof(TNode) * n);
                          srand(time(NULL));
                        
                          for (unsigned int i=0; i<n; i++)
                          {
                              m1[i].key = rand()*RAND_MAX+rand();
                              m2[i].key = m1[i].key;
                              m3[i].key = m1[i].key;
                          }
                        
                          QueryPerformanceFrequency(&frequency);
                          QueryPerformanceCounter(&t1);
                        
                          RSort_Node(m1, n);
                        
                          QueryPerformanceCounter(&t2);
                          elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
                          printf("RSort_Node1:      %.5f seconds\n", elapsedTime);
                        
                          QueryPerformanceFrequency(&frequency);
                          QueryPerformanceCounter(&t3);
                        
                          RSort_Node2(m2, n);
                          //std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
                        
                          QueryPerformanceCounter(&t4);
                          elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
                          printf("RSort_Node2:      %.5f seconds\n", elapsedTime);
                        
                          QueryPerformanceFrequency(&frequency);
                          QueryPerformanceCounter(&t5);
                        
                          RSort_Node3(m3, n);
                        
                          QueryPerformanceCounter(&t6);
                          elapsedTime=(float)(t6.QuadPart-t5.QuadPart)/frequency.QuadPart;
                          printf("RSort_Node3:      %.5f seconds\n", elapsedTime);
                        
                          for (unsigned int i=0; i<n; i++)
                          {
                            if ((m1[i].key!=m2[i].key)||(m1[i].key!=m3[i].key))
                            {
                              printf("\n\n!!!!!\n");
                              break;
                            }
                          }
                        
                          free(m1);
                          free(m2);
                          free(m3);
                          return 0;
                        }
                        


                        На моём ноутбуке вышло:
                        RSort_Node1: 0.16993 seconds
                        RSort_Node2: 0.18999 seconds
                        RSort_Node3: 0.17411 seconds
                          0
                          У меня примерно так же.
                          Clang 10
                          RSort_Node1: 0.14313
                          RSort_Node2: 0.15709
                          RSort_Node3: 0.14532
                          GCC 9.3
                          RSort_Node1: 0.14448
                          RSort_Node2: 0.15864
                          RSort_Node3: 0.14716

                          если параллелить в данном месте, то в зависимости от входных данных нагрузка неравномерно распределиться по ядрам.
                          Для компенсации неравномерной нагрузки есть режимы с динамическим распределением потоков. В том же OpenMP для параллельного цикла это #pragma omp parallel for schedule(dynamic)
                          Итерации цикла могут занимать разное время, но их должно быть много, значительно больше кол-ва потоков/ядер.
                            0
                            На моём компиляторе gcc version 10.2.0 (Rev5, Built by MSYS2 project) вышли такие усреднённые числа:
                            RSort_Node1: 0.18070 seconds
                            RSort_Node2: 0.17742 seconds
                            RSort_Node3: 0.17786 seconds
                            0
                            Порядок прохода по массиву вообще не важен.
                            Прямой обратный или случайный.
                            Из-за того, что индексы гуляют по всей памяти, нивелируются практически любые уловки.
                            Префетчинг тут не играет, он тут только будет мешать, забивая кэш.
                            Единственно, что мы можем сделать ускорить высоконагруженный цикл.
                            Возможно, мой код выглядит несколько не красиво с точки зрения синтаксиса, зато компилятор чётко понимает (по крайней мере, GCC), как оптимизировать циклы. Покажу на фрагменте.
                            код
                              unsigned char *b = (unsigned char*)&m[n-1].key;
                              while (b >= (unsigned char*)&m[0].key)
                              {
                                for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                                {
                                  s[*(b+digit)+256*digit]++;
                                }
                                b -= sizeof(TNode);
                              }
                            

                            превращается во что то вроде
                            0x4014ba lea 0x0(%esi),%esi

                            0x4014c0 movzbl (%eax),%edx
                            0x4014c3 sub $0x8,%eax
                            0x4014c6 addl $0x1,0x20(%esp,%edx,4)
                            0x4014cb movzbl 0x9(%eax),%edx
                            0x4014cf addl $0x1,0x420(%esp,%edx,4)
                            0x4014d7 movzbl 0xa(%eax),%edx
                            0x4014db addl $0x1,0x820(%esp,%edx,4)
                            0x4014e3 movzbl 0xb(%eax),%edx
                            0x4014e7 addl $0x1,0xc20(%esp,%edx,4)

                            0x4014ef cmp %ebx,%eax
                            0x4014f1 jae 0x4014c0 <RSort_Node(TNode*, unsigned int)+80>

                            Обратите внимание, как изящно компилятор раскрыл внутренний цикл.

                            Ваш фрагмент:
                              for (unsigned int i = 0; i < n; ++i)
                              {
                                unsigned int key = m[i].key;
                                for (unsigned int digit=0; digit< sizeof(m->key); digit++)
                                {
                                  s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
                                }
                              }
                            

                            превращается в
                            0x4018a3 cmp %ecx,%ebx
                            0x4018a5 jne 0x401870 <RSort_Node3(TNode*, unsigned int)+96>
                            0x4018a7 lea 0x420(%esp),%ebx
                            0x4018ae xor %esi,%esi
                            0x4018b0 lea -0x400(%ebx),%eax
                            0x4018b6 xor %edx,%edx
                            0x4018b8 lea 0x0(%esi,%eiz,1),%esi
                            0x4018bf nop
                            0x4018c0 mov (%eax),%ecx
                            0x4018c2 mov %edx,(%eax)
                            0x4018c4 add $0x4,%eax
                            0x4018c7 add %ecx,%edx
                            0x4018c9 cmp %eax,%ebx
                            0x4018cb jne 0x4018c0 <RSort_Node3(TNode*, unsigned int)+176>
                            0x4018cd add $0x100,%esi
                            0x4018d3 add $0x400,%ebx
                            0x4018d9 cmp $0x400,%esi
                            0x4018df jne 0x4018b0 <RSort_Node3(TNode*, unsigned int)+160>

                            Компилятор пытается решить в лоб, создавая больше кода.

                            На циклах в десятки миллионов повторов будет разница.

                              +1
                              На счет кеширования не могу согласиться. Индексы записи может и гуляют по памяти (как при подсчете, так и при шафле), но вот чтение из памяти происходит строго последовательно. И автоматический префетчинг здесь очень сильно поможет, если, конечно, доступ будет последовательный.
                              Хорошо, что Вы запостили компилятора. Я даже не представляю, как Вам удалось заполучить код с неразвернутым внутренним циклом. Вот как оно делается в GCC. Наверняка в этом и кроется секрет странных результатов на Вашей платформе.
                              И важно здесь то, что в этом случае происходит лишь одно чтение из памяти, вместо четырех в Вашем варианте. Это нивелирует отсутствие парочки побитовых операций, хотя да, смотрится такой вариант изящно.
                                0
                                Подскажите, какая из этих двух функций должна работать быстрее?
                                ссылка
                                  0
                                  Первая функция содержит в цикле 11 простых инструкций CPU, а вторая 12.
                                  Исходя из статистики выполнения, представленной тут, и собственных тестов, могу предположить следующее:
                                  Все запуски на AMD проигрывают по первому варианту в скорости, причём сильно. На INTEL выигрывают, тестировал на нескольких моделях (i3 – i7). Разница между подходами только в выравнивании обращений к памяти.
                                  Видимо Интеловцы умудрились как-то оптимизировать не выровненные обращения в кэш.
                                  Другой гипотезы у меня нет.

                                  Прошу сообщество прокомментировать данный посыл.
                                    0
                                    На больших данных вторая функция должна быть быстрее благодаря автоматическому префетчингу.
                                    С другой стороны, вторая функция тоже скомпилировалась не так, как я ожидал — она все равно читает TNode два раза. Возможно, лучшим подходом тут будет проход от 0 до n, но с байтовым указателем.
                                    Ну и на моем Intel'е проход вперед тоже выигрывает, а у Вас компилятор не разворачивает внутренний цикл на стадии подсчета, — от того и медленнее.
                                      0
                                      К сожалению, Вы меня не убедили в том, что ваш цикл подсчёта быстрее.
                                      Мне, наконец, удалось добиться от компилятора выстроить цикл подсчёта по вашей схеме.
                                      И знаете, выигрыша в скорости не обнаружил. Тут видимо, нужно уже переходить от подсчёта времени к подсчёту регистровых команд для точности.
                                      Замечание что Ваш код делает меньше выборок в памяти, принимаю. Однако современные CPU читают к L1 практически с той же скоростью, как и регистры. На этом принципе и базируются стековые машины.
                                      Быстродействие всё упрётся в скорость чтения и расшифровке опкодов (при условии использования только простых инструкций).
                                      Ваш цикл содержит 15 инструкций общей длиной 55 байт, мой 11 инструкций общей длиной 50 байт. Ссылка

                                      Всё же думаю разница, где то в архитектуре.

                                      И ещё ваш цикл не отработает верно если ключ будет 1,2 или 8 байтов
                                        0
                                        На этом принципе и базируются стековые машины.
                                        Которых не существует в природе в аппаратном виде. Потому что чтение даже из лучших кешей занимает время.
                                        Intel
                                        AMD
                                        Быстродействие всё упрётся в скорость чтения и расшифровке опкодов (при условии использования только простых инструкций).
                                        В смысле упрется во фронтенд? С рандомными записями в память? Не может такого быть. Можете взять VTune и убедиться.
                                        И ещё ваш цикл не отработает верно если ключ будет 1,2 или 8 байтов
                                        Ну для этих значений его можно легко подкорректировать, а вот для значений > 8 — да, будет проблема…
                                          0
                                          Благодарю за наводку на VTune, обязательно попробую.
                                          В смысле упрется во фронтенд? С рандомными записями в память? Не может такого быть.
                                          С чего бы при подсчёте записям быть рандомными? Мы последовательно читаем линейный буфер. И пишем в буфер, который чётко влезает в 4 строки кэша L1. И вообще не должен вытесняться.
                                            0
                                            Про рандомные записи — я больше говорил про shuffle-стадию, где запись разбросана практически по всему выходному массиву. Но и при подсчете запись можно считать рандомной, хотя да, там не такой большой разброс.
                        0
                        Протестировал код из Вашего ответа на мои бенчмарки у себя на ноутбуке с i5-8250U и на компьютере с Ryzen 3600 (изначальные цифры тоже были с райзена). Амперсанд вернул — это ссылка из C++, защита от лишнего копирования. Результаты:
                        i5-8250U, 8 Gb 2400 MHz:
                        RSort_Node1: 0.13478 seconds
                        RSort_Node2: 0.11913 seconds
                        R5 3600, 16 Gb 3200 MHz:
                        RSort_Node1: 0.27092 seconds
                        RSort_Node2: 0.22173 seconds

                        Вопрос о железе остается в силе — впечатление такое, что Вы используете какие-то интелы второго поколения вместе с DDR4 4400 MHz памятью.
                          0
                          Железо как раз обычное. Причина всё же в архитектуре процессора или компиляторе, непонятно. Хотелось бы выяснить.

                          А Вы могли бы собрать у себя данный код другим компилятором?
                            0
                            Скачал Codeblocks со встроенным GCC. Выбрал C++ проект, запустил Ваш код, но с амперсандом.
                            i5-8250U, 8 Gb 2400 MHz:
                            RSort_Node1: 0.14564 seconds
                            RSort_Node2: 0.13770 seconds
                            R5 3600, 16 Gb 3200 MHz:
                            RSort_Node1: 0.26932 seconds
                            RSort_Node2: 0.23173 seconds
                              0
                              Благодарю, походе дело всё же в архитектуре.
                              А Вы в Codeblocks собирали с ключами оптимизации?
                              Дело в том, что изначально рассчитывал на оптимизации компилятора. В ветке выше я немного обозначил суть подхода.
                                0
                                Собирал в Release. Какие у Вас модели процессоров?
                                  0
                                  Ryzen 3100@4.2, 3200-CL16, внутри виртуалки, gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0
                                  n@VirtualBox:~/habr-test$ g++ -fno-math-errno -march=native -ffast-math -O3  main.c -o sort
                                  n@VirtualBox:~/habr-test$ ./sort 
                                  The RSort:          0.12691 seconds
                                  The std::sort:      0.71183 seconds
                                  n@VirtualBox:~/habr-test$ g++ main.c -o sort
                                  n@VirtualBox:~/habr-test$ ./sort 
                                  The RSort:          0.38586 seconds
                                  The std::sort:      1.94089 seconds
                                  n@VirtualBox:~/habr-test$ 
                                  
                      +2

                      Странно, мне казалось, что я ещё лет 5-10 назад читал, что это поправили, и идти по памяти назад столь же эффективно.

                  –2
                  Не смотря на развитие аппаратной части современных вычислителей и усовершенствовании алгоритмов, в целом производительность кода не только не выросла, но и местами изрядно деградировала.
                  Судя по тексту вам не удалось эффективно использовать CUDA, ни SMP. Однако так сейчас и добиваются производительности — многоядерность и GPU. На этом, например, построены сегодняшние криптография и игроделание. И вот производители железа предлагают вам новые возможности, а вы «не умеете их готовить». Вы не противоречите себе?
                    0

                    Подскажите пожалуйста, я правильно понимаю что ваша сортировка выигрывает грубо говоря когда количество проходов k (оно же количество битов в элементах) грубо говоря меньше чем log (n)?
                    И если это так, то получается что все элементы массива можно воспринимать как числа от 0 до n (или до 2n при округлении логарифма вверх), а значит линейная сортировка будет прекрасно работать?

                      0
                      Простите, не до конца понял Вашу мысль.
                      Сложность алгоритма, действительно, линейна от длины массива (n). Однако из-за особенности структуры кэша процессора и организации памяти, добавляются значительные накладные расходы. Алгоритм так устроен, что ему нужно постоянно читать различные фрагменты памяти со всего массива, не последовательно, это дорого. Если бы вся память компьютера работала на частоте L1 процессора, алгоритм был бы невероятно эффективен. А так эффект не столь значителен.
                        0

                        Я говорю не о вашем алгоритме, а о предполагаемых данных. Я правильно понимаю что количество бит в них порядка log(n)?

                          0
                          Смотрите, n – длина массива в штуках, k – разрядность от типа ключа в байтах, от длины ни как не зависит. K – для целых чисел 4 байта.
                          Пример: Если очень грубо. Массив из 16777216 элементов целых чисел. (int)
                          Быстрой сортировке нужно сравнить n * Log2 16777216 = 24*n раз.
                          RadixSort понадобится (k*n) ~ (4 * n) раз + (1 * n) проход на подсчёт. Итог 5 к 24.
                          В реальности всё несколько хуже.
                            0

                            Тогда поправьте меня если я не прав, но ваше решение использует байтовое разложение числа а не битовое и именно по этому ваша сложность приблизительно n * log_256 (Max)?

                              0
                              Почему же? По тексту статьи:
                              В современной архитектуре процессоров k практически всегда сводится к числу байт сортируемого числа.
                              Это не означает, что запрещёно делить на иной множитель.
                              Int = 32 бита (4 байта) можно представить как:
                              4 — прохода по 8 бит (оптимально)
                              8 – проходов по 4 бита
                              2 — прохода по 16 бит (но это уже перебор, так как каждый проход сильно дорог)
                              Можно за 1 проход, но понадобится лишние 16 ГБ оперативки, и будет очень медленно.
                              Можно экзотику:
                              5 – проходов по 7 бит
                              6 – проходов по 6 бит
                              или др., но все они не будут оптимальными
                                0

                                Ну я понимаю что базу можно выбрать любую.
                                Просто хотел проверить правильно ли я понимаю что сложность будет такая же ( но скорее всего константа лучше) как и у построения отсортированного дерева только не бинарного а 2^7,8,9… Ичного?

                      +1
                      Если интересно, OpenCL на GF1070 всё-таки быстрее CPU, 0.097 секунды против 0.143.
                      Самый медленный этап это собственно сортировка (sort4), 0.016-0.013 * 4 прохода. Пересылка 0.010-0.013 * 2, туда и обратно.
                      В теории можно ещё ускорить, если увеличить кол-во вызовов, сейчас 1024, а у карты 1920 ядер.
                      Собрал для сравнения пример NVidia OpenCL Radix Sort, почему-то он не работает более чем с 4 млн. элементов — на 4 млн. получается 19 мс против 40 ваших с пересылкой, или 9 против 30 без пересылки.
                        0
                        Количество вызовов не всегда удаётся увеличить. 1024 это минимальное число воркитемов допустимых стандартом для видеокарты.
                        У меня CL_DEVICE_MAX_WORK_GROUP_SIZE: 1024

                        В Вашем примере используется векторизация на GPU это, да должно давать прирост.

                        uint4 rank;
                        int idx = localId*4;
                        rank.x = (preds.x) ? address.x : numtrue[0] + idx - address.x;
                        rank.y = (preds.y) ? address.y : numtrue[0] + idx + 1 - address.y;
                        rank.z = (preds.z) ? address.z : numtrue[0] + idx + 2 - address.z;
                        rank.w = (preds.w) ? address.w : numtrue[0] + idx + 3 - address.w;

                        В любом случае ожидал прироста на порядок, при мощной GPU, а тут всего несколько раз.
                          0
                          Нет, CL_DEVICE_MAX_WORK_GROUP_SIZE это ограничение для размера local work group (у вас не задаётся), а для общего кол-ва вызовов ограничений нет.

                          Векторизация на современных GPU вроде бы не очень актуальна. Помню, что какое-то поколение AMD было на VLIW, там да.
                          Основная фишка примера NVidia — более эффективный доступ к памяти, что-то вручную кэшируется в локальную память, что-то переупорядочивается для более эффективного доступа.
                          Алгоритм в целом неудобен для GPU именно из-за произвольного доступа к памяти и малого кол-ва вычислений. Все эти безумные терафлопсы просто некуда приложить.
                            0
                            Да вы правы, в версии алгоритма из статьи да. Перепутал с другой версией на локальных группах. Однако она у меня показала худшую эффективность. В той версии всё упёрлось в то, что рабочие группы из разных серий идут не последовательно, а как OpenCL захочет. Синхронизация локальных групп хоть и дешевле, но всё равно проиграла проходу по глобальной памяти.
                              0
                              Значит, в версии из статьи ничто не мешает увеличить кол-во вызовов? Я пробовал replace по всему вашему исходнику 1024 на 2048 или другое значение, получалось медленнее. Или так делать неправильно?

                              Ещё на будущее — инициализация OpenCL у вас сильно упрощённая, у меня установлены драйвера OpenCL для CPU, и GPU оказывается не в первой платформе, пришлось править.
                              Код
                              Const
                                MaxPlatforms = 4;
                              var
                                 platform_id : array [0..MaxPlatforms-1] of CL_platform_id;
                              begin
                                 // Получаем доступные платформы
                                 ret := clGetPlatformIDs(MaxPlatforms, @platform_id, @ret_num_platforms);
                                 if CL_SUCCESS<>ret then writeln('clGetPlatformIDs          Error ',ret);
                                 // Получаем доступные устройства
                                 For i:=0 to ret_num_platforms-1 do begin
                                   ret := clGetDeviceIDs(platform_id[i], CL_DEVICE_TYPE_GPU, 1, @device_id, @ret_num_devices);
                                   If ret = 0 then Break;
                                 end;
                                 if CL_SUCCESS<>ret then writeln('clGetDeviceIDs            Error ',ret);
                              
                              И это ещё не учитывает системы с несколькими GPU (напр. ноутбуки с гибридной графикой).
                                0
                                Значит, в версии из статьи ничто не мешает увеличить кол-во вызовов? Я пробовал replace по всему вашему исходнику 1024 на 2048 или другое значение
                                Вы можете устанавливать любое значение кратное входному массиву. Или после небольшой доработки и иные значения. Но как я вывел экспериментальным путём 1024 оптимальное значение. Думаю это связано с пропускной способностью памяти видеокарты. Меньше используется не вся пропускная способность линий. Больше дополнительные ядра выстраиваются в очередь на доступ к памяти, что даёт накладные расходы при синхронизации большого числа ядер.

                                Раз уж Вы заинтересовались GPU’шной версией алгоритма предлагаю вам взглянуть на следующий код. Он немного сыроват, но для теста может быть интересен.
                                код
                                uses Cl, SysUtils, Windows;
                                //------------------------------------------------------------------------------
                                function OCL_Get_Prog(context: cl_context; pdevice_id: Pcl_device_id; name: PChar; S:AnsiString):cl_kernel;
                                var   Tex:PCHAR;
                                      Len:QWORD;
                                      PLen:PQWORD;
                                      Prog:cl_program;
                                      kernel:cl_kernel;
                                      Ret:cl_int;
                                begin
                                   Tex:=@S[1];
                                   Len:=Length(S);
                                   PLen:=@LEN;
                                   prog:=nil;
                                   kernel:=nil;
                                
                                     // создать бинарник из кода программы
                                     prog:= clCreateProgramWithSource(context, 1, @Tex, @Len, ret);
                                     if CL_SUCCESS<>ret then writeln('clCreateProgramWithSource Error ',ret);
                                     // скомпилировать программу
                                     ret:= clBuildProgram(prog, 1, pdevice_id, nil, nil, nil);
                                     if CL_SUCCESS<>ret then writeln('clBuildProgram            Error ',ret);
                                     // создать объект ядра для исполнения программы
                                     kernel:= clCreateKernel(prog, name, ret);
                                     if  ret<>CL_SUCCESS then writeln('clCreateKernel            Error ',ret);
                                     // удаляем программу
                                     clReleaseProgram(prog);
                                     OCL_Get_Prog:=kernel;
                                end;
                                //------------------------------------------------------------------------------
                                var
                                   context:cl_context;
                                   kernel1, kernel2, kernel3 :cl_kernel;
                                   Ret:cl_int;
                                
                                   valueSize : QWord =0;
                                   value : Pointer = nil;
                                   s0 : PChar;
                                
                                   platform_id:cl_platform_id;
                                   ret_num_platforms:cl_uint;
                                   ret_num_devices:cl_uint;
                                   device_id:cl_device_id;
                                   command_queue:cl_command_queue;
                                   S1,S2,S3 : AnsiString;
                                   memobj1, memobj2, memobj3 :cl_mem;
                                   mem:Array of LongWord;
                                   size:LongWord;
                                   g_works, l_works :LongInt;
                                
                                
                                   iCounterPerSec: TLargeInteger;
                                   T1, T2, T3, T4: TLargeInteger; //значение счётчика ДО и ПОСЛЕ операции
                                   i,j,step:LongWord;
                                
                                procedure exchange_memobj(var a,b:cl_mem);
                                var c:cl_mem;
                                begin
                                  c:=a;
                                  a:=b;
                                  b:=c;
                                end;
                                
                                begin
                                   //---------------------------------------------------------------------------
                                   S1 :=
                                   // Ядро сортировки 1 (Подсчёт индексов по блокам)
                                   '__kernel void sort1(__global uint* m1, __global uint* m2, const uint digit, const uint len) {'+
                                   ' __private ushort p_m[256]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
                                                               '0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};'+
                                
                                   ' uint g_id = get_global_id(0);'+
                                   ' for (uint i = 0; i < len; i++) {'+
                                   '  uchar key = (m1[g_id*len+i] >> digit);'+          // Находим ключ
                                   '  p_m[key]++;'+
                                   ' }'+
                                
                                   ' for (uint i = 0; i < 256; i++) {'+
                                   '  m2[g_id*256+i] = p_m[i];'+
                                   ' }'+
                                   '}';
                                   //---------------------------------------------------------------------------
                                   S2 :=
                                
                                   // Ядро сортировки 2 (Расчёт смещения)
                                   '__kernel void sort2(__global uint* m1) {'+
                                   ' __local uint l_m[256];'+
                                   ' uint l_id = get_local_id(0);'+
                                
                                   ' for (uint i = 1; i < 1024; i++) {'+
                                   '  m1[i*256+l_id] += m1[(i-1)*256+l_id];'+
                                   ' }'+
                                   ' l_m[l_id] = m1[1023*256+l_id];'+
                                
                                   ' barrier(CLK_LOCAL_MEM_FENCE);'+
                                   ' for (uint i = 0; i < 8; i++) {'+
                                   '  uint offset = 1 << i;'+
                                   '  uint add = (l_id>=offset) ? l_m[l_id - offset] : 0;'+
                                   '  barrier(CLK_LOCAL_MEM_FENCE);'+
                                   '  l_m[l_id] += add;'+
                                   '  barrier(CLK_LOCAL_MEM_FENCE);'+
                                   ' }'+
                                
                                   ' if (l_id>0) {'+
                                   '  for (uint i = 0; i < 1024; i++) {'+
                                   '   m1[i*256+l_id] += l_m[l_id-1];'+
                                   '  }'+
                                   ' }'+
                                   '}';
                                
                                   //---------------------------------------------------------------------------
                                   S3 :=
                                   // Ядро сортировки 3
                                   '__kernel void sort3(__global uint* m1, __global uint* m2, __global uint* m3, const uint digit, const uint len) {'+
                                   ' uint g_id = get_global_id(0);'+
                                   //' for (uint i = 0; i < len; i++) {'+
                                   ' for (int i = len-1; i >= 0; i--) {'+           // обратить внимание цикл обратный!
                                   '  uchar key = (m1[g_id*len+i] >> digit);'+
                                   '  m2[--m3[g_id*256+key]] = m1[g_id*len+i];'+
                                   ' }'+
                                   '}';
                                   //---------------------------------------------------------------------------
                                
                                   // Получаем доступные платформы
                                   ret := clGetPlatformIDs(1,@platform_id,@ret_num_platforms);
                                   if CL_SUCCESS<>ret then writeln('clGetPlatformIDs          Error ',ret);
                                   // Получаем доступные устройства
                                   //ret:=clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_DEFAULT, 1, @device_id, @ret_num_devices);
                                   ret := clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_GPU, 1, @device_id, @ret_num_devices);
                                   if CL_SUCCESS<>ret then writeln('clGetDeviceIDs            Error ',ret);
                                
                                
                                
                                   clGetDeviceInfo(device_id, CL_DEVICE_NAME, 0, nil, valueSize);
                                   GetMem(s0, valueSize);
                                   clGetDeviceInfo(device_id, CL_DEVICE_NAME, valueSize, s0, valueSize);
                                   Writeln('DEVICE_NAME:  '+s0);
                                   FreeMem(s0);
                                
                                
                                   //clGetDeviceInfo(device_id, CL_DEVICE_MAX_WORK_GROUP_SIZE, sizeof(valueSize), @i, valueSize);
                                   //Writeln('CL_DEVICE_MAX_WORK_GROUP_SIZE: '+inttostr(i));
                                
                                   // Создаём контекст для устройства
                                   context:= clCreateContext(nil, 1, @device_id, nil, nil, ret);
                                   if CL_SUCCESS<>ret then writeln('clCreateContext           Error ',ret);
                                
                                   // Создаём очередь команд для контекста и устройства
                                   command_queue := clCreateCommandQueue(context, device_id, 0, ret);
                                   if CL_SUCCESS<>ret then writeln('clCreateContext           Error ',ret);
                                
                                   //-------------------------------------------------------------
                                   kernel1 := OCL_Get_Prog(context, @device_id, 'sort1', S1);
                                   kernel2 := OCL_Get_Prog(context, @device_id, 'sort2', S2);
                                   kernel3 := OCL_Get_Prog(context, @device_id, 'sort3', S3);
                                   //-------------------------------------------------------------
                                
                                   size:=256*256*16*10;
                                
                                
                                   g_works := size;
                                   l_works := 256;
                                
                                   Randomize;
                                   SetLength(mem, size);
                                
                                   for i:=0 to size-1 do mem[i]:=random(256*256*256*256);
                                
                                   // Создаём буферы для ввода вывода данных
                                   memobj1 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
                                   if  ret<>CL_SUCCESS then writeln('clCreateBuffer1            Error ',ret);
                                   memobj2 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
                                   if  ret<>CL_SUCCESS then writeln('clCreateBuffer2            Error ',ret);
                                   memobj3 := clCreateBuffer(context, CL_MEM_READ_WRITE, 256*1024 * sizeof(cl_uint), nil, ret);
                                   if  ret<>CL_SUCCESS then writeln('clCreateBuffer3            Error ',ret);
                                
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T3); //засекли время начала операции
                                
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T1); //засекли время начала операции
                                
                                   // Записываем данные в буфер
                                   ret := clEnqueueWriteBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
                                   if  ret<>CL_SUCCESS then writeln('clEnqueueWriteBuffer      Error ',ret);
                                
                                   QueryPerformanceCounter(T2);//засекли время окончания
                                   Writeln('write: '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                
                                
                                
                                   for step:=0 to 3 do
                                   begin
                                   //-------------------------------------------------------------
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T1); //засекли время начала операции
                                   //-------------------------------------------------------------
                                   // 1 ШАГ
                                   ret:= clSetKernelArg(kernel1, 0, sizeof(cl_mem), @memobj1 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_1_1            Error ',ret);
                                   ret:= clSetKernelArg(kernel1, 1, sizeof(cl_mem), @memobj3 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_1_2            Error ',ret);
                                
                                   j := step*8;
                                   ret:= clSetKernelArg(kernel1, 2, sizeof(j), @j );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_1_3            Error ',ret);
                                
                                   j := size div (1024);
                                   ret:= clSetKernelArg(kernel1, 3, sizeof(j), @j );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_1_4            Error ',ret);
                                
                                   i := (1024);
                                   ret:= clEnqueueNDRangeKernel(command_queue, kernel1, 1, nil, @i, nil, 0, nil, nil);
                                   if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_1    Error ',ret);
                                   //-------------------------------------------------------------
                                   clFinish(command_queue);                // Ожидаем завершения всех команд в очереди
                                   QueryPerformanceCounter(T2);//засекли время окончания
                                   Write('Run'+Inttostr(step+1)+':   '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T1); //засекли время начала операции
                                   //-------------------------------------------------------------
                                   // 2 ШАГ
                                   ret:= clSetKernelArg(kernel2, 0, sizeof(cl_mem), @memobj3 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_2_1            Error ',ret);
                                
                                   j := 256;
                                   ret:= clEnqueueNDRangeKernel(command_queue, kernel2, 1, nil, @j, @j, 0, nil, nil);
                                   if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_2    Error ',ret);
                                   //-------------------------------------------------------------
                                   clFinish(command_queue);                // Ожидаем завершения всех команд в очереди
                                   QueryPerformanceCounter(T2);//засекли время окончания
                                   Write('  '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T1); //засекли время начала операции
                                   //-------------------------------------------------------------
                                   // 3 ШАГ
                                   ret:= clSetKernelArg(kernel3, 0, sizeof(cl_mem), @memobj1 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_1            Error ',ret);
                                   ret:= clSetKernelArg(kernel3, 1, sizeof(cl_mem), @memobj2 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_2            Error ',ret);
                                   ret:= clSetKernelArg(kernel3, 2, sizeof(cl_mem), @memobj3 );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_3            Error ',ret);
                                
                                   j:=step*8;
                                   ret:= clSetKernelArg(kernel3, 3, sizeof(j), @j );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_4            Error ',ret);
                                
                                   j := size div (1024);
                                   ret:= clSetKernelArg(kernel3, 4, sizeof(j), @j );
                                   if  ret<>CL_SUCCESS then writeln('clSetKernelArg_3_5            Error ',ret);
                                
                                   i := (1024);  // глобально ядер
                                   ret:= clEnqueueNDRangeKernel(command_queue, kernel3, 1, nil, @i, nil, 0, nil, nil);
                                   if  ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_3    Error ',ret);
                                   clFinish(command_queue);
                                   // Меняем местами указатели на массивы после каждого шага
                                   // Результат останется в memobj1
                                   exchange_memobj(memobj2, memobj1);
                                   //-------------------------------------------------------------
                                   clFinish(command_queue);                // Ожидаем завершения всех команд в очереди
                                   QueryPerformanceCounter(T2);//засекли время окончания
                                   Writeln('  '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                   //-------------------------------------------------------------
                                   end;
                                
                                   QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
                                   QueryPerformanceCounter(T1); //засекли время начала операции
                                
                                   // Считываем данные из буфера
                                   ret:= clEnqueueReadBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
                                   if  ret<>CL_SUCCESS then writeln('clEnqueueReadBuffer       Error ',ret);
                                
                                   QueryPerformanceCounter(T2);//засекли время окончания
                                   Writeln('Read:  '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                
                                   QueryPerformanceCounter(T4);//засекли время окончания
                                   Writeln('ALL:   '+FormatFloat('0.0000', (T4 - T3)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
                                
                                    // Освобождаем ресурсы
                                   clReleaseMemObject(memobj1);           // Высвобождаем память буфера
                                   clReleaseMemObject(memobj2);           // Высвобождаем память буфера
                                   clReleaseMemObject(memobj3);           // Высвобождаем память буфера
                                   clReleaseKernel(kernel1);              // Высвобождаем объект исполнения
                                   clReleaseKernel(kernel2);              // Высвобождаем объект исполнения
                                   clReleaseKernel(kernel3);              // Высвобождаем объект исполнения
                                   clReleaseCommandQueue(command_queue);  // Высвобождаем очередь
                                   clReleaseContext(context);             // Высвобождаем контекст устройства
                                
                                   //-------------------------------------------------------------
                                   SetLength(mem, 0);
                                   readln;
                                
                                end.

                                Здесь можно увеличивать число глобальных ядер и это даёт некоторый эффект

                      Only users with full accounts can post comments. Log in, please.