
Я хотел, чтобы у меня на кухне был дисплей, показывающий время прибытия поездов метро, и мне не приходилось сверяться с телефоном, пока собираюсь к выходу из дома. Эта статья рассказывает о создании такого дисплея.
График поездов метро Нью-Йорка можно посмотреть на mta-trmnl.pages.dev. Исходный код моего проекта выложен на GitHub: фронтенд, бэкенд.
▍ Я безголовый
Просыпаясь утром, я каждый раз смотрю погоду и график метро, а затем иду в душ и сразу забываю эту информацию. Выхожу из душа, проверяю её снова, но пока одеваюсь, снова всё забываю. Ищу телефон, разблокирую его, обновляю виджет погоды и виджет метро так часто, что телефон отключает Face ID и заставляет меня ввести пароль. Ко времени, когда я надеваю обувь, я уже опаздываю, а когда спускаюсь по лестнице, до следующего поезда остаётся десять минут. Это очень раздражает.
Мне показалось, что идеально было бы, чтобы эта информация была видна всегда, находилась в одном и том же месте (посередине квартиры), чтобы на разблокировку и забывание тратилось меньше времени и внимания.
Я купил TRMNL — интересный настраиваемый дисплей на электронных чернилах. В нём уже был виджет погоды, но не было виджета MTA.
Примечание: пока я работал над этим проектом, кто-то уже написал виджет MTA, но этот пост посвящён мне.
В TRMNL есть виджет, позволяющий делать скриншот веб-сайта и отображать его на планшете. Я попробовал указать в нём wheresthefuckingtrain.com, но это не сработало. WTFT получает данные о поездах после загрузки самой страницы, а рендерер TRMNL возвращал скриншот состояния страницы до выполнения асинхронных запросов.
Поэтому я решил написать собственный веб-сайт с графиком MTA (наверно, в этом нет ничего сложного?). Недавно из видео на YouTube я узнал о Durable Objects Cloudflare, и с радостью воспользовался возможностью поэкспериментировать с ними.
Если вкратце, то Durable Object — это особый тип serverless-функции, к которой прикреплено постоянное хранилище. Также у него есть будильники, то есть таймеры, полезные для выполнения периодических задач.
▍ Статичные графики
MTA публикует графики движения метро по большей мере в виде совместимых с GTFS данных. GTFS (General Transit Feed Specification) разбивает данные на статичный график и фид реального времени.
Статичный график — это файл ZIP, в котором содержится несколько файлов CSV со списками запланированных маршрутов, временем остановки, метаданными маршрутов и так далее.
❯ wget https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip
❯ unzip gtfs_supplemented.zip
❯ ls -lh
total 155M
-rw-r--r-- 1 ibiyemi ibiyemi 162 Feb 21 19:25 agency.txt
-rw-r--r-- 1 ibiyemi ibiyemi 4.8K Feb 21 19:25 calendar.txt
-rw-r--r-- 1 ibiyemi ibiyemi 17K Feb 21 19:25 calendar_dates.txt
-rw-r--r-- 1 ibiyemi ibiyemi 18M Feb 21 19:25 gtfs_supplemented.zip
-rw-r--r-- 1 ibiyemi ibiyemi 12K Feb 21 19:25 routes.txt
-rw-r--r-- 1 ibiyemi ibiyemi 5.6M Feb 21 19:25 shapes.txt
-rw-r--r-- 1 ibiyemi ibiyemi 126M Feb 21 19:25 stop_times.txt
-rw-r--r-- 1 ibiyemi ibiyemi 63K Feb 21 19:25 stops.txt
-rw-r--r-- 1 ibiyemi ibiyemi 8.4K Feb 21 19:25 transfers.txt
-rw-r--r-- 1 ibiyemi ibiyemi 5.4M Feb 21 19:25 trips.txt
Один из этих файлов гораздо больше остальных. Уверен, это не вызовет никаких проблем.
Я создал сайт Astro, который скачивает файл ZIP, распаковывает его, создаёт несколько
Map
на основании данных в таблицах, а затем рендерит страницу.import { parse } from "csv-parse/sync";
import { Temporal } from "temporal-polyfill";
import yauzl from "yauzl";
// полные определения типов см. на GitHub
// https://github.com/laptou/mta-trmnl/blob/b2f547e89ace6fab34cd4db0e554ecf5354c5998/src/util/mta/index.ts#L13
export interface MtaState {
calendar: CalendarEntry[];
calendarDates: CalendarDateEntry[];
routes: Route[];
stopTimes: StopTime[];
stops: Stop[];
transfers: Transfer[];
lastUpdated: Date;
// Извлечённые данные
stations: Map<string, Station>;
lineToStations: Map<string, Set<string>>;
}
async function readZipEntries(
zip: yauzl.ZipFile,
): Promise<Map<string, string>> {
// вспомогательная функция, которая возвращает map сопоставления
// имён файлов элементов zip с содержимым файлов
}
export const MTA_SUPPLEMENTED_GTFS_STATIC_URL =
"https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip";
export async function loadMtaBaselineState(zipPath: string): Promise<MtaState> {
const gtfsData = await fetch(zipPath).then((res) => res.arrayBuffer());
// zipFromBuffer — это вспомогательная функция, пропущенная ради краткости
const gtfsArchive = await zipFromBuffer(Buffer.from(gtfsData), {
lazyEntries: true,
});
const entries = await readZipEntries(gtfsArchive);
// [парсим все айлы CSV в массивы объектов]
const stations = new Map<string, Station>();
const lineToStations = new Map<string, Set<string>>();
for (const stop of stops) {
if (!stop.parent_station) {
// преобразуем остановки в станции и добавляем к map
// станции — это остановки верхнего уровня;
// также в них содержится информация, какая линия их обслуживает
}
}
for (const route of routes) {
// создаём map маршрутов и станций,
// чтобы можно было эффективно получать все станции для выбранного маршрута
}
return {
calendar,
calendarDates,
routes,
stopTimes,
stops,
transfers,
lastUpdated: new Date(),
stations,
lineToStations,
};
}
export function getAllLines(state: MtaState): Route[] {
return state.routes;
}
export function getStationsForLine(state: MtaState, lineId: string): Station[] {
const stationIds = state.lineToStations.get(lineId) || new Set();
return Array.from(stationIds)
.map((id) => state.stations.get(id))
.filter((station): station is Station => station !== undefined);
}
export function getUpcomingArrivals(
state: MtaState,
stationId: string,
limit = 10,
): TrainArrival[] {
// ...
}
Всё получилось, но работало невероятно медленно даже для этапа разработки. Astro не рассчитан на хранение состояния, то есть невозможно было один раз загрузить графики движения в память, а потом запрашивать данные в последующих запросах. Когда я делал запрос, он загружал всё с самого начала, что занимало целую минуту.
❯ bun dev
$ astro dev
astro v5.3.0 ready in 157 ms
┃ Local http://localhost:4321/
┃ Network use --host to expose
20:20:12 watching for file changes...
20:21:21 [200] / 69834ms
20:22:30 [200] / 60981ms
И здесь нам пригодятся Durable Object — их можно использовать для кэширования данных!
Я попытался определить Durable Object в той же кодовой базе, что и сайт Astro, но это по какой-то причине не сработало, поэтому я создал отдельный проект. Мой сайт Astro может запрашивать Durable Object (DO) по HTTP, когда ему нужно что-то отрендерить, а DO может выполнять чтение из кэша, обновляя графики примерно раз в час.
DO могут иметь или хранилище ключей и значений, или хранилище SQLite. В моём случае идеально подходит SQLite, потому что я имею дело с набором реляционных данных, поэтому я выбрал его. Я настроил Durable Object очень похожим образом, только помещал данные не в
Map
, а записывал в строки базы данных SQLite моего DO.export class MtaStateObject extends DurableObject {
sql: SqlStorage;
constructor(state: DurableObjectState, env: Env) {
super(state, env);
// Получаем интерфейс хранилища SQL.
this.sql = state.storage.sql;
state.blockConcurrencyWhile(async () => {
await this.initializeDatabase();
await this.loadGtfsStatic();
});
}
private async initializeDatabase() {
this.sql.exec(`
CREATE TABLE IF NOT EXISTS calendar (
service_id TEXT PRIMARY KEY,
monday INTEGER,
tuesday INTEGER,
wednesday INTEGER,
thursday INTEGER,
friday INTEGER,
saturday INTEGER,
sunday INTEGER,
start_date TEXT,
end_date TEXT
);
# здесь будут операторы CREATE TABLE для других таблиц
CREATE INDEX IF NOT EXISTS idx_stop_times_stop_id ON stop_times(stop_id);
CREATE INDEX IF NOT EXISTS idx_trips_route_id ON trips(route_id);
CREATE INDEX IF NOT EXISTS idx_stops_parent_station ON stops(parent_station);
CREATE TABLE IF NOT EXISTS metadata (
key TEXT PRIMARY KEY,
value TEXT
);
`);
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
let response: Response;
switch (url.pathname) {
case "/lines":
response = await this.handleGetAllLines();
break;
case "/stations":
response = await this.handleGetStationsForLine(url.searchParams);
break;
case "/station":
response = await this.handleGetStation(url.searchParams);
break;
case "/arrivals":
response = await this.handleGetUpcomingArrivals(url.searchParams);
break;
default:
response = new Response("Not found", { status: 404 });
}
return response;
}
/**
* Загружаем данные из файла zip GTFS в SQL.
*/
private async shouldUpdateGtfs(): Promise<boolean> {
// ищем в таблице метаданных ключ last_gtfs_update,
// если прошло больше часа, то возвращаем true
}
async loadGtfsStatic() {
if (!(await this.shouldUpdateGtfs())) {
console.log("not updating static gtfs");
return;
}
console.log("updating static gtfs");
try {
const gtfsResponse = await fetch(MTA_SUPPLEMENTED_GTFS_STATIC_URL);
const buf = Buffer.from(await gtfsResponse.arrayBuffer());
const gtfsArchive = await zipFromBuffer(buf, { lazyEntries: true });
// processZipEntries изменён, чтобы он возвращал
// map потоков, а не map текста
const entries = processZipEntries(gtfsArchive);
for await (const [fileName, stream] of entries) {
const parser = parse({
columns: true,
skip_empty_lines: true,
});
stream.pipe(parser);
console.log(`processing ${fileName}`);
switch (fileName) {
case "calendar.txt":
for await (const entry of parser) {
this.sql.exec(
`INSERT OR REPLACE INTO calendar
(service_id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, start_date, end_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
entry.service_id,
Number.parseInt(entry.monday, 10),
Number.parseInt(entry.tuesday, 10),
Number.parseInt(entry.wednesday, 10),
Number.parseInt(entry.thursday, 10),
Number.parseInt(entry.friday, 10),
Number.parseInt(entry.saturday, 10),
Number.parseInt(entry.sunday, 10),
entry.start_date,
entry.end_date,
);
}
break;
// сюда вставляем операторы для других таблиц
}
}
// Обновляем метку времени последнего обновления
const now = Temporal.Now.instant();
this.sql.exec(
"INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
"last_gtfs_update",
now.toString(),
);
} catch (err) {
console.error(err);
throw err instanceof Error ? err : new Error(String(err));
}
}
/**
* Возвращаем все линии (маршруты).
*/
async handleGetAllLines(): Promise<Response> {
// ...
}
/**
* Возвращаем станции для указанного id линии.
* Используем таблицу поездок, чтобы найти поездок для маршрута, а затем все остановки.
*/
async handleGetStationsForLine(params: URLSearchParams): Promise<Response> {
// ...
}
/**
* Возвращает информацию о станции по указанному stationId.
*/
async handleGetStation(params: URLSearchParams): Promise<Response> {
// ...
}
/**
* Возвращает ближайшие прибытия для указанной станции.
* Требует stationId, опциональное направление (север или юг),
* и опциональный максимум (по умолчанию 10).
*/
async handleGetUpcomingArrivals(params: URLSearchParams): Promise<Response> {
// ...
}
/**
* Проверяет, активен ли сегодня указанный сервис (по service_id).
* Преобразует сохранённые строки (YYYYMMDD) в Temporal.PlainDate для сравнений.
*/
async isServiceActiveToday(serviceId: string): Promise<boolean> {
// ...
}
}
Затем я обновил свой сайт Astro, чтобы он начал получать данные из DO, и это сработало. Всё по-прежнему было медленным при первом запросе (так как DO заполняет базу данных), но примерно в течение часа все последующие запросы выполнялись быстро. Миссия выполнена? Я загрузил свой DO в Cloudflare, чтобы протестировать его.

Беда.
Что ж, давайте разбираться, как снизить время CPU. Что отнимает так много времени? Я добавил к инициализатору базы данных
console.log()
.
Понятно. Очевидно, таблица
stop_times
(в которой хранится список всех остановок за день для каждого поезда на каждом типе маршрута) содержит 2,2 миллиона строк.Ну, SQLite должна быть очень быстрой, так что виновато в этом, вероятно, чтение строк из файла ZIP. Давайте посмотрим, сможем ли мы ускорить распаковку.
Я попробовал заменить
yauzl
на fflate
, но безуспешно. Что, если использовать WebAssembly? Я создал простой проект wasm-pack
и добавил в него zip
и csv
.#[wasm_bindgen]
pub fn unpack_csv_archive(
data: Uint8Array,
row_chunk_len: usize,
callback: Function,
) -> Result<(), wasm_bindgen::JsError> {
// вызываем unpack_csv_archive_inner после преобразования аргументов
Ok(())
}
pub fn unpack_csv_archive_inner<
F: FnMut(&str, &js_sys::Array, &js_sys::Array) -> anyhow::Result<()>,
>(
buf: &[u8],
row_chunk_len: usize,
mut callback: F,
) -> anyhow::Result<()> {
let mut ar = zip::ZipArchive::new(Cursor::new(buf)).context("could not create zip archive")?;
for i in 0..ar.len() {
let file = ar
.by_index(i)
.with_context(|| format!("could not open file at index {i}"))?;
let file_name = file.name().to_owned();
let mut reader = csv::ReaderBuilder::new().from_reader(file);
let headers = reader.headers()?;
let headers_arr: js_sys::Array = headers.iter().map(JsString::from_str).try_collect()?;
let mut chunk = vec![];
for record in reader.into_records() {
let record = record?;
let record_arr: js_sys::Array = record.iter().map(JsString::from_str).try_collect()?;
chunk.push(record_arr);
if chunk.len() >= row_chunk_len {
// наш обратный вызов написан на JS
// он содержит цикл for..of loop, который
// вставляет все только что полученные строки
callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?;
chunk = vec![];
}
}
if chunk.len() > 0 {
callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?;
}
}
Ok(())
}
Получилось быстрее, но ненамного.

Похоже, пора перестать гадать и заняться профилированием.

Благодаря профилированию выяснилось, что основная часть времени выполнения тратится на
js_sys::function::apply
(это наш обратный вызов exec()
SQL) и на реализацию Deserialize
csv
. Как ускорить вставки SQLite? Нужно использовать транзакции.В текущем виде код предназначен для вставки блоков по 50000 строк за раз. Я обёртываю каждый блок в вызов
ctx.storage.transaction()
и выполняю его заново.Предыдущая локальная реализация требовала для заполнения базы данных примерно 23 секунд при работе локального Wrangler. После добавления транзакций база данных заполняется, а воркер возвращает ответ за 4 секунды!

Я развернул новую версию в Cloudflare, и всё заработало!
Теперь у нас есть Durable Object, который, по сути, является работающим по запросу индексированной репликой для чтения текущего графика MTA. Класс.
▍ Обновления в реальном времени
Проект был бы неполным без обновлений в реальном времени. В среднем поезда приходят вовремя лишь в 80% случаев. Тем, кому не повезло жить рядом с поездом F, приходится пользоваться поездом, в декабре приходившим вовремя в 64% случаев.
То есть веб-сайт, просто показывающий график без текущей информации, бесполезен и даже вреден, потому что из-за него вы можете прождать на станции полчаса, хотя могли бы просто взять Citi Bike.
Фиды реального времени GTFS передаются в формате Protobuf. Из-за названия «real-time» я сначала думал, что они потоково передают ответы по долго открытому сокет-соединению, но на самом деле это всего лишь HTTP-опросы.
Фиды реального времени достаточно просты: в них содержится список сообщений о текущем состоянии системы. В него включены и сообщения обновлённого времени прибытия и отправления для определённого маршрута на определённой остановке. После парсинга фидов реального времени я просто перезаписываю время прибытия и отправления из статической таблицы обновлёнными данными.
type RealtimeStatusGroup =
| "ACE"
| "BDFM"
| "G"
| "JZ"
| "NQRW"
| "L"
| "1234567"
| "SIR";
async getRealtimeStatus(group: RealtimeStatusGroup): Promise<FeedMessage> {
let endpoint: string;
switch (group) {
case "ACE":
endpoint =
"https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace";
break;
// По неизвестным мне причинам MTA разбивает
// обновления реального времени на семь отдельных фидов
}
const response = await fetch(new Request(endpoint));
// FeedMessage — это класс, который я сгенерировал из заголовка protobuf
// GTFS реального времени при помощи @protobuf-ts/plugin
return FeedMessage.fromBinary(await response.bytes());
}
async updateRealtimeStatus(group: RealtimeStatusGroup) {
const status = await this.getRealtimeStatus(group);
const activeServices = await this.getActiveServiceIds();
await this.ctx.storage.transaction(async () => {
for (const msg of status.entity) {
if (msg.tripUpdate) {
if (!msg.tripUpdate.trip) {
console.warn("got trip update without trip", msg);
continue;
}
// MTA обозначает поездки в фиде реального времени суффиксом ID поездки,
// например "082500_A..N54R"
const tripIdSuffix = msg.tripUpdate.trip.tripId;
const tripIds = this.sql
.exec<{ trip_id: string; service_id: string }>(
"SELECT trip_id, service_id FROM trips WHERE trip_id LIKE $1",
`%${tripIdSuffix}`,
)
.toArray()
.filter((r) => activeServices.includes(r.service_id))
.map((r) => r.trip_id);
const zeroTime = Temporal.PlainTime.from("00:00:00");
for (const stopTimeUpdate of msg.tripUpdate!.stopTimeUpdate) {
if (!stopTimeUpdate.stopId) continue;
// Здесь код довольно некрасивый — Temporal API очень многословный,
// особенно при взаимодействии с системами, которые его не используют,
// зато он сильно упрощает сериализацию и арифметику с датами,
// временем, datetime и интервалами
const newArrival = stopTimeUpdate.arrival?.time
? Temporal.Instant.fromEpochSeconds(
Number(stopTimeUpdate.arrival.time),
)
.toZonedDateTime({
timeZone: "America/New_York",
calendar: "gregory",
})
.toPlainTime()
.since(zeroTime)
.total("seconds") | 0
: null;
const newDeparture = /* ... */;
this.sql.exec(
`
UPDATE stop_times
SET arrival_total_seconds = IFNULL($1, arrival_total_seconds),
departure_total_seconds = IFNULL($2, departure_total_seconds),
is_realtime_updated = 1
WHERE trip_id IN (${tripIds.map((t) => `"${t}"`).join(",")}) AND stop_id == $4
`,
newArrival,
newDeparture,
stopTimeUpdate.stopId,
);
}
}
}
});
console.log("updated realtime status for ", group);
}
▍ Неожиданное открытие
Написав работающий код, я занялся его улучшением. В процессе улучшения я заметил, что Durable Object на самом деле имеют два метода для создания транзакций:
transaction
и transactionSync
. В своём загрузчике stop_times
я вызывал transaction
, имеющий сигнатуру transaction(cb: () => Promise<void>): Promise<void>
.Это неидеальное решение, поскольку:
- Мой обратный вызов не возвращает
Promise
. Хоть пока мой код работает, позже Cloudflare может решить, что это ошибка, или создать какой-нибудь малозаметный баг, который потом сведёт меня с ума. - Передача ошибок поломана. Если возвращённый
transaction
promise будет отклонён, то программа молча проигнорирует это, что тоже сведёт меня с ума, ведь мне будет сложно выявлять баги в процессе загрузки.
Поэтому я попробовал перейти на
transactionSync
, но загрузчик снова стал чрезвычайно медленным!

Я предполагаю, что отправляя кучу вызовов
transaction
, я случайно вызывал pipelining операций записи.

Примечание: предположительно, Cloudflare взимает оплату за время CPU, а не за фактическое время, то есть время, потраченное моим воркером на ожидание среды выполнения, вероятно, не будет учитываться ни для одного воркера
Хоть по описанным выше причинам этот код не обладает особой стабильностью, это всего лишь хобби-проект, от которого ничего не зависит. Поэтому я решил, что это не «хак», а «оптимизация».
Если бы я захотел использовать эту методику в реальном проекте, то, вероятно, объединил бы все
Promise
и создал Promise.all
для передачи ошибок.▍ Заключение
Было бы проще поместить этот проект в мой инстанс Coolify в облаке. Там нет конкретных ограничений на время CPU. Кроме того, это позволило бы мне использовать MTAPI. Но я хотел изучить Durable Object. Если бы я делал всё обычным образом, то не научился бы ничему новому, и создал бы просто график движения поездов, а не высокооптимизированный Web ScaleTM serverless on-demand fault-tolerant график движения поездов, доступный 99,9% времени.
Моя любимая особенность Durable Object — их будильники. Я обнаружил, что планирование задач в распределённых средах/serverless-средах обычно чрезвычайно утомляет и склоняет к настройке отдельной очереди задач. Мне нравится, что будильники очень легко использовать, и они не требуют никаких дополнительных усилий.
График движения поездов можно смотреть на mta-trmnl.pages.dev.
Исходный код этого проекта выложен на GitHub: фронтенд, бэкенд.
Telegram-канал со скидками, розыгрышами призов и новостями IT 💻
