Привет, Хабр! Сегодня хочу поделиться интересным проектом, который мы сделали для конкурса. Задача — превратить сырые GPX-треки (треки с GPS-устройств) в структурированные данные с визуализацией, метеорологической и географической аналитикой. Всё это — на Python, с использованием открытых API и библиотек для работы с геоданными.
Что делает проект?
Код представляет собой пайплайн обработки GPS-треков, который:
Скачивает GPX-файлы по ссылкам
Визуализирует треки на карте
Извлекает данные о каждой точке (координаты, время, высота)
Добавляет погодные данные (температуру) в момент записи трека
Определяет регион и тип местности
Рассчитывает частоту шагов (для пеших походов)
Аугментирует изображения треков для расширения датасета
Визуализирует распределения данных
Загрузка и парсинг GPX
import requests import pandas as pd import os def download_gpx(links): """ Скачивает GPX-файлы по списку ссылок """ for num, url in enumerate(links): try: response = requests.get(url) filename = f"track{num}.gpx" with open(f"data/gpx/{filename}", mode="wb") as f: f.write(response.content) except Exception as e: print("Ошибка при скачивании") # Инициализация DataFrame для хранения всех точек df = pd.DataFrame(columns=["track_id", "track_time", "latitude", "longitude", "altitude"])
Визуализация треков на картах
import gpxpy import geopandas as gpd import contextily as ctx from shapely.geometry import box, LineString import matplotlib.pyplot as plt def gpx_to_png(df): """ Конвертирует GPX-файлы в изображения карт с треками """ margin = 0.02 img_path = "data/image" os.makedirs(img_path, exist_ok=True) for i in os.listdir("data/gpx"): png_name = f"{i[:-4]}.png" with open(f"data/gpx/{i}", encoding="UTF-8") as f: gpx = gpxpy.parse(f) lats, lons = [], [] for track in gpx.tracks: for segment in track.segments: for point in segment.points: lats.append(point.latitude) lons.append(point.longitude) df.loc[len(df)] = [i, point.time, point.latitude, point.longitude, point.elevation] # Создаем bounding box с отступами bbox = box( min(lons) - margin, min(lats) - margin, max(lons) + margin, max(lats) + margin ) track_line = LineString(zip(lons, lats)) # Конвертируем в Web Mercator для отображения gdf_bbox = gpd.GeoDataFrame(geometry=[bbox], crs="EPSG:4326") gdf_bbox_web = gdf_bbox.to_crs(epsg=3857) gdf_track = gpd.GeoDataFrame(geometry=[track_line], crs="EPSG:4326") gdf_track_web = gdf_track.to_crs(epsg=3857) # Создаем карту fig, ax = plt.subplots(figsize=(10, 8)) gdf_bbox_web.plot(ax=ax, alpha=0) gdf_track_web.plot(ax=ax, color="red", linewidth=2) ctx.add_basemap(ax, crs=gdf_bbox_web.crs, source=ctx.providers.OpenStreetMap.Mapnik) ax.set_axis_off() plt.savefig(f"{img_path}/{png_name}", dpi=150, bbox_inches="tight", pad_inches=0) plt.close(fig) return df
Получение исторических погодных данных
def temp(lat, lon, date): """ Получает температуру для заданных координат и даты """ url = "https://archive-api.open-meteo.com/v1/archive" params = { 'latitude': lat, # Широта 'longitude': lon, # Долгота 'start_date': date, 'end_date': date, 'hourly': 'temperature_2m', "timezone":"auto" } headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'} response = requests.get(url, params=params, headers=headers) response.raise_for_status() if response.status_code == 200: data = response.json() return data["hourly"]['temperature_2m'][12] # Температура в полдень else: print(f"Данные не получены") def analysis_weather(df): ''' Функция для интерполяции температур между опорными точками. Использует 5 опорных точек для интерполяции температуры для всех строк. ''' n = len(df) key_indexes = [0, n//4, n//2, 3*n//4, n-1] temperatures_at_key_points = {} for idx in key_indexes: lat = df.iloc[idx]["latitude"] lon = df.iloc[idx]["longitude"] date = df.iloc[idx]["track_time"] temp_value = temp(lat, lon, date) temperatures_at_key_points[idx] = temp_value if len(temperatures_at_key_points) < 2: print("Недостаточно данных для интерполяции, проверьте работоспособность API") df["temperature"] = None return df all_temperatures = [] left_idx = 0 right_idx = 1 for i in range(n): if (right_idx < len(key_indexes) - 1 and i >= key_indexes[right_idx]): left_idx += 1 right_idx += 1 left_key = key_indexes[left_idx] right_key = key_indexes[right_idx] left_temp = temperatures_at_key_points[left_key] right_temp = temperatures_at_key_points[right_key] if i in temperatures_at_key_points: temperature = temperatures_at_key_points[i] elif left_temp is not None and right_temp is not None: temperature = left_temp + (right_temp - left_temp) * (i - left_key) / (right_key - left_key) else: temperature = left_temp if left_temp is not None else right_temp all_temperatures.append(temperature) df = df.copy() df["temperature"] = all_temperatures return df def get_temp(df): """ Обрабатывает температуру для всех треков """ try: df_temp = pd.DataFrame() for i in range(0, 3): track_data = df[df["track_id"] == f"track{i}.gpx"] track_data_weather = analysis_weather(track_data) df_temp = pd.concat([df_temp, track_data_weather]) print(f"track{i} добавлен") df = df_temp.copy() return df except Exception as e: print(f"Ошибка вызова функции: analysis_weather {e}")
Географический анализ: регион и тип местности
import time def extract_map_region(lat: float, lon: float): """ Определяет регион по координатам через Nominatim API """ try: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/536.36 (KHTML, like Gecko) Chrome/58.0.3029.100 Safari/536.3'} response = requests.get(f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json", headers=headers) json = response.json() time.sleep(1.5) # Уважаем лимиты API if "county" in json["address"]: return json["address"]["county"] if "state" in json["address"]: return json["address"]["state"] return json["address"]["country"] except Exception as e: print(f"Error {e}") def analysis_region(df): ''' Функция для определения регионов между опорными точками. ''' try: lat = df.iloc[0]["latitude"] lon = df.iloc[0]["longitude"] df = df.copy() df["region"] = extract_map_region(lat, lon) return df except Exception as e: print(f"Ошибка вызова функции extract_map_region {e}") def terrain_type(df): """ Определяет тип местности и ключевые объекты через Overpass API """ overpass_endpoints = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", "https://overpass.openstreetmap.ru/cgi/interpreter" ] try: points = list(zip(df["latitude"], df["longitude"])) rep_points = [points[0], points[len(points)//2], points[-1]] # 3 опорные точки all_landuse, all_natural, all_key_objects = [], [], set() for lat, lon in rep_points: overpass_query = f""" [out:json][timeout:45]; ( way(around:500,{lat},{lon})["landuse"]; way(around:500,{lat},{lon})["natural"]; way(around:500,{lat},{lon})["leisure"]; way(around:500,{lat},{lon})["waterway"="river"]; way(around:500,{lat},{lon})["waterway"="stream"]; way(around:500,{lat},{lon})["natural"="water"]; node(around:500,{lat},{lon})["place"="city"]; node(around:500,{lat},{lon})["place"="town"]; node(around:500,{lat},{lon})["place"="village"]; node(around:500,{lat},{lon})["natural"="peak"]; node(around:500,{lat},{lon})["natural"="mountain"]; ); out tags center; """ for endpoint in overpass_endpoints: response = requests.get(endpoint, params={'data': overpass_query}, timeout=60) if response.status_code == 200: data = response.json() break for element in data.get('elements', []): tags = element.get('tags', {}) if 'landuse' in tags: all_landuse.append(tags['landuse']) elif 'natural' in tags: all_natural.append(tags['natural']) if tags['natural'] in ['peak', 'mountain'] and 'name' in tags: all_key_objects.add(f"Mountain: {tags['name']}") if 'waterway' in tags and tags['waterway'] in ['river', 'stream'] and 'name' in tags: all_key_objects.add(f"River: {tags['name']}") if 'place' in tags and tags['place'] in ['city', 'town', 'village'] and 'name' in tags: all_key_objects.add(f"Settlement: {tags['name']} ({tags['place']})") if element.get('type') == 'way' and 'natural' in tags and tags['natural'] == 'water' and 'name' in tags: all_key_objects.add(f"Lake: {tags['name']}") time.sleep(1.5) terrain_type = "unknown" if all_landuse: terrain_type = max(set(all_landuse), key=all_landuse.count) elif all_natural: terrain_type = max(set(all_natural), key=all_natural.count) key_objects_str = "; ".join(sorted(all_key_objects)) if all_key_objects else None df["terrain_type"] = terrain_type df["key_objects_str"] = key_objects_str except Exception as e: print("Ошибка", e) return df
Расчет физических параметров
from geopy.distance import geodesic def step_frequency(df): """ Рассчитывает частоту шагов на основе расстояния между точками """ points = list(zip(df["latitude"], df["longitude"])) step = [0] for p1, p2 in zip(points, points[1:]): dist = geodesic(p1, p2).meters step.append(dist / 0.75) # Предполагаем среднюю длину шага 0.75 метра df["steps"] = step return df
Аугментация изображений
from PIL import Image, ImageEnhance from random import randint, uniform def data_augmentation(): """ Создает аугментированные версии изображений треков """ images_path = "data/image" os.makedirs(images_path, exist_ok=True) for filename in os.listdir(images_path): if filename.lower().endswith(".png") and not any(word in filename for word in ["rotated", "contrasted", "brightness"]): img_path = os.path.join(images_path, filename) img = Image.open(img_path) base_name = os.path.splitext(filename)[0] # Поворот rotated_img = img.rotate(randint(10, 60)) rotated_img.save(os.path.join(images_path, f"{base_name}_rotated.png")) # Изменение контраста contrasted_img = ImageEnhance.Contrast(img).enhance(randint(2, 4)) contrasted_img.save(os.path.join(images_path, f"{base_name}_contrasted.png")) # Изменение яркости brightness_img = ImageEnhance.Brightness(img).enhance(uniform(1.2, 1.6)) brightness_img.save(os.path.join(images_path, f"{base_name}_brightness.png"))
Визуализация распределений
import seaborn as sns def norm_or_not(df): """ Визуализирует распределения всех числовых колонок """ n = len(df.columns) // 5 + bool(len(df.columns) % 5) _, axes = plt.subplots(nrows=n, ncols=5, figsize=(15,20)) for idx, i in enumerate(df.columns): sns.kdeplot(data=df, x=i, common_norm=False, ax=axes[idx // 5][idx%5]) plt.title(i) plt.show()
Полный пайплайн обработки
# Пример использования всего пайплайна def main(): # 1. Скачиваем треки links = [ "https://example.com/track1.gpx", "https://example.com/track2.gpx", "https://example.com/track3.gpx" ] download_gpx(links) # 2. Конвертируем в изображения и DataFrame df = gpx_to_png(df) # 3. Форматируем даты df['track_time'] = df['track_time'].dt.strftime('%Y-%m-%d') # 4. Получаем погодные данные df = get_temp(df) # 5. Определяем регион df = analysis_region(df) # 6. Рассчитываем частоту шагов df = step_frequency(df) # 7. Определяем тип местности df = terrain_type(df) # 8. Визуализируем распределения norm_or_not(df) # 9. Аугментируем изображения data_augmentation() print("Обработка завершена!") print(f"Обработано {len(df)} точек") print(f"Сохранено треков: {len(os.listdir('data/gpx'))}") return df if __name__ == "__main__": result_df = main()
Заключение
Этот проект демонстрирует, как можно превратить сырые GPS-данные в богатый источник аналитики. Комбинация географического, метеорологического и статистического анализа позволяет извлечь максимум информации из, казалось бы, простых треков.
Код модульный и легко расширяемый — можно добавлять новые источники данных, метрики и виды анализа. И все это — на открытых данных и бесплатных API.
Требования: Python 3.8+, установка зависимостей:
Важно: Все используемые API бесплатны для некоммерческого использования, но имеют лимиты запросов.
