Привет, Хабр! В статье хотел бы поделиться своим опытом восстановления данных (на самом деле, всего одной фотографии), который перерос в любопытный кейс применения методов машинного обучения для решения задачи реконструкции файлов изображений. Думаю, что проблема с восстановлением удаленной информации с носителей весьма актуальна для читателей Хабра (и обычных юзеров, и целых компаний), поэтому поделюсь некоторыми наработками. Все это – под катом.  

Восстановить удаленный файл с флешки? Это же просто!

Все началось с того, что я случайно удалил фотографию на флешке, и решил, что проблемы нет - ведь в наличии множество классных инструментов по работе с удаленными данными. Недолго думая, я натравил на устройство замечательную программу R-Studio (программа предназначена для работы с содержимым носителей информации на уровне секторов, ничего общего с языком R не имеет). Но все оказалось не так просто. Результат меня не порадовал: изображение оказалось «битым».

Пример «битого» изображения. Кстати, угадайте, что за зверь?

Проблема довольно быстро обнаружилась: драйвер файловой системы выделил под фотографию несколько наборов последовательных блоков (вообще-то они называются кластерами, но в машинном обучении термин «кластер» тоже используется, поэтому буду называть их блоками). Один набор (с заголовком формата JPG) удалось найти, а информация об остальных наборах пропала. Однако сами блоки на флешке остались, они никуда не делись. Соответственно, шансы на восстановление изображения все еще оставались, и неплохие.

Пример распределения файла в кластерах тома

Что же делать?

Хороший вопрос. Искал ответ на него не то, чтобы долго, но решить проблему за пару минут тоже не удалось. Помучившись, я заметил, что если попробовать вручную дописывать к изображению «правильный» блок (то есть блок данных, который ранее находился по данному смещению), то визуально картинка практически не изменится. В этом случае изменится лишь одно – серая полоса внизу станет меньше. Если же к изображению дописать случайно выбранный блок данных, то на его границе появится «артефакт» – фрагмент изображения, резко отличающийся от окружения.

Пример артефакта. В демонстрационных целях взято другое изображение

Решение возникло само собой. Надо перебрать свободные блоки 32-гигабайтной флешки, дописывая каждый блок в конец восстановленной фотографии, и проанализировать результат (то есть вывести изображение на экран и проверить наличие артефакта). Для работы с «битыми» изображениями можно использовать Python-библиотеку Pillow.

Вот код возможного решения – оно позволяет дописать блок-кандидат к восстановленному изображению и вывести результат на экран.

from PIL import Image, ImageFile, ImageOps
from pylab import rcParams

ImageFile.LOAD_TRUNCATED_IMAGES = True

def build_and_show(restored: bytes, candidate_block: bytes):
      rcParams['figure.figsize'] = 25, 40
      with open('restored.jpg', 'wb') as r:
            r.write(restored)
            r.write(candidate_block)
      img = Image.open('restored.jpg')
      plt.imshow(np.asarray(img))

Но здесь появляется еще одна проблема. Нельзя просто взять и просмотреть миллион изображений. Потребовался алгоритм, который бы определял, насколько произвольный блок данных подходит к восстановленному изображению в качестве следующего блока. Звучит, как задача для ИИ. Что же – если есть идея, нужно пробовать ее реализовать. Для начала я решил использовать классические методы машинного обучения.

Сначала проводил EDA и исследовал свойства областей на границе исходного изображения и добавленного фрагмента. На картинке ниже – пример изображения, где красным цветом выделена изучаемая область.

Пример области на границе изображения и добавленного фрагмента

Небольшое объяснение того, что мы видим помимо фрагмента неизвестного животного и красного прямоугольника. Каждому пикселю области соответствует следующий набор признаков:

  • x, y – координаты пикселя на изображение;

  • r, g, b – тройка чисел, кодирующих цвет пикселя.

Также, исходя из координат пикселя и информации об исходном изображении, можно ввести следующие признаки:

  • original – принимает значение 1, если пиксель принадлежит исходному изображению, иначе 0;

  • border – принимает значение 1, если пиксель лежит на границе со стороны исходного изображения или добавленного фрагмента, иначе 0.

Теперь пришло время объединить все векторные представления пикселей области и получить датасет (назовем его pixels). Используя RGB-координаты пикселей, можно построить кластерную структуру палитры изучаемой области. Для построения кластерной структуры использовался алгоритм k-means с числом кластеров равным 16. Метка кластера, которому соответствует цвет пикселя, будет служить его дополнительным признаком (назовем его cluster).

clustering_columns = ['r', 'g', 'b']
kmeans = KMeans(n_clusters=16, random_state=42).fit(pixels[clustering_columns])
pixels['cluster'] = kmeans.labels_

Задействуя библиотеки matplotlib и seaborn, можно построить две частотные диаграммы признака cluster. Одну – для граничных точек оригинального изображения, а другую – для граничных точек добавленного фрагмента. Библиотека позволяет совмещать построенные диаграммы на одном графике и выделять их разными цветами.

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

