Правильно ли я понимаю: Решение сводится к созданию системы линейных уравнений и их решению, но с ограничением типа (char)?
Насколько производительно такое решение возможно ли за разумное время посчитать задачу коммивояжера, допустим, на маршруте 16-20 пунктов.
Значит, в версии из статьи ничто не мешает увеличить кол-во вызовов? Я пробовал replace по всему вашему исходнику 1024 на 2048 или другое значение
Вы можете устанавливать любое значение кратное входному массиву. Или после небольшой доработки и иные значения. Но как я вывел экспериментальным путём 1024 оптимальное значение. Думаю это связано с пропускной способностью памяти видеокарты. Меньше используется не вся пропускная способность линий. Больше дополнительные ядра выстраиваются в очередь на доступ к памяти, что даёт накладные расходы при синхронизации большого числа ядер.
Раз уж Вы заинтересовались GPU’шной версией алгоритма предлагаю вам взглянуть на следующий код. Он немного сыроват, но для теста может быть интересен.
код
uses Cl, SysUtils, Windows;
//------------------------------------------------------------------------------
function OCL_Get_Prog(context: cl_context; pdevice_id: Pcl_device_id; name: PChar; S:AnsiString):cl_kernel;
var Tex:PCHAR;
Len:QWORD;
PLen:PQWORD;
Prog:cl_program;
kernel:cl_kernel;
Ret:cl_int;
begin
Tex:=@S[1];
Len:=Length(S);
PLen:=@LEN;
prog:=nil;
kernel:=nil;
// создать бинарник из кода программы
prog:= clCreateProgramWithSource(context, 1, @Tex, @Len, ret);
if CL_SUCCESS<>ret then writeln('clCreateProgramWithSource Error ',ret);
// скомпилировать программу
ret:= clBuildProgram(prog, 1, pdevice_id, nil, nil, nil);
if CL_SUCCESS<>ret then writeln('clBuildProgram Error ',ret);
// создать объект ядра для исполнения программы
kernel:= clCreateKernel(prog, name, ret);
if ret<>CL_SUCCESS then writeln('clCreateKernel Error ',ret);
// удаляем программу
clReleaseProgram(prog);
OCL_Get_Prog:=kernel;
end;
//------------------------------------------------------------------------------
var
context:cl_context;
kernel1, kernel2, kernel3 :cl_kernel;
Ret:cl_int;
valueSize : QWord =0;
value : Pointer = nil;
s0 : PChar;
platform_id:cl_platform_id;
ret_num_platforms:cl_uint;
ret_num_devices:cl_uint;
device_id:cl_device_id;
command_queue:cl_command_queue;
S1,S2,S3 : AnsiString;
memobj1, memobj2, memobj3 :cl_mem;
mem:Array of LongWord;
size:LongWord;
g_works, l_works :LongInt;
iCounterPerSec: TLargeInteger;
T1, T2, T3, T4: TLargeInteger; //значение счётчика ДО и ПОСЛЕ операции
i,j,step:LongWord;
procedure exchange_memobj(var a,b:cl_mem);
var c:cl_mem;
begin
c:=a;
a:=b;
b:=c;
end;
begin
//---------------------------------------------------------------------------
S1 :=
// Ядро сортировки 1 (Подсчёт индексов по блокам)
'__kernel void sort1(__global uint* m1, __global uint* m2, const uint digit, const uint len) {'+
' __private ushort p_m[256]={0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'+
'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0};'+
' uint g_id = get_global_id(0);'+
' for (uint i = 0; i < len; i++) {'+
' uchar key = (m1[g_id*len+i] >> digit);'+ // Находим ключ
' p_m[key]++;'+
' }'+
' for (uint i = 0; i < 256; i++) {'+
' m2[g_id*256+i] = p_m[i];'+
' }'+
'}';
//---------------------------------------------------------------------------
S2 :=
// Ядро сортировки 2 (Расчёт смещения)
'__kernel void sort2(__global uint* m1) {'+
' __local uint l_m[256];'+
' uint l_id = get_local_id(0);'+
' for (uint i = 1; i < 1024; i++) {'+
' m1[i*256+l_id] += m1[(i-1)*256+l_id];'+
' }'+
' l_m[l_id] = m1[1023*256+l_id];'+
' barrier(CLK_LOCAL_MEM_FENCE);'+
' for (uint i = 0; i < 8; i++) {'+
' uint offset = 1 << i;'+
' uint add = (l_id>=offset) ? l_m[l_id - offset] : 0;'+
' barrier(CLK_LOCAL_MEM_FENCE);'+
' l_m[l_id] += add;'+
' barrier(CLK_LOCAL_MEM_FENCE);'+
' }'+
' if (l_id>0) {'+
' for (uint i = 0; i < 1024; i++) {'+
' m1[i*256+l_id] += l_m[l_id-1];'+
' }'+
' }'+
'}';
//---------------------------------------------------------------------------
S3 :=
// Ядро сортировки 3
'__kernel void sort3(__global uint* m1, __global uint* m2, __global uint* m3, const uint digit, const uint len) {'+
' uint g_id = get_global_id(0);'+
//' for (uint i = 0; i < len; i++) {'+
' for (int i = len-1; i >= 0; i--) {'+ // обратить внимание цикл обратный!
' uchar key = (m1[g_id*len+i] >> digit);'+
' m2[--m3[g_id*256+key]] = m1[g_id*len+i];'+
' }'+
'}';
//---------------------------------------------------------------------------
// Получаем доступные платформы
ret := clGetPlatformIDs(1,@platform_id,@ret_num_platforms);
if CL_SUCCESS<>ret then writeln('clGetPlatformIDs Error ',ret);
// Получаем доступные устройства
//ret:=clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_DEFAULT, 1, @device_id, @ret_num_devices);
ret := clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_GPU, 1, @device_id, @ret_num_devices);
if CL_SUCCESS<>ret then writeln('clGetDeviceIDs Error ',ret);
clGetDeviceInfo(device_id, CL_DEVICE_NAME, 0, nil, valueSize);
GetMem(s0, valueSize);
clGetDeviceInfo(device_id, CL_DEVICE_NAME, valueSize, s0, valueSize);
Writeln('DEVICE_NAME: '+s0);
FreeMem(s0);
//clGetDeviceInfo(device_id, CL_DEVICE_MAX_WORK_GROUP_SIZE, sizeof(valueSize), @i, valueSize);
//Writeln('CL_DEVICE_MAX_WORK_GROUP_SIZE: '+inttostr(i));
// Создаём контекст для устройства
context:= clCreateContext(nil, 1, @device_id, nil, nil, ret);
if CL_SUCCESS<>ret then writeln('clCreateContext Error ',ret);
// Создаём очередь команд для контекста и устройства
command_queue := clCreateCommandQueue(context, device_id, 0, ret);
if CL_SUCCESS<>ret then writeln('clCreateContext Error ',ret);
//-------------------------------------------------------------
kernel1 := OCL_Get_Prog(context, @device_id, 'sort1', S1);
kernel2 := OCL_Get_Prog(context, @device_id, 'sort2', S2);
kernel3 := OCL_Get_Prog(context, @device_id, 'sort3', S3);
//-------------------------------------------------------------
size:=256*256*16*10;
g_works := size;
l_works := 256;
Randomize;
SetLength(mem, size);
for i:=0 to size-1 do mem[i]:=random(256*256*256*256);
// Создаём буферы для ввода вывода данных
memobj1 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
if ret<>CL_SUCCESS then writeln('clCreateBuffer1 Error ',ret);
memobj2 := clCreateBuffer(context, CL_MEM_READ_WRITE, size * sizeof(cl_uint), nil, ret);
if ret<>CL_SUCCESS then writeln('clCreateBuffer2 Error ',ret);
memobj3 := clCreateBuffer(context, CL_MEM_READ_WRITE, 256*1024 * sizeof(cl_uint), nil, ret);
if ret<>CL_SUCCESS then writeln('clCreateBuffer3 Error ',ret);
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T3); //засекли время начала операции
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T1); //засекли время начала операции
// Записываем данные в буфер
ret := clEnqueueWriteBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
if ret<>CL_SUCCESS then writeln('clEnqueueWriteBuffer Error ',ret);
QueryPerformanceCounter(T2);//засекли время окончания
Writeln('write: '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
for step:=0 to 3 do
begin
//-------------------------------------------------------------
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T1); //засекли время начала операции
//-------------------------------------------------------------
// 1 ШАГ
ret:= clSetKernelArg(kernel1, 0, sizeof(cl_mem), @memobj1 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_1_1 Error ',ret);
ret:= clSetKernelArg(kernel1, 1, sizeof(cl_mem), @memobj3 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_1_2 Error ',ret);
j := step*8;
ret:= clSetKernelArg(kernel1, 2, sizeof(j), @j );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_1_3 Error ',ret);
j := size div (1024);
ret:= clSetKernelArg(kernel1, 3, sizeof(j), @j );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_1_4 Error ',ret);
i := (1024);
ret:= clEnqueueNDRangeKernel(command_queue, kernel1, 1, nil, @i, nil, 0, nil, nil);
if ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_1 Error ',ret);
//-------------------------------------------------------------
clFinish(command_queue); // Ожидаем завершения всех команд в очереди
QueryPerformanceCounter(T2);//засекли время окончания
Write('Run'+Inttostr(step+1)+': '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T1); //засекли время начала операции
//-------------------------------------------------------------
// 2 ШАГ
ret:= clSetKernelArg(kernel2, 0, sizeof(cl_mem), @memobj3 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_2_1 Error ',ret);
j := 256;
ret:= clEnqueueNDRangeKernel(command_queue, kernel2, 1, nil, @j, @j, 0, nil, nil);
if ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_2 Error ',ret);
//-------------------------------------------------------------
clFinish(command_queue); // Ожидаем завершения всех команд в очереди
QueryPerformanceCounter(T2);//засекли время окончания
Write(' '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T1); //засекли время начала операции
//-------------------------------------------------------------
// 3 ШАГ
ret:= clSetKernelArg(kernel3, 0, sizeof(cl_mem), @memobj1 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_3_1 Error ',ret);
ret:= clSetKernelArg(kernel3, 1, sizeof(cl_mem), @memobj2 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_3_2 Error ',ret);
ret:= clSetKernelArg(kernel3, 2, sizeof(cl_mem), @memobj3 );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_3_3 Error ',ret);
j:=step*8;
ret:= clSetKernelArg(kernel3, 3, sizeof(j), @j );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_3_4 Error ',ret);
j := size div (1024);
ret:= clSetKernelArg(kernel3, 4, sizeof(j), @j );
if ret<>CL_SUCCESS then writeln('clSetKernelArg_3_5 Error ',ret);
i := (1024); // глобально ядер
ret:= clEnqueueNDRangeKernel(command_queue, kernel3, 1, nil, @i, nil, 0, nil, nil);
if ret<>CL_SUCCESS then writeln('clEnqueueNDRangeKernel_3 Error ',ret);
clFinish(command_queue);
// Меняем местами указатели на массивы после каждого шага
// Результат останется в memobj1
exchange_memobj(memobj2, memobj1);
//-------------------------------------------------------------
clFinish(command_queue); // Ожидаем завершения всех команд в очереди
QueryPerformanceCounter(T2);//засекли время окончания
Writeln(' '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
//-------------------------------------------------------------
end;
QueryPerformanceFrequency(iCounterPerSec);//определили частоту счётчика
QueryPerformanceCounter(T1); //засекли время начала операции
// Считываем данные из буфера
ret:= clEnqueueReadBuffer(command_queue, memobj1, CL_TRUE, 0, size * sizeof(cl_int), @mem[0], 0, nil, nil);
if ret<>CL_SUCCESS then writeln('clEnqueueReadBuffer Error ',ret);
QueryPerformanceCounter(T2);//засекли время окончания
Writeln('Read: '+FormatFloat('0.0000', (T2 - T1)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
QueryPerformanceCounter(T4);//засекли время окончания
Writeln('ALL: '+FormatFloat('0.0000', (T4 - T3)/iCounterPerSec) + ' second.');//вывели количество секунд на выполнение операции
// Освобождаем ресурсы
clReleaseMemObject(memobj1); // Высвобождаем память буфера
clReleaseMemObject(memobj2); // Высвобождаем память буфера
clReleaseMemObject(memobj3); // Высвобождаем память буфера
clReleaseKernel(kernel1); // Высвобождаем объект исполнения
clReleaseKernel(kernel2); // Высвобождаем объект исполнения
clReleaseKernel(kernel3); // Высвобождаем объект исполнения
clReleaseCommandQueue(command_queue); // Высвобождаем очередь
clReleaseContext(context); // Высвобождаем контекст устройства
//-------------------------------------------------------------
SetLength(mem, 0);
readln;
end.
Здесь можно увеличивать число глобальных ядер и это даёт некоторый эффект
Да вы правы, в версии алгоритма из статьи да. Перепутал с другой версией на локальных группах. Однако она у меня показала худшую эффективность. В той версии всё упёрлось в то, что рабочие группы из разных серий идут не последовательно, а как OpenCL захочет. Синхронизация локальных групп хоть и дешевле, но всё равно проиграла проходу по глобальной памяти.
KSP сижком попсовая. Я на Муну и обратно на 6 тонной ракете в скафандре летал. В реальности это совсем сказка.
Я бы посоветовал childrenofadeadearth чёткая игра.
Количество вызовов не всегда удаётся увеличить. 1024 это минимальное число воркитемов допустимых стандартом для видеокарты.
У меня CL_DEVICE_MAX_WORK_GROUP_SIZE: 1024
В Вашем примере используется векторизация на GPU это, да должно давать прирост.
Благодарю за наводку на VTune, обязательно попробую.
В смысле упрется во фронтенд? С рандомными записями в память? Не может такого быть.
С чего бы при подсчёте записям быть рандомными? Мы последовательно читаем линейный буфер. И пишем в буфер, который чётко влезает в 4 строки кэша L1. И вообще не должен вытесняться.
К сожалению, Вы меня не убедили в том, что ваш цикл подсчёта быстрее.
Мне, наконец, удалось добиться от компилятора выстроить цикл подсчёта по вашей схеме.
И знаете, выигрыша в скорости не обнаружил. Тут видимо, нужно уже переходить от подсчёта времени к подсчёту регистровых команд для точности.
Замечание что Ваш код делает меньше выборок в памяти, принимаю. Однако современные CPU читают к L1 практически с той же скоростью, как и регистры. На этом принципе и базируются стековые машины.
Быстродействие всё упрётся в скорость чтения и расшифровке опкодов (при условии использования только простых инструкций).
Ваш цикл содержит 15 инструкций общей длиной 55 байт, мой 11 инструкций общей длиной 50 байт. Ссылка
Всё же думаю разница, где то в архитектуре.
И ещё ваш цикл не отработает верно если ключ будет 1,2 или 8 байтов
Первая функция содержит в цикле 11 простых инструкций CPU, а вторая 12.
Исходя из статистики выполнения, представленной тут, и собственных тестов, могу предположить следующее:
Все запуски на AMD проигрывают по первому варианту в скорости, причём сильно. На INTEL выигрывают, тестировал на нескольких моделях (i3 – i7). Разница между подходами только в выравнивании обращений к памяти.
Видимо Интеловцы умудрились как-то оптимизировать не выровненные обращения в кэш.
Другой гипотезы у меня нет.
Благодарю, походе дело всё же в архитектуре.
А Вы в Codeblocks собирали с ключами оптимизации?
Дело в том, что изначально рассчитывал на оптимизации компилятора. В ветке выше я немного обозначил суть подхода.
Порядок прохода по массиву вообще не важен.
Прямой обратный или случайный.
Из-за того, что индексы гуляют по всей памяти, нивелируются практически любые уловки.
Префетчинг тут не играет, он тут только будет мешать, забивая кэш.
Единственно, что мы можем сделать ускорить высоконагруженный цикл.
Возможно, мой код выглядит несколько не красиво с точки зрения синтаксиса, зато компилятор чётко понимает (по крайней мере, GCC), как оптимизировать циклы. Покажу на фрагменте.
код
unsigned char *b = (unsigned char*)&m[n-1].key;
while (b >= (unsigned char*)&m[0].key)
{
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[*(b+digit)+256*digit]++;
}
b -= sizeof(TNode);
}
превращается во что то вроде 0x4014ba lea 0x0(%esi),%esi
Обратите внимание, как изящно компилятор раскрыл внутренний цикл.
Ваш фрагмент:
for (unsigned int i = 0; i < n; ++i)
{
unsigned int key = m[i].key;
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
}
}
В современной архитектуре процессоров k практически всегда сводится к числу байт сортируемого числа.
Это не означает, что запрещёно делить на иной множитель.
Int = 32 бита (4 байта) можно представить как:
4 — прохода по 8 бит (оптимально)
8 – проходов по 4 бита
2 — прохода по 16 бит (но это уже перебор, так как каждый проход сильно дорог)
Можно за 1 проход, но понадобится лишние 16 ГБ оперативки, и будет очень медленно.
Можно экзотику:
5 – проходов по 7 бит
6 – проходов по 6 бит
или др., но все они не будут оптимальными
Смотрите, n – длина массива в штуках, k – разрядность от типа ключа в байтах, от длины ни как не зависит. K – для целых чисел 4 байта.
Пример: Если очень грубо. Массив из 16777216 элементов целых чисел. (int)
Быстрой сортировке нужно сравнить n * Log2 16777216 = 24*n раз.
RadixSort понадобится (k*n) ~ (4 * n) раз + (1 * n) проход на подсчёт. Итог 5 к 24.
В реальности всё несколько хуже.
Простите, не до конца понял Вашу мысль.
Сложность алгоритма, действительно, линейна от длины массива (n). Однако из-за особенности структуры кэша процессора и организации памяти, добавляются значительные накладные расходы. Алгоритм так устроен, что ему нужно постоянно читать различные фрагменты памяти со всего массива, не последовательно, это дорого. Если бы вся память компьютера работала на частоте L1 процессора, алгоритм был бы невероятно эффективен. А так эффект не столь значителен.
Ваша скрупулезность делает Вам честь. Я обратил внимание на код автора picul, но не стал его сильно править. Через указатель действительно несколько быстрее, но всё же мой вариант его бьёт.
Я благодарен проявленному интересу к самому алгоритму, я разложу его соль чуть ниже. Протестируйте данный код, RSort_Node3 – это Ваши правки. Разница в скорости не так велика, но она есть, особенно при нескольких запусках программы, если брать среднее.
код
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <windows.h>
//========================================================
typedef struct TNode {
unsigned int key;
unsigned int value;
} TNode;
//========================================================
void Sort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
{
unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
TNode *v = &source[n];
while (v >= source)
{
dest[--offset[*b]] = *v--;
b -= sizeof(TNode);
}
}
//========================================================
void RSort_Node(TNode *m, unsigned int n)
{
// Выделяем память под временный массив
TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
// Заводим массив корзин
unsigned int s[sizeof(m->key) * 256] = {0};
// Заполняем массив корзин для всех разрядов
unsigned char *b = (unsigned char*)&m[n-1].key;
while (b >= (unsigned char*)&m[0].key)
{
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[*(b+digit)+256*digit]++;
}
b -= sizeof(TNode);
}
// Пересчитываем смещения для корзин
for (unsigned int i = 1; i < 256; i++)
{
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[i+256*digit] += s[i-1+256*digit];
}
}
// Вызов сортировки по битам от младших к старшим (LSD)
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
Sort_step(m, m_temp, n-1, &s[256*digit] ,digit);
TNode *temp = m;
m = m_temp;
m_temp = temp;
}
// Если ключ структуры однобайтовый копируем отсортированное в исходный массив
if (sizeof(m->key)==1)
{
TNode *temp = m;
m = m_temp;
m_temp = temp;
memcpy(m, m_temp, n * sizeof(TNode));
}
free(m_temp);
}
//========================================================
void RSort_step2(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
{
unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
for (unsigned int i = 0; i < n; ++i)
{
//TNode const& src = source[i]; // <- здесь оперсанд опечатка?
TNode const src = source[i];
unsigned int off = (src.key >> (sortable_bit * 8)) & 0xFF;
dest[offset[off]++] = src;
}
}
//========================================================
void RSort_Node2(TNode *m, unsigned int n)
{
// Выделяем память под временный массив
TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
// Заводим массив корзин
unsigned int s[sizeof(m->key) * 256] = {0};
// Заполняем массив корзин для всех разрядов
for (unsigned int i = 0; i < n; ++i)
{
unsigned int key = m[i].key;
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
}
}
// Пересчитываем смещения для корзин
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
unsigned int off = 0;
for (unsigned int i = 0; i < 256; i++)
{
unsigned int value = s[i+256*digit];
s[i+256*digit] = off;
off += value;
}
}
// Вызов сортировки по битам от младших к старшим (LSD)
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
RSort_step2(m, m_temp, n, &s[256*digit] ,digit);
TNode *temp = m;
m = m_temp;
m_temp = temp;
}
// Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
if (sizeof(m->key)==1)
{
TNode *temp = m;
m = m_temp;
m_temp = temp;
memcpy(m, m_temp, n * sizeof(TNode));
}
free(m_temp);
}
//========================================================
void RSort_step3(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
{
unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
for (unsigned int i = 0; i < n; ++i)
{
TNode * src = &source[i];
unsigned int off = (src->key >> (sortable_bit * 8)) & 0xFF;
dest[offset[off]++] = *src;
}
}
//========================================================
void RSort_Node3(TNode *m, unsigned int n)
{
// Выделяем память под временный массив
TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
// Заводим массив корзин
unsigned int s[sizeof(m->key) * 256] = {0};
// Заполняем массив корзин для всех разрядов
for (unsigned int i = 0; i < n; ++i)
{
unsigned int key = m[i].key;
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
}
}
// Пересчитываем смещения для корзин
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
unsigned int off = 0;
for (unsigned int i = 0; i < 256; i++)
{
unsigned int value = s[i+256*digit];
s[i+256*digit] = off;
off += value;
}
}
// Вызов сортировки по битам от младших к старшим (LSD)
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
RSort_step3(m, m_temp, n, &s[256*digit] ,digit);
TNode *temp = m;
m = m_temp;
m_temp = temp;
}
// Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
if (sizeof(m->key)==1)
{
TNode *temp = m;
m = m_temp;
m_temp = temp;
memcpy(m, m_temp, n * sizeof(TNode));
}
free(m_temp);
}
//========================================================
int main()
{
unsigned int n=10000000;
LARGE_INTEGER frequency;
LARGE_INTEGER t1, t2, t3, t4, t5, t6;
double elapsedTime;
TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
TNode *m3 = (TNode*)malloc(sizeof(TNode) * n);
srand(time(NULL));
for (unsigned int i=0; i<n; i++)
{
m1[i].key = rand()*RAND_MAX+rand();
m2[i].key = m1[i].key;
m3[i].key = m1[i].key;
}
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&t1);
RSort_Node(m1, n);
QueryPerformanceCounter(&t2);
elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
printf("RSort_Node1: %.5f seconds\n", elapsedTime);
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&t3);
RSort_Node2(m2, n);
//std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
QueryPerformanceCounter(&t4);
elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
printf("RSort_Node2: %.5f seconds\n", elapsedTime);
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&t5);
RSort_Node3(m3, n);
QueryPerformanceCounter(&t6);
elapsedTime=(float)(t6.QuadPart-t5.QuadPart)/frequency.QuadPart;
printf("RSort_Node3: %.5f seconds\n", elapsedTime);
for (unsigned int i=0; i<n; i++)
{
if ((m1[i].key!=m2[i].key)||(m1[i].key!=m3[i].key))
{
printf("\n\n!!!!!\n");
break;
}
}
free(m1);
free(m2);
free(m3);
return 0;
}
Порядок обхода элементов вообще не важен. Можно идти хоть в случайно порядке для каждой из 256 корзин. Правда есть сложность, если параллелить в данном месте, то в зависимости от входных данных нагрузка неравномерно распределиться по ядрам. Для разделения потоков выгоднее использовать параллельный код из статьи.
Паскаль тестировал в Lazarus, он базируется так же на GCC, результат аналогичен си’шному.
На трёх системах с CPU от Intel (core i3-i7) ситуация аналогичная. Проверить на камнях от красно-белых, сейчас нет возможности, а у них насколько иная структура кэш’а.
А возможно, разница идёт от выбора компилятора.
Прошу выкладывайте свои результаты, мне бы хотелось получить общую картину.
Искренне благодарю Вас за интерес. Однако, хотел уточнить, с какими ключами Вы компилировали свой код?
У меня: GCC WIN 10 -o3 получаются противоположные результаты.
код
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <windows.h>
//========================================================
typedef struct TNode {
unsigned int key;
unsigned int value;
} TNode;
//========================================================
void Sort_step(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
{
unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
TNode *v = &source[n];
while (v >= source)
{
dest[--offset[*b]] = *v--;
b -= sizeof(TNode);
}
}
//========================================================
void RSort_Node(TNode *m, unsigned int n)
{
// Выделяем память под временный массив
TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
// Заводим массив корзин
unsigned int s[sizeof(m->key) * 256] = {0};
// Заполняем массив корзин для всех разрядов
unsigned char *b = (unsigned char*)&m[n-1].key;
while (b >= (unsigned char*)&m[0].key)
{
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[*(b+digit)+256*digit]++;
}
b -= sizeof(TNode);
}
// Пересчитываем смещения для корзин
for (unsigned int i = 1; i < 256; i++)
{
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[i+256*digit] += s[i-1+256*digit];
}
}
// Вызов сортировки по битам от младших к старшим (LSD)
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
Sort_step(m, m_temp, n-1, &s[256*digit] ,digit);
TNode *temp = m;
m = m_temp;
m_temp = temp;
}
// Если ключ структуры однобайтовый копируем отсортированное в исходный массив
if (sizeof(m->key)==1)
{
TNode *temp = m;
m = m_temp;
m_temp = temp;
memcpy(m, m_temp, n * sizeof(TNode));
}
free(m_temp);
}
//========================================================
void RSort_step2(TNode *source, TNode *dest, unsigned int n, unsigned int *offset, unsigned char sortable_bit)
{
unsigned char *b = (unsigned char*)&source[n].key + sortable_bit;
for (unsigned int i = 0; i < n; ++i)
{
//TNode const& src = source[i]; // <- здесь оперсанд опечатка?
TNode const src = source[i];
unsigned int off = (src.key >> (sortable_bit * 8)) & 0xFF;
dest[offset[off]++] = src;
}
}
//========================================================
void RSort_Node2(TNode *m, unsigned int n)
{
// Выделяем память под временный массив
TNode *m_temp = (TNode*)malloc(sizeof(TNode) * n);
// Заводим массив корзин
unsigned int s[sizeof(m->key) * 256] = {0};
// Заполняем массив корзин для всех разрядов
for (unsigned int i = 0; i < n; ++i)
{
unsigned int key = m[i].key;
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
s[((key >> (digit * 8)) & 0xFF) +256*digit]++;
}
}
// Пересчитываем смещения для корзин
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
unsigned int off = 0;
for (unsigned int i = 0; i < 256; i++)
{
unsigned int value = s[i+256*digit];
s[i+256*digit] = off;
off += value;
}
}
// Вызов сортировки по битам от младших к старшим (LSD)
for (unsigned int digit=0; digit< sizeof(m->key); digit++)
{
RSort_step2(m, m_temp, n, &s[256*digit] ,digit);
TNode *temp = m;
m = m_temp;
m_temp = temp;
}
// Если ключ структуры однобайтовый, копируем отсортированное в исходный массив
if (sizeof(m->key)==1)
{
TNode *temp = m;
m = m_temp;
m_temp = temp;
memcpy(m, m_temp, n * sizeof(TNode));
}
free(m_temp);
}
//========================================================
int main()
{
unsigned int n=10000000;
LARGE_INTEGER frequency;
LARGE_INTEGER t1, t2, t3, t4;
double elapsedTime;
TNode *m1 = (TNode*)malloc(sizeof(TNode) * n);
TNode *m2 = (TNode*)malloc(sizeof(TNode) * n);
srand(time(NULL));
for (unsigned int i=0; i<n; i++)
{
m1[i].key = rand()*RAND_MAX+rand();
m2[i].key = m1[i].key;
}
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&t1);
RSort_Node(m1, n);
QueryPerformanceCounter(&t2);
elapsedTime=(float)(t2.QuadPart-t1.QuadPart)/frequency.QuadPart;
printf("RSort_Node1: %.5f seconds\n", elapsedTime);
QueryPerformanceFrequency(&frequency);
QueryPerformanceCounter(&t3);
RSort_Node2(m2, n);
//std::sort(m2, m2+n,[](const TNode &a, const TNode &b){return a.key < b.key;});
QueryPerformanceCounter(&t4);
elapsedTime=(float)(t4.QuadPart-t3.QuadPart)/frequency.QuadPart;
printf("RSort_Node2: %.5f seconds\n", elapsedTime);
for (unsigned int i=0; i<n; i++)
{
if (m1[i].key!=m2[i].key)
{
printf("\n\n!!!!!\n");
break;
}
}
free(m1);
free(m2);
return 0;
}
Для максимального ускорения процедуры расчёта смещения. Мы меняем инструкции на более быстрый пересчёт индексов. Чтоб скомпенсировать данный эффект, проще инвертировать основной цикл прохода. На производительность самого цикла ни как не влияет. Выборка данных из памяти в обоих направления идёт одинаково.
Данные по скорости хорошие, но не хочу стать заложником ошибки выжившего, выдав свои локальные данные за факты.
Насколько производительно такое решение возможно ли за разумное время посчитать задачу коммивояжера, допустим, на маршруте 16-20 пунктов.
Раз уж Вы заинтересовались GPU’шной версией алгоритма предлагаю вам взглянуть на следующий код. Он немного сыроват, но для теста может быть интересен.
Здесь можно увеличивать число глобальных ядер и это даёт некоторый эффект
Я бы посоветовал childrenofadeadearth чёткая игра.
У меня CL_DEVICE_MAX_WORK_GROUP_SIZE: 1024
В Вашем примере используется векторизация на GPU это, да должно давать прирост.
uint4 rank;
int idx = localId*4;
rank.x = (preds.x) ? address.x : numtrue[0] + idx - address.x;
rank.y = (preds.y) ? address.y : numtrue[0] + idx + 1 - address.y;
rank.z = (preds.z) ? address.z : numtrue[0] + idx + 2 - address.z;
rank.w = (preds.w) ? address.w : numtrue[0] + idx + 3 - address.w;
В любом случае ожидал прироста на порядок, при мощной GPU, а тут всего несколько раз.
С чего бы при подсчёте записям быть рандомными? Мы последовательно читаем линейный буфер. И пишем в буфер, который чётко влезает в 4 строки кэша L1. И вообще не должен вытесняться.
Мне, наконец, удалось добиться от компилятора выстроить цикл подсчёта по вашей схеме.
И знаете, выигрыша в скорости не обнаружил. Тут видимо, нужно уже переходить от подсчёта времени к подсчёту регистровых команд для точности.
Замечание что Ваш код делает меньше выборок в памяти, принимаю. Однако современные CPU читают к L1 практически с той же скоростью, как и регистры. На этом принципе и базируются стековые машины.
Быстродействие всё упрётся в скорость чтения и расшифровке опкодов (при условии использования только простых инструкций).
Ваш цикл содержит 15 инструкций общей длиной 55 байт, мой 11 инструкций общей длиной 50 байт. Ссылка
Всё же думаю разница, где то в архитектуре.
И ещё ваш цикл не отработает верно если ключ будет 1,2 или 8 байтов
Исходя из статистики выполнения, представленной тут, и собственных тестов, могу предположить следующее:
Все запуски на AMD проигрывают по первому варианту в скорости, причём сильно. На INTEL выигрывают, тестировал на нескольких моделях (i3 – i7). Разница между подходами только в выравнивании обращений к памяти.
Видимо Интеловцы умудрились как-то оптимизировать не выровненные обращения в кэш.
Другой гипотезы у меня нет.
Прошу сообщество прокомментировать данный посыл.
ссылка
А Вы в Codeblocks собирали с ключами оптимизации?
Дело в том, что изначально рассчитывал на оптимизации компилятора. В ветке выше я немного обозначил суть подхода.
Прямой обратный или случайный.
Из-за того, что индексы гуляют по всей памяти, нивелируются практически любые уловки.
Префетчинг тут не играет, он тут только будет мешать, забивая кэш.
Единственно, что мы можем сделать ускорить высоконагруженный цикл.
Возможно, мой код выглядит несколько не красиво с точки зрения синтаксиса, зато компилятор чётко понимает (по крайней мере, GCC), как оптимизировать циклы. Покажу на фрагменте.
превращается во что то вроде
0x4014ba lea 0x0(%esi),%esi
0x4014c0 movzbl (%eax),%edx
0x4014c3 sub $0x8,%eax
0x4014c6 addl $0x1,0x20(%esp,%edx,4)
0x4014cb movzbl 0x9(%eax),%edx
0x4014cf addl $0x1,0x420(%esp,%edx,4)
0x4014d7 movzbl 0xa(%eax),%edx
0x4014db addl $0x1,0x820(%esp,%edx,4)
0x4014e3 movzbl 0xb(%eax),%edx
0x4014e7 addl $0x1,0xc20(%esp,%edx,4)
0x4014ef cmp %ebx,%eax
0x4014f1 jae 0x4014c0 <RSort_Node(TNode*, unsigned int)+80>
Обратите внимание, как изящно компилятор раскрыл внутренний цикл.
Ваш фрагмент:
превращается в
0x4018a3 cmp %ecx,%ebx
0x4018a5 jne 0x401870 <RSort_Node3(TNode*, unsigned int)+96>
0x4018a7 lea 0x420(%esp),%ebx
0x4018ae xor %esi,%esi
0x4018b0 lea -0x400(%ebx),%eax
0x4018b6 xor %edx,%edx
0x4018b8 lea 0x0(%esi,%eiz,1),%esi
0x4018bf nop
0x4018c0 mov (%eax),%ecx
0x4018c2 mov %edx,(%eax)
0x4018c4 add $0x4,%eax
0x4018c7 add %ecx,%edx
0x4018c9 cmp %eax,%ebx
0x4018cb jne 0x4018c0 <RSort_Node3(TNode*, unsigned int)+176>
0x4018cd add $0x100,%esi
0x4018d3 add $0x400,%ebx
0x4018d9 cmp $0x400,%esi
0x4018df jne 0x4018b0 <RSort_Node3(TNode*, unsigned int)+160>
Компилятор пытается решить в лоб, создавая больше кода.
На циклах в десятки миллионов повторов будет разница.
А Вы могли бы собрать у себя данный код другим компилятором?
Это не означает, что запрещёно делить на иной множитель.
Int = 32 бита (4 байта) можно представить как:
4 — прохода по 8 бит (оптимально)
8 – проходов по 4 бита
2 — прохода по 16 бит (но это уже перебор, так как каждый проход сильно дорог)
Можно за 1 проход, но понадобится лишние 16 ГБ оперативки, и будет очень медленно.
Можно экзотику:
5 – проходов по 7 бит
6 – проходов по 6 бит
или др., но все они не будут оптимальными
Пример: Если очень грубо. Массив из 16777216 элементов целых чисел. (int)
Быстрой сортировке нужно сравнить n * Log2 16777216 = 24*n раз.
RadixSort понадобится (k*n) ~ (4 * n) раз + (1 * n) проход на подсчёт. Итог 5 к 24.
В реальности всё несколько хуже.
Сложность алгоритма, действительно, линейна от длины массива (n). Однако из-за особенности структуры кэша процессора и организации памяти, добавляются значительные накладные расходы. Алгоритм так устроен, что ему нужно постоянно читать различные фрагменты памяти со всего массива, не последовательно, это дорого. Если бы вся память компьютера работала на частоте L1 процессора, алгоритм был бы невероятно эффективен. А так эффект не столь значителен.
Я благодарен проявленному интересу к самому алгоритму, я разложу его соль чуть ниже. Протестируйте данный код, RSort_Node3 – это Ваши правки. Разница в скорости не так велика, но она есть, особенно при нескольких запусках программы, если брать среднее.
На моём ноутбуке вышло:
RSort_Node1: 0.16993 seconds
RSort_Node2: 0.18999 seconds
RSort_Node3: 0.17411 seconds
Паскаль тестировал в Lazarus, он базируется так же на GCC, результат аналогичен си’шному.
А возможно, разница идёт от выбора компилятора.
Прошу выкладывайте свои результаты, мне бы хотелось получить общую картину.
У меня: GCC WIN 10 -o3 получаются противоположные результаты.
Результат:
RSort_Node1: 0.19118 seconds
RSort_Node2: 0.21374 seconds
Данные по скорости хорошие, но не хочу стать заложником ошибки выжившего, выдав свои локальные данные за факты.