Как стать автором
Обновить
14
0
Потапов Данила @dspmsu

backend-разработчик, компьютерное зрение и ML/DL

Отправить сообщение

Я использовал nVidia A100 40Gb :) Мой код отрабатывает за 12 -> 23.936

Спасибо за интересные оптимизации.

  • Чтобы a было в нужном диапазоне, надо rev(b) < b, и можно рассматривать только те b, у которых последняя цифра не больше первой (а также не 0).

Я тоже такое пробовал, но почему-то это не давало прироста в скорости, хотя я и выравнивал интервалы по значениям, кратным 30. Наверное, нужно было как у вас явно выделить b_first.

Вот код:

Hidden text
import time
import numpy as np
import math
from numba import cuda
from numpy import uint64, int64, int32, uint8

BASE = uint64(10)

@cuda.jit
def cuda_run_core(A, oA, oB, B, d):
    mod9 = cuda.local.array(9, uint64)
    mod9[0] = 0
    mod9[1] = 0
    mod9[2] = 2
    mod9[3] = 6
    mod9[4] = 0
    mod9[5] = 8
    mod9[6] = 3
    mod9[7] = 0
    mod9[8] = 5
    mod11 = cuda.local.array(11, uint64)
    if d % 2 == 0:
        mod11[0] = 0
        mod11[10] = 0 # stub
        mod11[9] = 9
        mod11[8] = 4
        mod11[7] = 6
        mod11[6] = 7
        mod11[5] = 1
        mod11[4] = 8
        mod11[3] = 2
        mod11[2] = 3
        mod11[1] = 5
    else:
        mod11[0] = 0
        mod11[1] = 0 # stub
        mod11[2] = 9
        mod11[3] = 4
        mod11[4] = 6
        mod11[5] = 7
        mod11[6] = 1
        mod11[7] = 8
        mod11[8] = 2
        mod11[9] = 3
        mod11[10] = 5

    div2_mod9_x11 = cuda.local.array(18, uint64)  # x/2 mod 9
    for i in range(2):
        div2_mod9_x11[i*9+0] = 0*11
        div2_mod9_x11[i*9+1] = 5*11
        div2_mod9_x11[i*9+2] = 1*11
        div2_mod9_x11[i*9+3] = 6*11
        div2_mod9_x11[i*9+4] = 2*11
        div2_mod9_x11[i*9+5] = 7*11
        div2_mod9_x11[i*9+6] = 3*11
        div2_mod9_x11[i*9+7] = 8*11
        div2_mod9_x11[i*9+8] = 4*11

    shift = cuda.shared.array(99, uint8)
    b = cuda.threadIdx.x
    if b < 99:
        a = mod11[b % 11]
        a += div2_mod9_x11[9 + mod9[b % 9] - a % 9]
        shift[b] = a

    reverse = cuda.shared.array(100, uint64)
    i = cuda.threadIdx.x
    if i < 100:
        reverse[i] = cuda_rev2(i)
    BASE2 = uint64(pow(BASE, 2))

    cuda.syncthreads()

    B0 = B // BASE
    pos = cuda.grid(1)
    if pos + 1 < A.size:
        b_lo = A[pos]
        b_hi = A[pos + 1]
         
        b = b_lo - 1
        while b < b_hi - 1:
            b += 1
            if b % 3 == 1:
                b += 1
            if b % BASE == 0:
                b += 1
            if b >= b_hi:
                break
            rb = cuda_rev(b, reverse, BASE2)
            a0 = uint64(math.ceil(rb * 1.0 * B / b))
            a_lo = a0 - 20
            a_hi = a0 + BASE + 20
            a = a_lo - a_lo % 99 + shift[b % 99]
            if a < a_lo:
                a += 99
            if a < a_hi and B0 <= a < B:
                ra = cuda_rev(a, reverse, BASE2)
                err = uint64(a * b - rb * B)
                if err == ra:
                    oA[pos] = a
                    oB[pos] = b


@cuda.jit(device=True)
def cuda_rev2(x):  # precomputation for 2-digit values
    r = uint64(0)
    for i in range(2):
        r = r * BASE + x % BASE
        x = x // BASE
    return uint64(r)

@cuda.jit(device=True)
def cuda_rev(x, reverse, BASE2):  # 33% speedup for 100-valued reverse
    r = uint64(0)
    while x > 0:
        r = r * BASE2 + reverse[x % BASE2]
        x = x // BASE2
    while r % BASE == 0:
        r = r // BASE
    return uint64(r)