border_pixels = pixels[pixels['border'] == 1]
sns.pairplot(border_pixels[['cluster', 'original']], hue='original', palette=['g','b'], size=5)
plt.show()

Строя такие гистограммы для разных блоков данных, удалось заметить следующую закономерность:

  1. Если добавленный блок данных является «правильным», то формы частотных диаграмм будут «похожи».

  2. Если к изображению был добавлен случайный фрагмент, то формы частотных диаграмм будут отличаться.

Пример «похожих» диаграмм для «правильного» блока данных
Пример «непохожих» диаграмм для случайного блока данных

Определить «похожесть» двух распределений вероятностей можно с помощью дивергенции Кульбаха-Лейбнера, которая обладает свойством расстояния: чем она меньше, тем распределения «ближе».

В нашем случае вместо распределений вероятностей были использованы вектора, описывающие распределение цветов пикселей по кластерам на границе исходного изображения и добавленного фрагмента. Вектор P соответствует такому распределению для граничных пикселей изображения, а вектор Q – для граничных пикселей фрагмента. Следующий код демонстрирует, как построить вектора P и Q для датасета pixels.

P = pixels[ (pixels[„border“] == 1)  &  (pixels[„original“] == 1) ][„cluster“].value_counts().to_dict()
Q = pixels[ (pixels[„border“] == 1)  & (pixels[„original“] == 0) ][„cluster“].value_counts().to_dict()

Так я вычисляю значение KL-дивергенции.

def kl_divergence(P: dict, Q: dict) -> float:
    if len(P) == len(Q):
        dist = 0.0
        for i in P:
            if i not in Q:
                p = P[i] + 1
                q = 1
            else:
                p = P[i]
                q = Q[i]
            dist = dist + p * math.log2((p / q))
    else:
        dist = 1000.0
   return dist

Также оказалось, что по аналогии с рекомендательными системами соответствие изображения и добавленного фрагмента друг другу можно оценить с помощью скалярного произведения указанных векторов (чем оно больше, тем больше сходство).

def similarity(P: dict, Q: dict) -> float:
    if len(P) == len(Q):
        dist = 0.0
        for i in P:
              if i in Q:
                   dist = dist + P[i] * Q[i]
    else:
        dist = 0.0
    return dist

Опытным путем я установил, что вычислять «дивергенцию» и скалярное произведение лучше для распределений пикселей, расположенных вдоль скользящего окна, длиной 32 пикселя. При этом отношение двух этих величин для «правильных» блоков будет меньше 0.05. Перемещая окно, мы суммируем «голоса» за правильные блоки и делим на общее число «голосов». Это позволяет уменьшить ошибку классификации в случае, если на недостающем фрагменте появлялся новый объект, который отсутствовал в исходном изображении.

def calculate_heuristic(pixels: pd.DataFrame, frame_width: int=32):
       min_y = pixels['y'].min()
       max_y = pixels['y'].max()
       vote = 0
       votes_count = 0
       #шаг скользящего окна шириной frame_width
       step = 8
       #перемещаем окно вдоль границы
       for rang in range(min_y, max_y - frame_width, step):
            #выделим область внутри окна
            frame = pixels[pixels['y'] >= rang]
            frame = frame[frame['y'] < rang + pict_length]
            #вычисляем распределения
            df_P = frame[ (frame['border'] == 1) & (frame['original'] == 1) ]['cluster']
            df_Q = frame[ (frame['border'] == 1) & (frame['original'] == 0) ]['cluster']
            P = df_P.value_counts().to_dict()
            Q = df_Q.value_counts().to_dict()
            #вычисляем эвристику и учитываем голос "за" "правильный блок"
            if kl_divergence(P, Q) < 0.05 * similarity(P, Q):
                  vote += 1
	#подсчитываем общее число голосов
	votes_count += 1
       #выдаем результат
       return float(res / count) if count else 0.0

Получаемое значение является уникальной характеристикой блока при фиксированном изображении, и, как оказалось, у блока, дополняющего изображение, оно больше, чем у случайных блоков. Таким образом остается просто выбрать блок с максимальным значением эвристики.

Алгоритм был протестирован на коллекции из 200 фотографий. При проверке подсчитывалась доля правильных ответов для правильных и неправильных блоков. Результаты приведены в таблице и на картинке.

Результаты обнаружения

Доля правильных ответов (accuracy)

«Правильных» блоков

0,75

«Неправильных» блоков

0,82

Это капибара

Запускаем нейросеть!

Алгоритм работает, да, и вполне неплохо. Но у него есть и существенный недостаток. А именно - его низкая скорость. Основная задержка приходится на вычисление кластерной структуры палитры области на границе. Это имеет значение, когда для подбора одного-единственного подходящего блока размером 4096 байт нужно перебрать несколько миллионов блоков!

Недостаток есть, но от него можно избавиться при помощи сверточной нейронной сети. И для ее подготовки я как раз и задействовал наш медленный алгоритм.

