Коммивояжер на GPU
— Скажите, пожалуйста, куда мне отсюда идти?
— А куда ты хочешь попасть? — ответил Кот.
— Мне все равно… — сказала Алиса.
— Тогда все равно, куда и идти, — заметил Кот.Льюис Кэрролл, Алиса в стране чудес
Мы уже решали задачу коммивояжёра точно методом динамического программирования. С тех пор прошло немало времени. Мне бы хотелось поделиться некоторыми соображениями по улучшению алгоритма, а также представить алгоритм пригодный для расчёта задачи коммивояжера на GPU.
Динамическое программирование — это метод решения сложных задач путём разбиения их на более мелкие подзадачи, решение которых легче и проще.
Основная идея метода заключается в том, чтобы не решать одну и ту же подзадачу многократно, а сохранять результаты решения подзадач и повторно использовать их для ускорения общего процесса решения.
В основе метода динамического программирования лежит принцип оптимальности впервые сформулированный в 1953г. американским математиком Р.Э.Беллманом: каково бы ни было состояние системы в результате какого-либо числа шагов, на ближайшем шаге нужно выбирать управление так, чтобы оно в совокупности с оптимальным управлением на всех последующих шагах приводило к оптимальному выигрышу на всех оставшихся шагах, включая выигрыш на данном шаге.
I.
Для объяснения процесса решения возьмём малую задачу на пять городов. Задаётся задача матрицей смежности c. Так как программистам удобно использовать массивы (списки) начинающиеся с нуля, предлагаю так же первый город обозначать нулём. Обозначим вершины номерами {0,1,2,3,4}. А так как мы хотим получить циклический путь, то нам всё равно с какой вершины начинать, пусть это будет последняя – 4.
Проиллюстрируем классический алгоритм сверху вниз на матрице единиц.
Обозначим: B(src; dest) – функцию Беллмана, где src – пункт выезда, а dest – множество ещё не посещённых городов.
А c[4, 0], c[4, 1], c[4, 2], c[4, 3]. – стоимости переездов из города 4 в города 0, 1, 2, 3 взятые из матрицы c.
В свою очередь значения функции Беллмана для решения первого уравнения вычисляются следующим образом:
На третьем этапе мы вычислим функции Беллмана участвующие в расчётах второго этапа.
На последнем этапе вычислим оставшиеся функции. Так как мы обошли все возможные посещённые города, нам остаётся проложить путь в вершину, с которой мы начинали.
II.
Что мне не нравится в классическом алгоритме, так это представление функции Беллмана.
Во-первых, это функция о двух аргументах, во-вторых, один из них это множество. В Python множества представляются хоть и удобным, но совершенно безобразным механизмом, совершенно варварски обходящимся с памятью. При больших входных матрицах, на множестве городов количество таких функции вырастает колоссально, как и объём используемой памяти.
Что бы сохранять вычисленные значения функции Беллмана их нужно сохранять в памяти и обращаться по индексу. Желательно делать это экономно. Самый очевидный подход, использовать словарь, что потребовало бы создать монструозную структуру вроде такой: Словарь кортежей из источника и неизменного множества от множества, значением словаря является длина найденного маршрута.
В своей версии алгоритма предлагаю немного изменить вид функции Беллмана. Использовать только один аргумент, который будет являться множеством и одновременно ссылкой на всю группу функций в области памяти. Для этого мы объединим источник с множеством ещё не посещённых городов, что и будет составлять локальную группу функций. В виде множества мы будем использовать натуральное число, двоичные разряды которого будет выступать флагами элементов множества.
Например, множество {1, 3, 5, 6, 7} можно представить как 0b11101010 в бинарном виде. А в десятичной форме это будет число 234, которое означает индекс хранения в списке групп.
Так мы сможем решать задачу снизу вверх, по слоям. При таком подходе можно не использовать рекурсию для вычисления функции Беллмана, а так же не хранить индексы на оптимальный маршрут.
В конечном итоге мы получим список из 2^(n-1) групп словарей, от 0 до n-1 элементов в каждом.
Для матрицы единиц в качестве исходного массива, он будет выглядеть так:
Такой подход даёт быстрое прямое обращение к нужной ячейке без громоздких структур.
Заполнять такой массив очень легко. На первом шаге алгоритма мы заполняем все множества состоящие из одного элемента. На втором из двух и так далее. У нас будет ровно n-1 шагов алгоритма.
Каждый шаг алгоритма состоит из действий, количество которых равно числу сочетаний из (n-1) по (k-1). Где n – размер входной матрицы, k – номер шага алгоритма.
Для нахождения сочетаний можно использовать функциюitertools.combinations()
. Однако данная функция оказалась крайне медленной, поэтому был разработан свой вариант генератора, который производит те же действия, но более быстро и в компактном виде.
def combination(n: int, k: int, start: int = 0, value: int = 0) -> int:
"""Возвращает сочетания в двоичной форме"""
for i in range(start, n - k + 1):
number = 1 << i | value
if k > 1:
yield from combination(n, k - 1, i + 1, number)
else:
yield number
for i in combination(6, 4): # Число сочетаний из 6 по 4
print(f'{i:06b}')
001111
010111
100111
011011
101011
110011
011101
101101
110101
111001
011110
101110
110110
111010
111100
Кроме функции нахождения сочетаний мы ещё должны иметь возможность быстро обходить множества по битам, выставленным в единицу.
def significative_bits(k: int) -> int:
"""Возвращает номера не нулевых бит числа"""
i = 0
while k:
if k & 1:
yield i
k >>= 1
i += 1
print([i for i in significative_bits(0b111001)])
[0, 3, 4, 5]
Сперва заполняем начальный набор для множеств состоящих только из одного элемента. А затем в цикле заполняем всю расчётную матрицу по слоям, снизу в верх. Это самая длительная часть алгоритма.
И у нас всё готово для построения матрицы оптимального пути. Это очень быстрая процедура относительно создания расчётной матрицы. Найдя финальный минимальный результат мы можем вычитать значение из матрицы источника из оптимума и сравнивать с вычисленной матрицей на каждом шаге. Если значение разницы будет равно на шаге значению расчётной матрицы, то мы нашли искомую вершину, их может быть несколько (если существует несколько равнозначных минимальных маршрутов), мы вправе взять любую.
Полный код программы:
import random as rnd
def combination(n: int, k: int, start: int = 0, value: int = 0) -> int:
"""Возвращает сочетания в двоичной форме"""
for i in range(start, n - k + 1):
number = 1 << i | value
if k > 1:
yield from combination(n, k - 1, i + 1, number)
else:
yield number
def significative_bits(k: int) -> int:
"""Возвращает номера не нулевых бит числа"""
i = 0
while k:
if k & 1:
yield i
k >>= 1
i += 1
def tsp_dynamic(source: list[list]) -> tuple[int, list]:
"""Решает задачу коммивояжёра методом динамического програмирования"""
n = len(source) - 1
# Резервируем память под массив словарей
array = [dict() for k in range(2**n)]
# Заполнение первого набора
for i in range(n):
array[1 << i][i] = source[i][n]
# Заполнение остальных наборов
for i in range(2, n + 1):
for index in combination(n, i):
for j in significative_bits(index):
arr_key = array[1 << j ^ index]
array[index][j] = min(arr_key[k] + source[j][k] for k in arr_key)
# Подсчёт минимальной суммы
min_val = min(array[(1 << n) - 1][k] + source[n][k] for k in range(n))
# Сохранение оптимального пути
res = [n]
temp_val = min_val
index = (1 << n) - 1
for i in range(n):
for j in significative_bits(index):
if array[index][j] + source[res[-1]][j] == temp_val:
break
res.append(j)
temp_val = array[index][j]
index ^= 1 << j
res.append(n)
del array
return min_val, res
n = 18
# rnd.seed(3)
source = [[rnd.randint(10, 99) for i in range(n)] for j in range(n)]
print('n =', n)
print(source)
min_val, res = tsp_dynamic(source)
print(min_val, res)
В отличие от алгоритма, описанного прошлой работе, в данном алгоритме мы экономим в четыре раза больше памяти! Мы не храним матрицу индексов оптимального пути, а так же более плотно сохраняем расчётную матрицу.
Если не использовать словари, а сразу перейти на матрицу numpy, то она будет выглядеть следующим образом для матрицы единиц:
Можно заметить, что половина элементов левой матрицы остаются не задействованными. Однако если соединить половины матрицы поэлементно, исключая нулевые значения, то получится очень плотная компактная матрица.
Итоговый вариант использует ровно (n-1)*2^(n-2) ячеек памяти. Кроме того, каждая из ячеек линейно независима от других операций записи на каждом из шагов алгоритма.
III.
Линейно независимое обращение к памяти натолкнуло на идею об использовании GPU для нахождения решения задачи коммивояжёра. Раз уж мы используем Python как удобный вычислительный инструмент, почему бы не использовать его и для создания решения для видеокарты.
Немного погуглив, с удивлением узнал, что в модуле numba есть класс для работы с CUDA. Мне давно хотелось опробовать данную технологию, но си подобный диалект отпугивал от её изучения. А тут прямая компиляция из подмножества Python, красота!
Собственно вот код:
from numba import cuda, types
import numpy as np
from datetime import datetime
@cuda.jit('uint32(uint32, uint32)', device=True)
def compact_idx(val, n):
"""Функция устройства. Возвращает индекс в компактной форме"""
k = (1 << n - 1) - 1
if val > k:
return k & ~val
else:
return val
@cuda.jit('void(uint32[:,:], uint32[:,:], uint32)')
def first_step(m, source, n):
"""Первый шаг алгоритма"""
idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
if idx >= n:
return
m[compact_idx(1 << idx, n)][idx] = source[idx][n]
@cuda.jit('void(uint32[:,:], uint32[:,:], uint32, uint32)')
def next_step(m, source, n, step):
"""Остальные шаги алгоритма"""
idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
if idx > 2**n:
return
if cuda.popc(idx) != step: # Возвращает количество установленных бит в 32-битном регистре
return
temp_i = idx
i = 0
while temp_i:
if temp_i & 1:
min_val = types.uint32(-1)
last_idx = types.uint32(idx ^ 1 << i)
temp_j = last_idx
j = 0
while temp_j:
if temp_j & 1 and m[compact_idx(last_idx, n)][j] + source[i][j] < min_val:
min_val = m[compact_idx(last_idx, n)][j] + source[i][j]
temp_j >>= 1
j += 1
m[compact_idx(idx, n)][i] = min_val
temp_i >>= 1
i += 1
@cuda.jit('void(uint32[:,:], uint32[:,:], uint32, uint32[:])')
def get_result(m, source, n, result):
"""Вывод результата"""
idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
if idx > 0:
return
min_val = types.uint32(-1)
for j in range(n):
if m[0][j] + source[n, j] < min_val:
min_val = m[0][j] + source[n, j]
result[0] = min_val
result[1] = n
result[-1] = n
temp_val = min_val
index = types.uint32((1 << n) - 1)
for i in range(n):
temp_j = index
j = 0
while temp_j:
if temp_j & 1 and m[compact_idx(index, n)][j] + source[result[i + 1]][j] == temp_val:
break
temp_j >>= 1
j += 1
result[i + 2] = j
temp_val = m[compact_idx(index, n)][j]
index ^= types.uint32(1 << j)
def tsp_dynamic_cuda(source: list[list]) -> tuple[int, list]:
"""Решает задачу коммивояжёра методом динамического програмирования c использованием CUDA GPU"""
device = cuda.get_current_device() # Сведение об устройстве исполнения
gpu_source = cuda.to_device(np.array(source, dtype=np.uint32)) # Копируем матрицу на устройство
gpu_array = cuda.device_array((2**(n-2), (n-1)), np.uint32) # Резервируем память устройства
gpu_result = cuda.device_array(n + 2, np.uint32) # Резервируем память устройства под результат
tpb = device.WARP_SIZE # Количество потоков на блок
bpg = int(np.ceil((2**(n-1)) / tpb)) # Блоков на грид
# Решение
first_step[bpg, tpb](gpu_array, gpu_source, n - 1)
for i in range(2, n):
next_step[bpg, tpb](gpu_array, gpu_source, n - 1, i)
get_result[bpg, tpb](gpu_array, gpu_source, n - 1, gpu_result) # Получение оптимального пути
result = gpu_result.copy_to_host() # Копирование результата с устройства на хост
return result[0], list(result[1:])
n = 24
# np.random.seed(1)
source = np.random.randint(10, 99, (n, n))
print(source)
print('n =', n)
start_time = datetime.now()
print('result =', tsp_dynamic_cuda(source))
date_diff = (datetime.now() - start_time).total_seconds()
print('time =', date_diff)
Если вы хотите его запустить у себя на компьютере, то вам нужна видеокарта от Nvidia.
Инструкция по установке Numba.
Логика работы практически полностью соответствует работе CPU'шного варианта, за исключением того, что мы не перебираем нужные сочетания, а даём каждой нити GPU обрабатывать свой поряковый номер.
idx = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
А затем прерываем нити в номере которых число двоичных единиц не соответствует текущему шагу алгоритма.
if cuda.popc(idx) != step:
return
cuda.popc() — аппаратная процессорная инструкция GPU, очень быстрая.
Производительность алгоритма зависит от мощности вашей видеокарты, но в любом случае не сравнить с чистым Python. Расчётный массив создаётся непосредственно на устройстве и не гуляет по шине, что сильно ускоряет выполнение.
Код программы написан для int32 чисел, однако его просто переписать под float арифметику.
Алгоритм очень устойчив по скорости от входных данных. При любых входных наборах выполняется одинаковое число операций и зависит только от размера входной матрицы.
Теперь поговорим о грустном. Так как алгоритм использует (n-1)*2^(n-2) памяти, т.е. экспоненциально от размера входного массива, увеличение входной матрицы на единицу требует более чем двукратного увеличения требуемой памяти. Данную ситуацию невозможно исправить. Очевидно, что мощность видеокарт будет расти в будущем, но всё равно объём требуемой памяти не позволит сильно улучшить ситуацию.
Пока же, игровая видеокарта Geforce rtx 4700 справляется с входной матрицей n = 28, примерно за 2 секунды, но требует почти 7GB видеопамяти.
Благодарю за внимание!