def rev(x):  # CPU version
    r = 0
    while x > 0:
        r = r * BASE + x % BASE
        x = x // BASE
    return r

def run_cuda(d):

    num = 2**18
    A = np.linspace(pow(BASE, d - 1), pow(BASE, d), num).astype(np.uint64)
    oA, oB = np.zeros_like(A), np.zeros_like(A)
    threadsperblock = 2**9
    blockspergrid = (A.size - 1 + (threadsperblock - 1)) // threadsperblock
    A, oA, oB = cuda.to_device(A), cuda.to_device(oA), cuda.to_device(oB)
    cuda_run_core[blockspergrid, threadsperblock](A, oA, oB, uint64(pow(BASE, d)), uint64(d))
    A, oA, oB = A.copy_to_host(), oA.copy_to_host(), oB.copy_to_host()
    for x, y in zip(oA, oB):
        if x:
            print(x, '*', y, '=', rev(y), rev(x))


def main():
    for d in range(6, 17):
        t0 = time.perf_counter()
        run_cuda(d)
        t1 = time.perf_counter()
        print(f'{d} -> {t1-t0:.3f}')

if __name__ == '__main__':
    main()

  1. Используются свойства остатков от деления на 3, 9, 11. Остаток отделения на 3 и 9 равен остатку от деления на 3 и 9 суммы цифр числа. Остаток от деления на 11 равен остатку от деления на 11 знакопеременной суммы цифр начиная с конца. Пусть a, b: a*b = rev(b)rev(a). Тогда для каждого b возможно не более одного остатка от деления a на 9 и не более одного остатка от деления a на 11. Это легко проверить, перебрав все пары остатков от деления и сравнив их произведения с суммой или разностью (в случае нечётных чисел и деления на 11). Если эти свойства объединить, то получаем для каждого b не более 1 остатка от деления на 99. Его можно получить из уравнения a = 99t + 9k + mod9[b] = 99t + 11l + mod11[b]. Далее, поскольку 0 <= l < 9, можем перейти к уравнению по модулю 9 и выразить l через деление на 2 по модулю 9.

  2. Остаток от деления a и b на 3 не может быть равен 1. Однако если мы в цикле сделаем continue, то код не будет быстрее работать, поскольку в соседних нитях остаток от деления может быть не равен 1, а, если я правильно помню из курса по CUDA, все нити одного блока всегда выполняют одинаковую строчку кода. Поэтому вместо continue используется += 1 и цикл while вместо for.

  3. Для каждого b в оригинальной версии кода рассматривался диапазон от a0 до a0+BASE. Но при n=16 у нас последняя цифра может быть неправильной, поскольку точность типа double в общем случае только 15 знаков. Поэтому я взял диапазон от a0-20 до a0+BASE+20, чтобы заведомо покрыть все нужные значения (предпоследняя цифра вроде тоже может быть округлена в +-1). В этот диапазон попадает не более 1 остатка от деления на 99.

  4. Я также ускорил функцию rev, предрассчитав обратные для пар цифр, таким образом делается в 2 раза меньше витков цикла.

Я ускорил ваш алгоритм примерно в 4 раза и расширил его до n=16 (в умножении типа double уже не хватает точности в 15 знаков). Таким образом удалось проверить n=15 и n=16. На это ушло примерно 3,5 дня.

Для n=15 ничего не нашлось, для n=16 нашлась одна пара:

8830950118748589 * 3376117662341892 = 29814326671167339858478110590388

Надеюсь, ничего не пропустил.

Позже повнимательнее посмотрел - на самом деле требуется не так уже много памяти для n=7 - чуть более 2 Гб. Поэтому запустил код на 11 ядрах и где-то за 12 часов воспроизвёл результат @gchebanov213002162.

Я взял оценку числа неупорядоченных пар для n=3..18, затем для n=4..18 посчитал коэффициент роста этой последовательности (относительно предыдущего значения) и построил график. Поскольку на бесконечности коэффициент роста не ожидается более 100, здесь явно не логарифмическая функция. Я предположил, что это экспоненциальная функция со смещением, и приближение получилось довольно точным. Приближал методом градиентного спуска с клипированием. Таким образом на бесконечности коэффициент роста ожидается приблизительно 81,6.

Спасибо! При d=18 коэффициент роста последовательности оценки количества пар равняется примерно 77,2. Попробую построить прогноз, к чему придём на бесконечности.

А вы не против, если я ваш код приведу как пример на oeis.org?

Я вот эту последовательность хочу зарегистрировать https://oeis.org/draft/A370723

Либо можете сами зарегистрироваться и вписать в раздел PROG - там любой человек может черновик редактировать.