Для обучения нейронной сети я составил  набор фреймов (содержимого окон вдоль границы изображения и добавленного фрагмента) размером 32 на 32 с тремя каналами RGB и набор меток классов, вычисленных для каждого фрейма с помощью описанного выше алгоритма  (1 – для «правильных» блоков по оценке алгоритма, иначе 0).

Чтобы не захламлять память, фреймы сохранялись в файле на диске (в примерах tensor.db), а смещение и метка класса каждого фрейма – в датасете (в примерах index_df). Всего получилось порядка 600000 фреймов. Для работы с ними был определен класс, унаследованный от класса Dataset.

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch.optim as opt


class MyDataset(Dataset):
    def __init__(self, X, y):
        self.file_name = './tensor.db'
        self.frame_width = 32
        self.block_size = self.frame_width * self.frame_width * 3
        self.X = torch.from_numpy(X.to_numpy(dtype='int64'))
        self.y = torch.from_numpy(y.to_numpy(dtype='int8')).float()
    # определяемый пользователем метод вычисления размера выборки
    def __len__(self):
        return self.X.shape[0]
   # определяемый пользователем метод получения элемента выборки
    def __getitem__(self, index):
        tensor_id = self.X[index].item()
        with open(self.file_name, 'rb') as raw:
            raw.seek(tensor_id * self.block_size, 0)
            buffer = raw.read(self.block_size)
        np_data = np.frombuffer(buffer, dtype=np.uint8)
        return (torch.from_numpy(data.float(), self.y[index])

Этот класс был использован при создании генераторов - загрузчиков данных из обучающей и тестовой выборок (трейн и тест).

X = index_df['tensor_id']
Y = index_df['value']
train_X, test_X, train_Y, test_Y = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)
train_X.shape, test_X.shape, train_Y.shape, test_Y.shape
# Создание обектов-генераторов для работы с батчами
TrainData = MyDataset(train_X, train_Y)
train_loader = DataLoader(TrainData, batch_size = batch_size, shuffle=True)
TestData = MyDataset(test_X, test_Y)
test_loader = DataLoader(TestData, batch_size = batch_size, shuffle=False)

Архитектура нейросети получилось такой.

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 5)
        self.conv3 = nn.Conv2d(64, 128, 5)
        self.fc1 = nn.Linear(2048, 1024)
        self.fc2 = nn.Linear(1024, 1)

    def forward(self, tensors):
        x = nn.ReLU(self.conv1(tensors))
        x = nn.ReLU(self.conv2(x))
        x = self.pool(x)
        x = nn.ReLU(self.conv3(x))
        x = self.pool(x)
        x = nn.ReLU(self.fc1(x.view(x.size(0), -1)))
        x = self.fc2(x.view(x.size(0), -1))
        return torch.sigmoid(x).view(-1)

Остальной код был позаимствован здесь и здесь. В качестве функционала ошибки был взят Binary Cross Entropy Loss, применяемый в задачах бинарной классификации. Для оптимизации использовался метод стохастического градиентного спуска (SGD).

# Задаем основные параметры модели
criterion = nn.BCELoss()
epochs = 64
best_epoch = -1
best_loss = 100000.0
batch_size = 32 
optimizer = torch.optim.SGD(net.parameters(), lr=0.001)
# Цикл по эпохам (всего 64, но возможен и досрочный выход) 
net = Net()
for epoch in range(epochs):
    net.train()
    running_loss = 0.0
    i = 0    
# Цикл по батчам (небольшим подвыборкам), которые выдает наш генератор трейновых данных
    for images, labels in iter(train_loader):
            optimizer.zero_grad()
            # Получаем оценку модели
            out = net(images)
            # Вычисляем ошибку
            loss = criterion(out, labels)
            running_loss += loss.item()        
            # Вычисляем градиент
            loss.backward()
            # Изменяем веса
            optimizer.step()
            i += 1
    # Вычисляем среднее значение ошибки
    avg_loss = running_loss/len(train_loader.sampler)
    print('Epoch: ', epoch+1, 'Average Loss: ', avg_loss)
    # В конце каждой эпохи опробуем модель на тестовой выборке
    net.eval()
    mean_loss = 0
    batch_n = 0
    # При валидации градиенты не считаем
    with torch.no_grad():
        for batch_X, target in iter(test_loader):
            predicted_values = net(batch_X)
            loss = criterion(predicted_values, target)
            mean_loss += loss.item()
            batch_n += 1
        mean_loss /= batch_n
        print(f'Loss_val: {mean_loss}')  
        # Вычисляем наименьшее значение ошибки
        if best_loss > mean_loss:
            best_epoch = epoch
            best_loss = mean_loss
   # Если в течение 10 эпох ошибка на тесте не менялась - выходим
    if epoch - best_epoch > 10:
        break

Успешный успех!

После обучения сверточной сети результирующая ошибка на трейне составила 0.003, на тесте 0.08. Полученная модель была применена в новой версии алгоритма. При этом удалось отказаться от формирования датасета pixels и определения его кластерной структуры, за счет того, что решение о «правильности» каждого фрейма принималось нейросетью сразу при его формировании.