Обучаем нейросеть управлению транспортным средством на основе мультисенсорных данных и информации о локальной траектории
Автор статьи: Егор Колотвин
Современные системы автономного вождения стремительно развиваются, объединяя достижения в области машинного обучения, робототехники и сенсорных технологий. Одной из ключевых задач при построении таких систем остается точная интерпретация окружающей среды и принятие решений в условиях высокой неопределенности. Эффективное управление транспортным средством требует надежного восприятия дорожной сцены, для чего всё чаще применяется мультисенсорный подход — объединение данных с различных сенсоров, — а также использование заранее рассчитанных траекторий в качестве ориентира.
Почему я взялся за этот проект? Всегда было интересно, как «видит» дорогу беспилотник и как можно научить его принимать решения. Хотелось не просто изучить подходы из книжек и статей, а руками собрать полный пайплайн — от сбора данных до управления на основе мультисенсорного восприятия и локальных траекторий.
Для обучения и тестирования модели я использовал CARLA Simulator. Одним из ключевых преимуществ этого симулятора является возможность синхронного подключения и настройки различных сенсоров, что делает его особенно подходящим для задач мультисенсорного восприятия.
Почему CARLA?
Гибкость: позволяет задавать различные сценарии движения, конфигурации трасс и поведение окружающих участников
Поддержка мультисенсорной конфигурации: может одновременно использовать датчики различных типов: лидары, RGB‑камеры, семантические камеры и другие сенсоры
Безопасность: обучение в симуляции исключает риски, связанные с реальными испытаниями, и ускоряет цикл разработки.
Как обучаем модель?
Среди наиболее распространённых подходов к обучению моделей управления можно выделить:
Reinforcement Learning — обучаем через пробы и ошибки агента с системой вознаграждений;
Behavioral Cloning — копируем поведения эксперта по заранее собранным данным;
Имитационное обучение по траекториям — следуем по заранее заданным маршрутам движения.
На этом данном я выбрал поведенческое клонирование, немного изменив этот подход к обучению — добавил информацию о локальной траектории, то есть о том, куда планируется движение в ближайшие секунды. Это даёт модели «намерение» — не просто реагировать на окружение, а действовать осмысленно. Для формирования входных данных использовал RGB‑изображения, семантические карты сегментации и глубинную информацию, полученную с лидаров, что обеспечивает комплексное восприятие сцены на каждом временном шаге.
Такой гибридный подход — управление на основе мультисенсорного восприятия с учетом локальной траектории — позволяет достичь компромисса между адаптивностью, интерпретируемостью и надёжностью модели, что делает его перспективным для применения как в симуляции, так и в реальных условиях.
1. Как устроена система
Все начинается с построения пайплайна: от генерации маршрутов и сбора данных до обучения модели управления.
Генерация маршрутов и локальной траектории. В начале каждой симуляции создается основной маршрут — последовательность вейпоинтов от начальной до конечной точки движения. По мере движения для каждого кадра достаем из маршрута 20 ближайших точек, формирующих локальную траекторию движения. Зачем это нужно? Чтобы модель знала не только текущее положение, но и «куда держим курс». Это помогает ей адаптироваться к ситуации на дороге.
Сбор данных. Далее запускаем симуляцию: авто под управлением агента едет по заданному маршруту, собирая при этом данные с сенсоров. Все данные синхронизируются по кадрам(у каждого кадра свой frame_id): RGB‑камеры, семантические камеры, лидары, значения скорости, управляющие команды
Предобработка.На этом этапе структурируем все, что собрали: склеиваем сенсорные данные в мультиканальные изображения; привязываем каждому кадру локальную траекторию(те самые 20 точек); добавляем текущее значение скорости и состояние светофора; подтягиваем управляющие команды(они же и будут таргетами в датасете)
Обучение модели. Модель принимает на вход следующие компоненты: мультиканальное изображение — 15 каналов:RGB, глубина, семантика; локальная траектория — последовательность из 20 ближайших точек основного маршрута; состояние светофора — вектор из 3 признаков; скорость транспортного средства — необходима для понимания, насколько агрессивно нужно действовать; таргеты — значения ускорения, торможения и угла поворота руля.
2. Настройка окружения в CARLA
Первым делом подключаемся к симулятору и переводим его в синхронный режим. Это важно, так как в асинхронном режиме кадры могут теряться или сдвигаться по времени — особенно при большом числе сенсоров.
2.1 Настраиваем симуляцию
client = carla.Client("localhost", 2000)
client.set_timeout(20.0)
world = client.get_world()
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.1
world.apply_settings(settings)
Синхронный режим обеспечивает управление каждым шагом симуляции вручную, а фиксированное значение delta_seconds = 0.1
соответствует частоте обновления в 10 Гц.
2.2 Выбираем транспортное средство
Выбираем авто, которое будет ездить по маршрутам, а также задаем его цвет. В примере — Mini Cooper, но можно выбрать любой из стандартной библиотеки CARLA
bp_lib = world.get_blueprint_library()
vehicle_blueprints = bp_lib.filter('vehicle.*')
vehicle_bp = bp_lib.find('vehicle.mini.cooper_s_2021')
vehicle_bp.set_attribute('color', '250, 240, 230')
2.3 Генерируем маршрут
Для построения маршрута используем функцию build_random_long_route()
. Она выбирает случайную стартовую точку и наращивает маршрут, пока не наберется нужная длина. Под капотом — глобальный планировщик GlobalRoutePlanner, позволяющий строить реалистичные маршруты с учетом инфраструктуры города
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.navigation.behavior_agent import BehaviorAgent
def build_random_long_route(world, planner, min_distance=20000.0):
spawn_points = world.get_map().get_spawn_points()
start_transform = random.choice(spawn_points)
current_location = start_transform.location
total_route = []
total_length = 0.0
while total_length < min_distance:
next_location = random.choice(spawn_points).location
segment = planner.trace_route(current_location, next_location)
if len(segment) < 2:
continue
for i in range(len(segment) - 1):
wp1 = segment[i][0].transform.location
wp2 = segment[i + 1][0].transform.location
total_length += wp1.distance(wp2)
total_route.extend(segment)
current_location = next_location
return start_transform, total_route
Задаем планировщик и строим маршрут:
dao = GlobalRoutePlannerDAO(world.get_map(), sampling_resolution=1.0)
grp = GlobalRoutePlanner(dao, sampling_resolution=1.0)
spawn_transform, route = build_random_long_route(world, grp, min_distance=2600)
2.4 Управляющий агент
Делаем спавн транспортного средства в начальной точке маршрута, после чего создаем агента (BehaviorAgent):
vehicle = world.spawn_actor(vehicle_bp, spawn_transform)
agent = BehaviorAgent(vehicle, behavior='aggressive')
agent.set_global_plan(route)
Агент получает доступ к локальному планировщику, буферу ближайших точек маршрута и параметрам маневренности:
agent._local_planner._lane_change = False
agent._local_planner._buffer_size = 20
agent._local_planner._min_distance = 1.5
Агент будет строго следовать текущей полосе, что важно для стабильного сбора данных и повторяемости поведения;
Агент будет учитывать 20 ближайших точек маршрута при планировании движения;
Минимальная дистанция между агентом и следующей целевой точкой: Когда расстояние до ближайшей точки траектории меньше 1.5 метра — агент переходит к следующей.
Таким образом мы формировали траекторию движения, по которой транспортное средство будет перемещаться под управлением агента, в то время как система синхронно собирает данные с сенсоров, значения управляющих воздействий, текущую скорость и другие параметры, необходимые для последующего обучения модели.
2.5 Настройка сенсоров
Для имитации мультисенсорного восприятия окружающей среды на транспортное средство устанавливаем различные типы сенсоров: RGB‑камеры, семантические камеры и лидары. Все сенсоры работают в синхронном режиме с частотой обновления 2.5 Гц (sensor_tick = 0.4
), что позволяет получать согласованные данные на каждом временном шаге.
Создаем и настраиваем RGB‑камеры:
camera_bp = bp_lib.find('sensor.camera.rgb')
camera_bp.set_attribute("image_size_x", "1280")
camera_bp.set_attribute("image_size_y", "720")
camera_bp.set_attribute('sensor_tick', '0.4')
camera_bp.set_attribute("fov", "90")
Для расширенного обзора используем три RGB‑камеры: фронтальная и две боковые. Они крепятся к автомобилю с соответствующими углами поворота:
camera_front = world.spawn_actor(
camera_bp,
carla.Transform(carla.Location(x=1.0, y=0.0, z=1.8)),
attach_to=vehicle
)
camera_left = world.spawn_actor(
camera_bp,
carla.Transform(carla.Location(x=1.0, y=-0.5, z=1.8), carla.Rotation(yaw=-45)),
attach_to=vehicle
)
camera_right = world.spawn_actor(
camera_bp,
carla.Transform(carla.Location(x=1.0, y=0.5, z=1.8), carla.Rotation(yaw=45)),
attach_to=vehicle
)
Аналогично RGB настраиваем и размещаем три семантические камеры, которые предоставляют информацию о классах объектов (дорога, светофоры, здания и т. д.):
semantic_bp = bp_lib.find('sensor.camera.semantic_segmentation')
semantic_bp.set_attribute('image_size_x', '1280')
semantic_bp.set_attribute('image_size_y', '720')
semantic_bp.set_attribute('sensor_tick', '0.4')
semantic_bp.set_attribute("fov", "90")
semantic_camera_front = world.spawn_actor(
semantic_bp,
carla.Transform(carla.Location(x=1.0, y=0.0, z=1.8)),
attach_to=vehicle
)
semantic_camera_left = world.spawn_actor(
semantic_bp,
carla.Transform(carla.Location(x=1.0, y=-0.5, z=1.8), carla.Rotation(yaw=-45)),
attach_to=vehicle
)
semantic_camera_right = world.spawn_actor(
semantic_bp,
carla.Transform(carla.Location(x=1.0, y=0.5, z=1.8), carla.Rotation(yaw=45)),
attach_to=vehicle
)
Создание и настройка лидаров:
lidar_bp = bp_lib.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('points_per_second', '500000')
lidar_bp.set_attribute('range', '50.0')
lidar_bp.set_attribute('rotation_frequency', '20.0')
lidar_bp.set_attribute('upper_fov', '15.0')
lidar_bp.set_attribute('lower_fov', '-40.0')
lidar_bp.set_attribute('sensor_tick', '0.4')
Для получения глубинной информации о сцене на транспортное средство устанавливаем четыре лидара, каждый из которых обеспечивает обзор в определенной зоне:
lidar_roof — расположен на крыше автомобиля, направлен вперед, охватывает обзор на 360° по горизонтали и широкую область по вертикали. Используется как основной источник глобального 3D‑представления окружающей среды.
lidar_front — установлен в передней части автомобиля, в районе решетки радиатора. Обеспечивает детальную информацию о ближней области перед машиной, особенно полезную при маневрах на низкой скорости.
lidar_left — расположен над левым передним колесом, повернут влево (yaw = 90°). Позволяет чувствовать объекты и препятствия сбоку
lidar_right — симметричен левому, установлен над правым передним колесом и направлен вправо (yaw = -90°). Обеспечивает обзор правой боковой зоны.
lidar_positions = {
"lidar_roof": carla.Transform(carla.Location(x=-1.0, y=0.0, z=1.8)),
"lidar_left": carla.Transform(carla.Location(x=0.5, y=-1.0, z=1.0), carla.Rotation(yaw=90)),
"lidar_right": carla.Transform(carla.Location(x=0.5, y=1.0, z=1.0), carla.Rotation(yaw=-90)),
"lidar_front": carla.Transform(carla.Location(x=2.0, y=0.0, z=0.9))
}
lidars = {}
for name, transform in lidar_positions.items():
lidar = world.spawn_actor(lidar_bp, transform, attach_to=vehicle)
lidars[name] = lidar
Установив такой набор сенсоров мы обеспечиваем многоканальное и пространственно‑разнообразное восприятие сцены, приближённое к тому, как работает реальный автономный автомобиль. Эти данные формируют основу для входа в нейросетевую модель и влияют на её обобщающую способность.
3. Как собирались и обрабатывались данные
Чтобы учить модель управлять транспортом, нужно много качественных данных(от 5 до 8 тысяч дорожных сцен). Ниже расскажу, как у меня получилось всё это организовать, синхронизировать и упаковать перед обучением
3.1 Структура данных
Для хранения данных я инициализировал следующие csv‑файлы:
vehicle_commands.csv
— управляющие команды (угол поворота руля, ускорение(газ), торможение) и скорость;traffic_light.csv
— состояние ближайшего светофора (если применимо в дорожной сцене).
RGB и семантические изображения сохранял в формате .png и .npy соответственно. Лидарные облака точек объединял из четырех сенсоров и сохранял в .npy, а проекции на изображение — в .npz.
3.2 Буферизация и запись
Для синхронной обработки и записи данных использовали общий data_buffer, обновляемый в отдельных потоках (запись при наличии отработки всех сенсоров в кадре):
def save_data(frame_id):
if not all(sensor in data_buffer[frame_id] for sensor in SENSORS):
return
# Сохранение RGB, карт сегментаций, управляющих команд, состояний светофоров, облаков лидарных точек и локальных траекторий
3.3 Обработка данных с сенсоров
Каждому сенсору соответствует своя функция обработки:
RGB и семантические изображения — через
process_camera_image() и process_semantic_image()
;Лидары —
process_lidar_data() и process_lidar_scene()
(объединение всех лидарных облаков в единое).
3.4 Формирование локальной траектории
Для каждого кадра извлекал локальную траекторию — 20 ближайших точек от текущей позиции относительно глобального маршрута. Эти точки преобразовывал в локальную систему координат:
dx = loc.x - vehicle_loc.x
dy = loc.y - vehicle_loc.y
trajectory_local.append((dx, dy))
3.5 Проецирование объединенного облака лидарных точек на плоскости камер
Задаем матрицу внутренних параметров камеры K:
def get_camera_intrinsics(camera):
width = int(camera.attributes['image_size_x'])
height = int(camera.attributes['image_size_y'])
fov = float(camera.attributes['fov'])
f_x = width / (2 * np.tan(np.radians(fov) / 2))
f_y = f_x
c_x = width / 2
c_y = height / 2
K = np.array([
[f_x, 0, c_x],
[0, f_y, c_y],
[0, 0, 1]
])
return K
Задаем матрицу преобразования глобальных координат в локальные координаты камеры world_to_cam_matrix:
def world_to_cam_matrix(camera_name):
transform = camera_name.get_transform()
location = transform.location
rotation = transform.rotation
yaw = np.radians(rotation.yaw)
pitch = np.radians(rotation.pitch)
roll = np.radians(rotation.roll)
R_roll = np.array([
[1, 0, 0],
[0, np.cos(roll), -np.sin(roll)],
[0, np.sin(roll), np.cos(roll)]
])
R_pitch = np.array([
[np.cos(pitch), 0, np.sin(pitch)],
[0, 1, 0],
[-np.sin(pitch), 0, np.cos(pitch)]
])
R_yaw = np.array([
[np.cos(yaw), -np.sin(yaw), 0],
[np.sin(yaw), np.cos(yaw), 0],
[0, 0, 1]
])
R = R_yaw @ R_pitch @ R_roll
R_inv = R.T
T = np.array([
[1, 0, 0, -location.x],
[0, 1, 0, -location.y],
[0, 0, 1, -location.z],
[0, 0, 0, 1]
])
world_to_cam = np.eye(4)
world_to_cam[:3, :3] = R_inv
world_to_cam = world_to_cam @ T
A = np.array([
[0, 1, 0],
[0, 0, -1],
[1, 0, 0]
])
T_A = np.eye(4)
T_A[:3, :3] = A
world_to_cam_final = T_A @ world_to_cam
return world_to_cam_final
После получения матриц K и world_to_cam_matrix, 3D‑точки объединенного лидарного облака проецировал на плоскости камер с фильтрацией по глубине:
def project_lidar_to_image(lidar_points, world_to_cam, K, img_width, img_height):
N = lidar_points.shape[0]
ones = np.ones((N, 1))
# Добавление гомогенной координаты
lidar_points_hom = np.hstack((lidar_points, ones))
# Преобразование в систему координат камеры
points_cam_hom = (world_to_cam @ lidar_points_hom.T).T
# Извлечение координат X, Y, Z в системе камеры
X = points_cam_hom[:, 0]
Y = points_cam_hom[:, 1]
Z = points_cam_hom[:, 2]
# Фильтрация точек перед камерой
valid_idx = Z > 0
X_valid = X[valid_idx]
Y_valid = Y[valid_idx]
Z_valid = Z[valid_idx]
if len(X_valid) == 0:
return np.array([]), np.array([])
# Нормализация координат
x_norm = X_valid / Z_valid
y_norm = Y_valid / Z_valid
# Формирование нормированных точек
points_norm = np.vstack((x_norm, y_norm, np.ones_like(x_norm)))
# Применение матрицы K для получения пиксельных координат
pixels_hom = K @ points_norm
# Преобразование координат в пиксельные значения
u = pixels_hom[0, :]
v = pixels_hom[1, :]
# Фильтрация точек, попавших в изображение
mask = (u >= 0) & (u < img_width) & (v >= 0) & (v < img_height)
return np.vstack((u[mask], v[mask])).T, Z_valid[mask]
3.6 Запускаем сенсоры и синхронно пишем все подряд
После настройки симуляции и маршрута пришло время подключить сенсоры и начать собирать датасет:
command_thread = threading.Thread(target=record_commands, args=(agent,), daemon=True)
command_thread.start()
camera_front.listen(lambda image: process_camera_image(image, "camera_front"))
camera_left.listen(lambda image: process_camera_image(image, "camera_left"))
camera_right.listen(lambda image: process_camera_image(image, "camera_right"))
semantic_camera_front.listen(lambda image: process_semantic_image(image, "semantic_camera_front"))
semantic_camera_left.listen(lambda image: process_semantic_image(image, "semantic_camera_left"))
semantic_camera_right.listen(lambda image: process_semantic_image(image, "semantic_camera_right"))
lidars["lidar_roof"].listen(lambda data: process_lidar_data(data, "lidar_roof"))
lidars["lidar_left"].listen(lambda data: process_lidar_data(data, "lidar_left"))
lidars["lidar_right"].listen(lambda data: process_lidar_data(data, "lidar_right"))
lidars["lidar_front"].listen(lambda data: process_lidar_data(data, "lidar_front"))
Каждую итерацию мы делали:
Принудительное обновление мира с помощью
world.tick()
;Cнимали snapshot текущего состояния симуляции;
Извлекали frame_id и вызывали функцию
process_vehicle_commands
;Ждали 0.4 секунды (синхронизировано с sensor_tick);
Проверяли, завершил ли агент маршрут (
agent.done()
).
n = 80000
try:
for i in range(n):
world.tick()
snapshot = world.get_snapshot()
frame_id = snapshot.frame
process_vehicle_commands(agent)
time.sleep(0.4)
if agent.done():
print("done")
break
except KeyboardInterrupt:
print("stop")
В результате каждая строка в датасете — полностью синхронный кадр.
4. Проверяем, что наснимали: визуализация собранных данных
После завершения симуляции и сохранения всех данных, важным этапом является проверка и визуальный анализ записанных сцен.
4.1 RGB‑камеры: front, left, right
Для каждой сцены записали изображения с трёх камер: фронтальной, левой и правой.
4.2 Управляющие команды и состояние светофора
Для каждой сцены получили данные о текущей скорости и управляющих воздействиях (руль, газ, тормоз), а также информацию о состоянии ближайшего светофора:
df_commands = pd.read_csv(vehicle_commands_file)
frame_commands = df_commands[df_commands["frame_id"] == scene_numb]
df_traffic_light = pd.read_csv(traffic_light_file)
frame_traffic_light = df_traffic_light[df_traffic_light["frame_id"] == scene_numb]
Пример вывода:
4.3 Локальная траектория движения
Модель получает информацию о ближайших 20 точках маршрута в локальной системе координат. Визуализируем локальную траекторию: это помогает убедиться, что маршрут адекватный и соответствует текущей сцене:
Отображение локальной траектории движения относительно текущего положения транспортного средства, изображение автора
4.4 Семантические маски
Семантика помогает понять, как сцена видится моделью: где дорога, где пешеходы, а где разметка. Это особенно важно для последующей интерпретируемости поведения модели.
Номера классов в объективе левой семантической камеры:
Классы: [ 1 2 3 4 5 6 7 8 9 11 14 15 20 21 22 23 24 25]
Количество классов: 18
Номера классов в объективе фронтальной семантической камеры:
Классы: [ 1 2 3 4 5 6 7 8 9 11 14 15 20 21 22 23 24 25]
Количество классов: 18
Номера классов в объективе правой семантической камеры:
Классы: [ 1 2 3 4 5 6 7 8 9 11 13 14 15 18 20 21 22 23 24 25]
Количество классов: 20
4.5 Объединённое облако лидарных точек
Лидары, размещенные на различных участках кузова транспортного средства, формируют общее облако лидарных точек сцены
4.6 Визуализация проекций точек из объединённого лидарного облака на плоскости камер
Применив матрицу преобразования world_to_cam_matrix и матрицу внутренних параметров камеры K, мы спроецировали объединённое лидарное облако точек на плоскости фронтальной, левой и правой RGB‑камер. Это позволяет получить двумерные проекции 3D‑точек в виде координат изображения, что важно для совмещения визуальных и геометрических признаков сцены. Для более наглядного анализа это облако точек проецируем на изображения, полученные с фронтальной, левой и правой RGB‑камер:
Зачем все это делали?
Проверили отсутствие рассинхронизации сенсоров
Попробовали выявить баги до запуска обучения
Убедились, что проекция лидарного облака правильно наложилась на объекты в кадре
5. Готовим датасет и запускаем обучение
Прежде чем скормить данные модели, их нужно привести в порядок. Сырые логи из симуляции — это только начало.
5.1 Предобработка и чистка данных
Что происходило на этом этапе:
исключали сцены с длительной остановкой без активности;
фильтровали выбросы по скорости (> 30 км/ч);
проверяли файлы на их существование;
отсутствующие или поврежденные сцены отбрасывали;
изображения и маски приводили к единому размеру (224×224);
проекции лидаров преобразовывали в глубинные карты.
После предобработки сохранили датасет. Финальный датасет содержит 5616 строк(семплов)
Для удобства обучения реализовали кастомный класс CarlaDataset, в котором каждый элемент содержит:
входной тензор изображения: 15 каналов (3 RGB + 3 semantic +3 lidar project × 3 вида камер);
локальную траекторию: (10, 2) точек;
вектор состояния светофора: (3,);
вектор целевых управляющих воздействий: (4,) — скорость, угол поворота руля, ускорение, торможение.
class CarlaDataset(Dataset):
def __init__(self, data):
self.data = data
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
scene = self.data[idx]
#Входные изображения
# RGB: (3, 224, 224, 3) - (3, 3, 224, 224)
rgb = torch.from_numpy(np.stack(scene["rgb"])).permute(0, 3, 1, 2).float() #(3, 3, 224, 224)
rgb = rgb.reshape(-1, 224, 224) # (9, 224, 224)
# Lidar: (3, 224, 224)
lidar = torch.from_numpy(np.stack(scene["lidar"])).float() #(3, 224, 224)
# Semantic: (3, 224, 224)
semantic = torch.from_numpy(np.stack(scene["semantic"])).float() #(3, 224, 224)
# Объединяем 9 + 3 + 3 = 15 каналов
input_tensor = torch.cat([rgb, lidar, semantic], dim=0) #(15, 224, 224)
#Дополнительные признаки
trajectory_tensor = torch.from_numpy(scene["trajectory"]).float() #(10, 2)
lights = scene["lights"]
light_tensor = torch.tensor([
lights["green"],
lights["red"],
lights["yellow"]
], dtype=torch.float32) # (3,)
#Целевые переменные
target = scene["target"]
target_tensor = torch.tensor([
target["speed"],
target["steer"],
target["throttle"],
target["brake"]
], dtype=torch.float32) # (4,)
return {
"input": input_tensor, #(15, 224, 224)
"trajectory": trajectory_tensor, #(10, 2)
"traffic_lights": light_tensor, #(3,)
"target": target_tensor #(4,)
}
Пример одного элемента в батче:
{
"input": tensor(15, 224, 224),
"trajectory": tensor(10, 2),
"traffic_lights": tensor(3),
"target": tensor(4)
}
Перед началом обучения весь датасет разделили на train, val и test:
обучающая выборка (train) — 70% данных;
валидационная выборка (val) — 15% данных;
тестовая выборка (test) — 15% данных.
train_data, temp_data = train_test_split(dataset, test_size=0.3, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)
train_dataset = CarlaDataset(train_data)
val_dataset = CarlaDataset(val_data)
test_dataset = CarlaDataset(test_data)
5.2 Архитектура модели управления
Для предсказания скорости и управляющих воздействий транспортного средства реализовали кастомную нейросеть CarlaControlModel. Архитектура модели отражает модульный подход к обработке различных входных данных:
Визуальный модуль (CNN): принимает изображение с мультисенсорным восприятием — RGB, сегментация и лидар, объединённые в один тензор размером (15, 224, 224);
Траектория движения: обрабатывается отдельным полносвязным блоком из 20 координат;
Состояние светофора: кодируется вектором длины 3 и также проходит через отдельную ветку;
Объединение признаков: все полученные векторы объединяются и передаются в финальный head‑блок, который выдаёт 4 выходных значения — управляющие воздействия.
class CarlaControlModel(nn.Module):
def __init__(self):
super().__init__()
#Блок CNN для обработки входного изображения (15 каналов)
self.conv = nn.Sequential(
nn.Conv2d(15, 32, kernel_size=5, stride=2, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(),
nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.AdaptiveAvgPool2d((1, 1))
)
self.fc_image = nn.Linear(128, 64)
#Блок для локальной траектории
self.fc_traj = nn.Sequential(
nn.Linear(20 * 2, 32),
nn.ReLU(),
nn.Linear(32, 16)
)
#Блок для сигналов светофора
self.fc_lights = nn.Sequential(
nn.Linear(3, 8),
nn.ReLU()
)
#Финальный блок для предсказания управляющих команд
self.head = nn.Sequential(
nn.Linear(64 + 16 + 8, 64),
nn.ReLU(),
nn.Linear(64, 4)
)
def forward(self, input_img, trajectory, traffic_lights):
x = self.conv(input_img)
x = x.view(x.size(0), -1)
x = self.fc_image(x)
traj = self.fc_traj(trajectory.view(trajectory.size(0), -1))
lights = self.fc_lights(traffic_lights)
combined = torch.cat([x, traj, lights], dim=1)
out = self.head(combined)
return out
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CarlaControlModel().to(device)
BATCH_SIZE = 64
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)
5.3 Обучающий цикл
Для обучения модели использовали стандартный пайплайн:
Функция потерь:
nn.MSELoss()
— так как задача регрессионная;Оптимизатор: Adam с learning rate = 1e-4;
Количество эпох: 120;
Размер батча: 64.
На каждой эпохе вычисляли средние значения ошибки по обучающей и валидационной выборке. Если модель демонстрировала улучшение на валидации — сохраняли ее как лучшую текущую версию:
EPOCHS = 120
best_val_loss = float('inf')
train_losses = []
val_losses = []
for epoch in range(EPOCHS):
print(f"\nEpoch {epoch+1}/{EPOCHS}")
start_time = time.time()
model.train()
train_loss = 0.0
for batch in tqdm(train_loader, desc="Training"):
inputs = batch["input"].to(device)
traj = batch["trajectory"].to(device)
lights = batch["traffic_lights"].to(device)
targets = batch["target"].to(device)
optimizer.zero_grad()
outputs = model(inputs, traj, lights)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
train_loss += loss.item()
avg_train_loss = train_loss / len(train_loader)
train_losses.append(avg_train_loss)
model.eval()
val_loss = 0.0
with torch.no_grad():
for batch in tqdm(val_loader, desc="Validation"):
inputs = batch["input"].to(device)
traj = batch["trajectory"].to(device)
lights = batch["traffic_lights"].to(device)
targets = batch["target"].to(device)
outputs = model(inputs, traj, lights)
loss = criterion(outputs, targets)
val_loss += loss.item()
avg_val_loss = val_loss / len(val_loader)
val_losses.append(avg_val_loss)
elapsed = time.time() - start_time
print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | Time: {elapsed:.2f}s")
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
torch.save(model.state_dict(), "best_model.pth")
5.4 Какие результаты получили
По результатам обучения наблюдаем стабильное и уверенное снижение ошибки на обучающей выборке, что свидетельствует о том, что модель эффективно усваивала представленные данные.
На валидационной выборке ошибка сначала плавно снижалась, однако после определенного числа эпох (примерно после 60) начала незначительно расти. Это указывает на то, что модель перестала находить новые закономерности и начала проявлять признаки переобучения — начинала адаптироваться к обучающим данным в ущерб обобщающей способности.
5.5 Оценка модели на тестовой выборке
После завершения обучения протестировали модель на ранее не использовавшихся данных из тестовой выборки. Для оценки использовали среднеквадратичная ошибка (MSE) и средняя абсолютная ошибка (MAE) по каждому из четырёх выходов: скорость, угол поворота руля, ускорение, торможение.
test_loss = 0.0
speed_loss = 0.0
steer_loss = 0.0
throttle_loss = 0.0
brake_loss = 0.0
with torch.no_grad():
for batch in tqdm(test_loader, desc="Testing"):
inputs = batch["input"].to(device)
traj = batch["trajectory"].to(device)
lights = batch["traffic_lights"].to(device)
targets = batch["target"].to(device)
outputs = model(inputs, traj, lights)
loss = criterion(outputs, targets)
test_loss += loss.item()
#Unpacking the loss for each target variable
diff = torch.abs(outputs - targets)
speed_loss += diff[:, 0].mean().item()
steer_loss += diff[:, 1].mean().item()
throttle_loss += diff[:, 2].mean().item()
brake_loss += diff[:, 3].mean().item()
n_batches = len(test_loader)
Test Loss(MSE): 6.5944
Speed MAE: 3.8075
Steer MAE: 0.0814
Throttle MAE: 0.2914
Brake MAE: 0.1477
Заключение
В рамках этого проекта я рассказал, как собрать полный пайплайн для обучения модели управления автономным транспортом: от генерации маршрутов и мультисенсорной записи до подготовки обучающего датасета и запуска обучающего цикла. Упор сделан на мультисенсорное восприятие (RGB, семантика, лидары) и локальную траекторию как источник контекста — это заметно повышает устойчивость модели в сложных дорожных условиях.
Модель демонстрирует низкую ошибку по управлению рулём, что важно для стабильного движения;
Ошибки по ускорению и торможению остаются в пределах разумного допуска;
Ошибка по предсказанию скорости выше остальных — это ожидаемо, так как скорость зависит не только от текущего кадра, но и от динамики в прошлом (которое не учитывается при покадровом обучении).
В целом, модель показала хорошую обобщающую способность, а результаты на тестовой выборке подтверждают её применимость в задачах управления автономным транспортом.
Полный код проекта, включая подготовку данных, архитектуру модели и процесс обучения, доступен в репозитории.
Все актуальные методы и инструменты DS и ML можно освоить на онлайн-курсах OTUS: в каталоге можно посмотреть список всех программ, а в календаре — записаться на открытые уроки.