Нашёл последовательность для переворачивающихся чисел не обязательно одинаковой длины: A009944 - OEIS

182738252812 туда тоже входит, но его официально не подтвердили, поэтому по нему не ищется.

Добавил в базу две последовательности. Пока они в статусе draft.

A370675 - OEIS

A370676 - OEIS

Все 4 оптимизации очень достойные!

Ваше умение обращаться с GPU впечатляет!

Я рассматривал равенство i*j = rev(j)rev(i) и перебирал всевозможные варианты числа i, то есть в отличие от вас, я перебирал младшие разряды произведения. Допустим для n = 4: abcd*xyzt = tzyxdcba, где xyzt нам неизвестно. Сначала рассматриваем разряд единиц: a = (d*t)%10. Чтобы найти t, я предрассчитал все варианты t для всех пар a и d. В цикле перебираем все варианты t и запускаем рекурсию, переходя к следующему разряду.
Разряд десятков: b = (c*t + d*z + (d*t)//10)%10. Нам уже известно значение t, поэтому можем в модульной арифметике записать (d*z) % 10 = (b - c*t + (d*t)//10) % 10. И снова находим z пользуясь предрассчитанными значениями, в цикле перебираем все варианты и запускаем рекурсию, переходя к следующему разряду.
Разряд сотен: c = (b*t + c*z + d*y + (c*t + d*z + (d*t)//10)//10)%10. Снова находим d*y в модульной арифметике и так далее.

Значение в скобках я обозначил accum - его не надо пересчитывать полностью, а только добавлять новые слагаемые и делить на 10.

В некоторых случаях оказывается, что вариантов очередной цифры нет и рекурсия просто завершается (например (2*t)%10 = 3 не имеет решений). Если дошли до конца, то проверяем, выполняется ли требуемое равенство для найденного значения xyzt.

P.S. si и sj хранят цифры чисел i и j.

Интересная идея!
Понимаю, что вы не питонист, но по-моему, range должен идти от 0 до 10+1, поскольку последнее значение не включается и теоретически может так оказаться, что дробная часть первого и второго слагаемого в сумме дают ещё один. В усовершенствованном варианте получается range(0, 10//a[0] + 1).
Ну и если добавить проверки на a % 3 != 1 и b % 3 != 1, то получится ещё почти в 2 раза ускорение.

Исходная задача

Hidden text
for n in range(1, 4+1):
    p = 10**(n-1)
    count = 0
    for i in range(p, p*10):
        for j in range(i, p*10):
            if sorted(str(i)+str(j)) == sorted(str(i*j)):
                count += 1
    print(n, count)

Переворачивающиеся при умножении числа

Hidden text
MIN = 1
MAX = 8

divisors_mod10 = [[[] for j in range(10)] for i in range(10)]
for i in range(10):
    for j in range(10):
        divisors_mod10[(i*j)%10][i].append(j)

def f(i, si, n, k, sj, accum):
    if k == n:
        if sj[0] >= sj[n-1]:
            j = int("".join(map(str, sj)))
            sm = "".join(map(str, (si + sj)[::-1]))
            if int(sm) == i*j:
                print(f"{i} * {j} = {sm}")
            
        return
    
    for k1 in range(k):
        accum += si[n-1-k+k1]*sj[n-1-k1]
    
    vd = divisors_mod10[(si[k] - (accum % 10) + 10) % 10][si[n-1]]
    for divisor in vd:
        sj[n-1-k] = divisor

        if k == 0 and si[0] < sj[n-1]:
            continue

        f(i, si, n, k+1, sj, (accum + si[n-1]*sj[n-1-k])//10)

for n in range(MIN, MAX+1):
    print(f"n={n}")
    p = 10**(n-1)
    for i in range(p+1, p*10):
        if i % 3 == 1:
            continue
        if i % p == 0:
            print(i)
        si = list(map(int, str(i)))
        sj = [0]*len(si)    
        f(i, si, n, 0, sj, 0);

Если вкратце, в произведении i*j=m перебираются всевозможные i и все возможные первые t=4 цифры числа m (обозначим за x). По этим значениям определяется, каким может быть число j – оно лежит в некотором диапазоне и при этом должно обязательно содержать цифры, которые входят в x и не входят в i. Списки чисел, которые содержат определённый набор цифр предрассчитываются, поэтому требуется много памяти при больших n.

Думаю, что не может. На math.stackexchange.com мне вообще рекомендовали к нему не обращаться)

Информация

В рейтинге
Не участвует
Зарегистрирован
Активность