
Я хотел, чтобы у меня на кухне был дисплей, показывающий время прибытия поездов метро, и мне не приходилось сверяться с телефоном, пока собираюсь к выходу из дома. Эта статья рассказывает о создании такого дисплея.
График поездов метро Нью-Йорка можно посмотреть н�� mta-trmnl.pages.dev. Исходный код моего проекта выложен на GitHub: фронтенд, бэкенд.
▍ Я безголовый
Просыпаясь утром, я каждый раз смотрю погоду и график метро, а затем иду в душ и сразу забываю эту информацию. Выхожу из душа, проверяю её снова, но пока одеваюсь, снова всё забываю. Ищу телефон, разблокирую его, обновляю виджет погоды и виджет метро так часто, что телефон отключает Face ID и заставляет меня ввести пароль. Ко времени, когда я надеваю обувь, я уже опаздываю, а когда спускаюсь по лестнице, до следующего поезда остаётся десять минут. Это очень раздражает.
Мне показалось, что идеально было бы, чтобы эта информация была видна всегда, находилась в одном и том же месте (посередине квартиры), чтобы на разблокировку и забывание тратилось меньше времени и внимания.
Я купил TRMNL — интересный настраиваемый дисплей на электронных чернилах. В нём уже был виджет погоды, но не было виджета MTA.
Примечание: пока я работал над этим проектом, кто-то уже написал виджет MTA, но этот пост посвящён мне.
В TRMNL есть виджет, позволяющий делать скриншот веб-сайта и отображать его на планшете. Я попробовал указать в нём wheresthefuckingtrain.com, но это не сработало. WTFT получает данные о поездах после загрузки самой страницы, а рендерер TRMNL возвращал скриншот состояния страницы до выполнения асинхронных запросов.
Поэтому я решил написать собственный веб-сайт с графиком MTA (наверно, в этом не�� ничего сложного?). Недавно из видео на YouTube я узнал о Durable Objects Cloudflare, и с радостью воспользовался возможностью поэкспериментировать с ними.
Если вкратце, то Durable Object — это особый тип serverless-функции, к которой прикреплено постоянное хранилище. Также у него есть будильники, то есть таймеры, полезные для выполнения периодических задач.
▍ Статичные графики
MTA публикует графики движения метро по большей мере в виде совместимых с GTFS данных. GTFS (General Transit Feed Specification) разбивает данные на статичный график и фид реального времени.
Статичный график — это файл ZIP, в котором содержится несколько файлов CSV со списками запланированных маршрутов, временем остановки, метаданными маршрутов и так далее.
❯ wget https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip ❯ unzip gtfs_supplemented.zip ❯ ls -lh total 155M -rw-r--r-- 1 ibiyemi ibiyemi 162 Feb 21 19:25 agency.txt -rw-r--r-- 1 ibiyemi ibiyemi 4.8K Feb 21 19:25 calendar.txt -rw-r--r-- 1 ibiyemi ibiyemi 17K Feb 21 19:25 calendar_dates.txt -rw-r--r-- 1 ibiyemi ibiyemi 18M Feb 21 19:25 gtfs_supplemented.zip -rw-r--r-- 1 ibiyemi ibiyemi 12K Feb 21 19:25 routes.txt -rw-r--r-- 1 ibiyemi ibiyemi 5.6M Feb 21 19:25 shapes.txt -rw-r--r-- 1 ibiyemi ibiyemi 126M Feb 21 19:25 stop_times.txt -rw-r--r-- 1 ibiyemi ibiyemi 63K Feb 21 19:25 stops.txt -rw-r--r-- 1 ibiyemi ibiyemi 8.4K Feb 21 19:25 transfers.txt -rw-r--r-- 1 ibiyemi ibiyemi 5.4M Feb 21 19:25 trips.txt
Один из этих файлов гораздо больше остальных. Уверен, это не вызовет никаких проблем.
Я создал сайт Astro, который скачивает файл ZIP, распаковывает его, создаёт несколько
Map на основании данных в таблицах, а затем рендерит страницу.import { parse } from "csv-parse/sync"; import { Temporal } from "temporal-polyfill"; import yauzl from "yauzl"; // полные определения типов см. на GitHub // https://github.com/laptou/mta-trmnl/blob/b2f547e89ace6fab34cd4db0e554ecf5354c5998/src/util/mta/index.ts#L13 export interface MtaState { calendar: CalendarEntry[]; calendarDates: CalendarDateEntry[]; routes: Route[]; stopTimes: StopTime[]; stops: Stop[]; transfers: Transfer[]; lastUpdated: Date; // Извлечённые данные stations: Map<string, Station>; lineToStations: Map<string, Set<string>>; } async function readZipEntries( zip: yauzl.ZipFile, ): Promise<Map<string, string>> { // вспомогательная функция, которая возвращает map сопоставления // имён файлов элементов zip с содержимым файлов } export const MTA_SUPPLEMENTED_GTFS_STATIC_URL = "https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip"; export async function loadMtaBaselineState(zipPath: string): Promise<MtaState> { const gtfsData = await fetch(zipPath).then((res) => res.arrayBuffer()); // zipFromBuffer — это вспомогательная функция, пропущенная ради краткости const gtfsArchive = await zipFromBuffer(Buffer.from(gtfsData), { lazyEntries: true, }); const entries = await readZipEntries(gtfsArchive); // [парсим все айлы CSV в массивы объектов] const stations = new Map<string, Station>(); const lineToStations = new Map<string, Set<string>>(); for (const stop of stops) { if (!stop.parent_station) { // преобразуем остановки в станции и добавляем к map // станции — это остановки верхнего уровня; // также в них содержится информация, какая линия их обслуживает } } for (const route of routes) { // создаём map маршрутов и станций, // чтобы можно было эффективно получать все станции для выбранного маршрута } return { calendar, calendarDates, routes, stopTimes, stops, transfers, lastUpdated: new Date(), stations, lineToStations, }; } export function getAllLines(state: MtaState): Route[] { return state.routes; } export function getStationsForLine(state: MtaState, lineId: string): Station[] { const stationIds = state.lineToStations.get(lineId) || new Set(); return Array.from(stationIds) .map((id) => state.stations.get(id)) .filter((station): station is Station => station !== undefined); } export function getUpcomingArrivals( state: MtaState, stationId: string, limit = 10, ): TrainArrival[] { // ... }
Всё получилось, но работало невероятно медленно даже для этапа разработки. Astro не рассчитан на хранение состояния, то есть невозможно было один раз загрузить графики движения в память, а потом запрашивать данные в последующих запросах. Когда я делал запрос, он загружал всё с самого начала, что занимало целую минуту.
❯ bun dev $ astro dev astro v5.3.0 ready in 157 ms ┃ Local http://localhost:4321/ ┃ Network use --host to expose 20:20:12 watching for file changes... 20:21:21 [200] / 69834ms 20:22:30 [200] / 60981ms
И здесь нам пригодятся Durable Object — их можно использовать для кэширования данных!
Я попытался определить Durable Object в той же кодовой базе, что и сайт Astro, но это п�� какой-то причине не сработало, поэтому я создал отдельный проект. Мой сайт Astro может запрашивать Durable Object (DO) по HTTP, когда ему нужно что-то отрендерить, а DO может выполнять чтение из кэша, обновляя графики примерно раз в час.
DO могут иметь или хранилище ключей и значений, или хранилище SQLite. В моём случае идеально подходит SQLite, потому что я имею дело с набором реляционных данных, поэтому я выбрал его. Я настроил Durable Object очень похожим образом, только помещал данные не в
Map, а записывал в строки базы данных SQLite моего DO.export class MtaStateObject extends DurableObject { sql: SqlStorage; constructor(state: DurableObjectState, env: Env) { super(state, env); // Получаем интерфейс хранилища SQL. this.sql = state.storage.sql; state.blockConcurrencyWhile(async () => { await this.initializeDatabase(); await this.loadGtfsStatic(); }); } private async initializeDatabase() { this.sql.exec(` CREATE TABLE IF NOT EXISTS calendar ( service_id TEXT PRIMARY KEY, monday INTEGER, tuesday INTEGER, wednesday INTEGER, thursday INTEGER, friday INTEGER, saturday INTEGER, sunday INTEGER, start_date TEXT, end_date TEXT ); # здесь будут операторы CREATE TABLE для других таблиц CREATE INDEX IF NOT EXISTS idx_stop_times_stop_id ON stop_times(stop_id); CREATE INDEX IF NOT EXISTS idx_trips_route_id ON trips(route_id); CREATE INDEX IF NOT EXISTS idx_stops_parent_station ON stops(parent_station); CREATE TABLE IF NOT EXISTS metadata ( key TEXT PRIMARY KEY, value TEXT ); `); } async fetch(request: Request): Promise<Response> { const url = new URL(request.url); let response: Response; switch (url.pathname) { case "/lines": response = await this.handleGetAllLines(); break; case "/stations": response = await this.handleGetStationsForLine(url.searchParams); break; case "/station": response = await this.handleGetStation(url.searchParams); break; case "/arrivals": response = await this.handleGetUpcomingArrivals(url.searchParams); break; default: response = new Response("Not found", { status: 404 }); } return response; } /** * Загружаем данные из файла zip GTFS в SQL. */ private async shouldUpdateGtfs(): Promise<boolean> { // ищем в таблице метаданных ключ last_gtfs_update, // если прошло больше часа, то возвращаем true } async loadGtfsStatic() { if (!(await this.shouldUpdateGtfs())) { console.log("not updating static gtfs"); return; } console.log("updating static gtfs"); try { const gtfsResponse = await fetch(MTA_SUPPLEMENTED_GTFS_STATIC_URL); const buf = Buffer.from(await gtfsResponse.arrayBuffer()); const gtfsArchive = await zipFromBuffer(buf, { lazyEntries: true }); // processZipEntries изменён, чтобы он возвращал // map потоков, а не map текста const entries = processZipEntries(gtfsArchive); for await (const [fileName, stream] of entries) { const parser = parse({ columns: true, skip_empty_lines: true, }); stream.pipe(parser); console.log(`processing ${fileName}`); switch (fileName) { case "calendar.txt": for await (const entry of parser) { this.sql.exec( `INSERT OR REPLACE INTO calendar (service_id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, start_date, end_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, entry.service_id, Number.parseInt(entry.monday, 10), Number.parseInt(entry.tuesday, 10), Number.parseInt(entry.wednesday, 10), Number.parseInt(entry.thursday, 10), Number.parseInt(entry.friday, 10), Number.parseInt(entry.saturday, 10), Number.parseInt(entry.sunday, 10), entry.start_date, entry.end_date, ); } break; // сюда вставляем операторы для других таблиц } } // Обновляем метку времени последнего обновления const now = Temporal.Now.instant(); this.sql.exec( "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", "last_gtfs_update", now.toString(), ); } catch (err) { console.error(err); throw err instanceof Error ? err : new Error(String(err)); } } /** * Возвращаем все линии (маршруты). */ async handleGetAllLines(): Promise<Response> { // ... } /** * Возвращаем станции для указанного id линии. * Используем таблицу поездок, чтобы найти поездок для маршрута, а затем все остановки. */ async handleGetStationsForLine(params: URLSearchParams): Promise<Response> { // ... } /** * Возвращает информацию о станции по указанному stationId. */ async handleGetStation(params: URLSearchParams): Promise<Response> { // ... } /** * Возвращает ближайшие прибытия для указанной станции. * Требует stationId, опциональное направление (север или юг), * и опциональный максимум (по умолчанию 10). */ async handleGetUpcomingArrivals(params: URLSearchParams): Promise<Response> { // ... } /** * Проверяет, активен ли сегодня указанный сервис (по service_id). * Преобразует сохранённые строки (YYYYMMDD) в Temporal.PlainDate для сравнений. */ async isServiceActiveToday(serviceId: string): Promise<boolean> { // ... } }
Затем я обновил свой сайт Astro, чтобы он начал получать данные из DO, и это сработало. Всё по-прежнему было медленным при первом запросе (так как DO заполняет базу данных), но примерно в течение часа все последующие запросы выполнялись быстро. Миссия выполнена? Я загрузил свой DO в Cloudflare, чтобы протестировать его.

Беда.
Что ж, давайте разбираться, как снизить время CPU. Что отнимает так много времени? Я добавил к инициализатору базы данных
console.log().
Понятно. Очевидно, таблица
stop_times (в которой хранится список всех остановок за день для каждого поезда на каждом типе маршрута) содержит 2,2 миллиона строк.Ну, SQLite должна быть очень быстрой, так что виновато в этом, вероятно, чтение строк из файла ZIP. Давайте посмотрим, сможем ли мы ускорить распаковку.
Я попробовал заменить
yauzl на fflate, но безуспешно. Что, если использовать WebAssembly? Я создал простой проект wasm-pack и добавил в него zip и csv.#[wasm_bindgen] pub fn unpack_csv_archive( data: Uint8Array, row_chunk_len: usize, callback: Function, ) -> Result<(), wasm_bindgen::JsError> { // вызываем unpack_csv_archive_inner после преобразования аргументов Ok(()) } pub fn unpack_csv_archive_inner< F: FnMut(&str, &js_sys::Array, &js_sys::Array) -> anyhow::Result<()>, >( buf: &[u8], row_chunk_len: usize, mut callback: F, ) -> anyhow::Result<()> { let mut ar = zip::ZipArchive::new(Cursor::new(buf)).context("could not create zip archive")?; for i in 0..ar.len() { let file = ar .by_index(i) .with_context(|| format!("could not open file at index {i}"))?; let file_name = file.name().to_owned(); let mut reader = csv::ReaderBuilder::new().from_reader(file); let headers = reader.headers()?; let headers_arr: js_sys::Array = headers.iter().map(JsString::from_str).try_collect()?; let mut chunk = vec![]; for record in reader.into_records() { let record = record?; let record_arr: js_sys::Array = record.iter().map(JsString::from_str).try_collect()?; chunk.push(record_arr); if chunk.len() >= row_chunk_len { // наш обратный вызов написан на JS // он содержит цикл for..of loop, который // вставляет все только что полученные строки callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?; chunk = vec![]; } } if chunk.len() > 0 { callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?; } } Ok(()) }
Получилось быстрее, но ненамного.

Похоже, пора перестать гадать и заняться профилированием.

Благодаря профилированию выяснилось, что основная часть времени выполнения тратится на
js_sys::function::apply (это наш обратный вызов exec() SQL) и на реализацию Deserializecsv. Как ускорить вставки SQLite? Нужно использовать транзакции.В текущем виде код предназначен для вставки блоков по 50000 строк за раз. Я обёртываю каждый блок в вызов
ctx.storage.transaction() и выполняю его заново.Предыдущая локальная реализация требовала для заполнения базы данных примерно 23 секунд при работе локального Wrangler. После добавления транзакций база данных заполняется, а воркер возвращает ответ за 4 секунды!

Я развернул новую версию в Cloudflare, и всё заработало!
Теперь у нас есть Durable Object, который, по сути, является работающим по запросу индексированной репликой для чтения текущего графика MTA. Класс.
▍ Обновления в реальном времени
Проект был бы неполным без обновлений в реальном времени. В среднем поезда приходят вовремя лишь в 80% случаев. Тем, кому не повезло жить рядом с поездом F, приходится пользоваться поездом, в декабре приходившим вовремя в 64% случаев.
То есть веб-сайт, просто показывающий график без текущей информации, бесполезен и даже вреден, потому что из-за него вы можете прождать на станции полчаса, хотя могли бы просто взять Citi Bike.
Фиды реального времени GTFS передаются в формате Protobuf. Из-за названия «real-time» я сначала думал, что они потоково передают ответы по долго открытому сокет-соединению, но на самом деле это всего лишь HTTP-опросы.
Фиды реального времени достаточно просты: в них содержится список сообщений о текущем состоянии системы. В него включены и сообщения обновлённого времени прибытия и отправления для определённого маршрута на определённой остановке. После парсинга фидов реального времени я просто перезаписываю время прибытия и отправления из статической таблицы обновлёнными данными.
type RealtimeStatusGroup = | "ACE" | "BDFM" | "G" | "JZ" | "NQRW" | "L" | "1234567" | "SIR"; async getRealtimeStatus(group: RealtimeStatusGroup): Promise<FeedMessage> { let endpoint: string; switch (group) { case "ACE": endpoint = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace"; break; // По неизвестным мне причинам MTA разбивает // обновления реального времени на семь отдельных фидов } const response = await fetch(new Request(endpoint)); // FeedMessage — это класс, который я сгенерировал из заголовка protobuf // GTFS реального времени при помощи @protobuf-ts/plugin return FeedMessage.fromBinary(await response.bytes()); } async updateRealtimeStatus(group: RealtimeStatusGroup) { const status = await this.getRealtimeStatus(group); const activeServices = await this.getActiveServiceIds(); await this.ctx.storage.transaction(async () => { for (const msg of status.entity) { if (msg.tripUpdate) { if (!msg.tripUpdate.trip) { console.warn("got trip update without trip", msg); continue; } // MTA обозначает поездки в фиде реального времени суффиксом ID поездки, // например "082500_A..N54R" const tripIdSuffix = msg.tripUpdate.trip.tripId; const tripIds = this.sql .exec<{ trip_id: string; service_id: string }>( "SELECT trip_id, service_id FROM trips WHERE trip_id LIKE $1", `%${tripIdSuffix}`, ) .toArray() .filter((r) => activeServices.includes(r.service_id)) .map((r) => r.trip_id); const zeroTime = Temporal.PlainTime.from("00:00:00"); for (const stopTimeUpdate of msg.tripUpdate!.stopTimeUpdate) { if (!stopTimeUpdate.stopId) continue; // Здесь код довольно некрасивый — Temporal API очень многословный, // особенно при взаимодействии с системами, которые его не используют, // зато он сильно упрощает сериализацию и арифметику с датами, // временем, datetime и интервалами const newArrival = stopTimeUpdate.arrival?.time ? Temporal.Instant.fromEpochSeconds( Number(stopTimeUpdate.arrival.time), ) .toZonedDateTime({ timeZone: "America/New_York", calendar: "gregory", }) .toPlainTime() .since(zeroTime) .total("seconds") | 0 : null; const newDeparture = /* ... */; this.sql.exec( ` UPDATE stop_times SET arrival_total_seconds = IFNULL($1, arrival_total_seconds), departure_total_seconds = IFNULL($2, departure_total_seconds), is_realtime_updated = 1 WHERE trip_id IN (${tripIds.map((t) => `"${t}"`).join(",")}) AND stop_id == $4 `, newArrival, newDeparture, stopTimeUpdate.stopId, ); } } } }); console.log("updated realtime status for ", group); }
▍ Неожиданное открытие
Написав работающий код, я занялся его улучшением. В процессе улучшения я заметил, что Durable Object на самом деле имеют два метода для создания транзакций:
transaction и transactionSync. В своём загрузчике stop_times я вызывал transaction, имеющий сигнатуру transaction(cb: () => Promise<void>): Promise<void>.Это неидеальное решение, поскольку:
- Мой обратный вызов не возвращает
Promise. Хоть пока мой код работает, позже Cloudflare может решить, что это ошибка, или создать какой-нибудь малозаметный баг, который потом сведёт меня с ума. - Передача ошибок поломана. Если возвращённый
transactionpromise будет отклонён, то программа молча проигнорирует это, что тоже сведёт меня с ума, ведь мне будет сложно выявлять баги в процессе загрузки.
Поэтому я попробовал перейти на
transactionSync, но загрузчик снова стал чрезвычайно медленным!

Я предполагаю, что отправляя кучу вызовов
transaction, я случайно вызывал pipelining операций записи.

Примечание: предположительно, Cloudflare взимает оплату за время CPU, а не за фактическое время, то есть время, потраченное моим воркером на ожидание среды выполнения, вероятно, не будет учитываться ни для одного воркера
Хоть по описанным выше причинам этот код не обладает особой ��табильностью, это всего лишь хобби-проект, от которого ничего не зависит. Поэтому я решил, что это не «хак», а «оптимизация».
Если бы я захотел использовать эту методику в реальном проекте, то, вероятно, объединил бы все
Promise и создал Promise.all для передачи ошибок.▍ Заключение
Было бы проще поместить этот проект в мой инстанс Coolify в облаке. Там нет конкретных ограничений на время CPU. Кроме того, это позволило бы мне использовать MTAPI. Но я хотел изучить Durable Object. Если бы я делал всё обычным образом, то не научился бы ничему новому, и создал бы просто график движения поездов, а не высокооптимизированный Web ScaleTM serverless on-demand fault-tolerant график движения поездов, доступный 99,9% времени.
Моя любимая особенность Durable Object — их будильники. Я обнаружил, что планирование задач в распределённых средах/serverless-средах обычно чрезвычайно утомляет и склоняет к настройке отдельной очереди задач. Мне нравится, что будильники очень легко использовать, и они не требуют никаких дополнительных усилий.
График движения поездов можно смотреть на mta-trmnl.pages.dev.
Исходный код этого проекта выложен на GitHub: фронтенд, бэкенд.
Telegram-канал со скидками, розыгрышами призов и новостями IT 💻

