Я хотел, чтобы у меня на кухне был дисплей, показывающий время прибытия поездов метро, и мне не приходилось сверяться с телефоном, пока собираюсь к выходу из дома. Эта статья рассказывает о создании такого дисплея.

График поездов метро Нью-Йорка можно посмотреть н�� mta-trmnl.pages.dev. Исходный код моего проекта выложен на GitHub: фронтенд, бэкенд.

▍ Я безголовый


Просыпаясь утром, я каждый раз смотрю погоду и график метро, а затем иду в душ и сразу забываю эту информацию. Выхожу из душа, проверяю её снова, но пока одеваюсь, снова всё забываю. Ищу телефон, разблокирую его, обновляю виджет погоды и виджет метро так часто, что телефон отключает Face ID и заставляет меня ввести пароль. Ко времени, когда я надеваю обувь, я уже опаздываю, а когда спускаюсь по лестнице, до следующего поезда остаётся десять минут. Это очень раздражает.

Мне показалось, что идеально было бы, чтобы эта информация была видна всегда, находилась в одном и том же месте (посередине квартиры), чтобы на разблокировку и забывание тратилось меньше времени и внимания.

Я купил TRMNL — интересный настраиваемый дисплей на электронных чернилах. В нём уже был виджет погоды, но не было виджета MTA.

Примечание: пока я работал над этим проектом, кто-то уже написал виджет MTA, но этот пост посвящён мне.

В TRMNL есть виджет, позволяющий делать скриншот веб-сайта и отображать его на планшете. Я попробовал указать в нём wheresthefuckingtrain.com, но это не сработало. WTFT получает данные о поездах после загрузки самой страницы, а рендерер TRMNL возвращал скриншот состояния страницы до выполнения асинхронных запросов.

Поэтому я решил написать собственный веб-сайт с графиком MTA (наверно, в этом не�� ничего сложного?). Недавно из видео на YouTube я узнал о Durable Objects Cloudflare, и с радостью воспользовался возможностью поэкспериментировать с ними.

Если вкратце, то Durable Object — это особый тип serverless-функции, к которой прикреплено постоянное хранилище. Также у него есть будильники, то есть таймеры, полезные для выполнения периодических задач.

▍ Статичные графики


MTA публикует графики движения метро по большей мере в виде совместимых с GTFS данных. GTFS (General Transit Feed Specification) разбивает данные на статичный график и фид реального времени.

Статичный график — это файл ZIP, в котором содержится несколько файлов CSV со списками запланированных маршрутов, временем остановки, метаданными маршрутов и так далее.

❯ wget https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip
❯ unzip gtfs_supplemented.zip        
❯ ls -lh
total 155M
-rw-r--r-- 1 ibiyemi ibiyemi  162 Feb 21 19:25 agency.txt
-rw-r--r-- 1 ibiyemi ibiyemi 4.8K Feb 21 19:25 calendar.txt
-rw-r--r-- 1 ibiyemi ibiyemi  17K Feb 21 19:25 calendar_dates.txt
-rw-r--r-- 1 ibiyemi ibiyemi  18M Feb 21 19:25 gtfs_supplemented.zip
-rw-r--r-- 1 ibiyemi ibiyemi  12K Feb 21 19:25 routes.txt
-rw-r--r-- 1 ibiyemi ibiyemi 5.6M Feb 21 19:25 shapes.txt
-rw-r--r-- 1 ibiyemi ibiyemi 126M Feb 21 19:25 stop_times.txt
-rw-r--r-- 1 ibiyemi ibiyemi  63K Feb 21 19:25 stops.txt
-rw-r--r-- 1 ibiyemi ibiyemi 8.4K Feb 21 19:25 transfers.txt
-rw-r--r-- 1 ibiyemi ibiyemi 5.4M Feb 21 19:25 trips.txt

Один из этих файлов гораздо больше остальных. Уверен, это не вызовет никаких проблем.

Я создал сайт Astro, который скачивает файл ZIP, распаковывает его, создаёт несколько Map на основании данных в таблицах, а затем рендерит страницу.

import { parse } from "csv-parse/sync";
import { Temporal } from "temporal-polyfill";
import yauzl from "yauzl";

// полные определения типов см. на GitHub
// https://github.com/laptou/mta-trmnl/blob/b2f547e89ace6fab34cd4db0e554ecf5354c5998/src/util/mta/index.ts#L13

export interface MtaState {
  calendar: CalendarEntry[];
  calendarDates: CalendarDateEntry[];
  routes: Route[];
  stopTimes: StopTime[];
  stops: Stop[];
  transfers: Transfer[];
  lastUpdated: Date;

  // Извлечённые данные
  stations: Map<string, Station>;
  lineToStations: Map<string, Set<string>>;
}

async function readZipEntries(
  zip: yauzl.ZipFile,
): Promise<Map<string, string>> {
  // вспомогательная функция, которая возвращает map сопоставления
  // имён файлов элементов zip с содержимым файлов
}

export const MTA_SUPPLEMENTED_GTFS_STATIC_URL =
  "https://rrgtfsfeeds.s3.amazonaws.com/gtfs_supplemented.zip";

export async function loadMtaBaselineState(zipPath: string): Promise<MtaState> {
  const gtfsData = await fetch(zipPath).then((res) => res.arrayBuffer());
  // zipFromBuffer — это вспомогательная функция, пропущенная ради краткости
  const gtfsArchive = await zipFromBuffer(Buffer.from(gtfsData), {
    lazyEntries: true,
  });
  const entries = await readZipEntries(gtfsArchive);

  // [парсим все айлы CSV в массивы объектов]

  const stations = new Map<string, Station>();
  const lineToStations = new Map<string, Set<string>>();

  for (const stop of stops) {
    if (!stop.parent_station) {
      // преобразуем остановки в станции и добавляем к map

      // станции — это остановки верхнего уровня;
      // также в них содержится информация, какая линия их обслуживает
    }
  }

  for (const route of routes) {
    // создаём map маршрутов и станций,
    // чтобы можно было эффективно получать все станции для выбранного маршрута
  }

  return {
    calendar,
    calendarDates,
    routes,
    stopTimes,
    stops,
    transfers,
    lastUpdated: new Date(),
    stations,
    lineToStations,
  };
}

export function getAllLines(state: MtaState): Route[] {
  return state.routes;
}

export function getStationsForLine(state: MtaState, lineId: string): Station[] {
  const stationIds = state.lineToStations.get(lineId) || new Set();
  return Array.from(stationIds)
    .map((id) => state.stations.get(id))
    .filter((station): station is Station => station !== undefined);
}

export function getUpcomingArrivals(
  state: MtaState,
  stationId: string,
  limit = 10,
): TrainArrival[] {
  // ...
}

Всё получилось, но работало невероятно медленно даже для этапа разработки. Astro не рассчитан на хранение состояния, то есть невозможно было один раз загрузить графики движения в память, а потом запрашивать данные в последующих запросах. Когда я делал запрос, он загружал всё с самого начала, что занимало целую минуту.

❯ bun dev
$ astro dev
 astro  v5.3.0 ready in 157 ms

┃ Local    http://localhost:4321/
┃ Network  use --host to expose

20:20:12 watching for file changes...
20:21:21 [200] / 69834ms
20:22:30 [200] / 60981ms

И здесь нам пригодятся Durable Object — их можно использовать для кэширования данных!

Я попытался определить Durable Object в той же кодовой базе, что и сайт Astro, но это п�� какой-то причине не сработало, поэтому я создал отдельный проект. Мой сайт Astro может запрашивать Durable Object (DO) по HTTP, когда ему нужно что-то отрендерить, а DO может выполнять чтение из кэша, обновляя графики примерно раз в час.

DO могут иметь или хранилище ключей и значений, или хранилище SQLite. В моём случае идеально подходит SQLite, потому что я имею дело с набором реляционных данных, поэтому я выбрал его. Я настроил Durable Object очень похожим образом, только помещал данные не в Map, а записывал в строки базы данных SQLite моего DO.

export class MtaStateObject extends DurableObject {
  sql: SqlStorage;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);

    // Получаем интерфейс хранилища SQL.
    this.sql = state.storage.sql;

    state.blockConcurrencyWhile(async () => {
      await this.initializeDatabase();
      await this.loadGtfsStatic();
    });
  }

  private async initializeDatabase() {
    this.sql.exec(`
      CREATE TABLE IF NOT EXISTS calendar (
        service_id TEXT PRIMARY KEY,
        monday INTEGER,
        tuesday INTEGER,
        wednesday INTEGER, 
        thursday INTEGER,
        friday INTEGER,
        saturday INTEGER,
        sunday INTEGER,
        start_date TEXT,
        end_date TEXT
      );

      # здесь будут операторы CREATE TABLE для других таблиц

      CREATE INDEX IF NOT EXISTS idx_stop_times_stop_id ON stop_times(stop_id);
      CREATE INDEX IF NOT EXISTS idx_trips_route_id ON trips(route_id);
      CREATE INDEX IF NOT EXISTS idx_stops_parent_station ON stops(parent_station);

      CREATE TABLE IF NOT EXISTS metadata (
        key TEXT PRIMARY KEY,
        value TEXT
      );
    `);
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);
    let response: Response;
    switch (url.pathname) {
      case "/lines":
        response = await this.handleGetAllLines();
        break;
      case "/stations":
        response = await this.handleGetStationsForLine(url.searchParams);
        break;
      case "/station":
        response = await this.handleGetStation(url.searchParams);
        break;
      case "/arrivals":
        response = await this.handleGetUpcomingArrivals(url.searchParams);
        break;
      default:
        response = new Response("Not found", { status: 404 });
    }

    return response;
  }

  /**
   * Загружаем данные из файла zip GTFS в SQL.
   */
  private async shouldUpdateGtfs(): Promise<boolean> {
    // ищем в таблице метаданных ключ last_gtfs_update,
    // если прошло больше часа, то возвращаем true
  }

  async loadGtfsStatic() {
    if (!(await this.shouldUpdateGtfs())) {
      console.log("not updating static gtfs");
      return;
    }

    console.log("updating static gtfs");

    try {
      const gtfsResponse = await fetch(MTA_SUPPLEMENTED_GTFS_STATIC_URL);
      const buf = Buffer.from(await gtfsResponse.arrayBuffer());
      const gtfsArchive = await zipFromBuffer(buf, { lazyEntries: true });
      // processZipEntries изменён, чтобы он возвращал
      // map потоков, а не map текста
      const entries = processZipEntries(gtfsArchive);

      for await (const [fileName, stream] of entries) {
        const parser = parse({
          columns: true,
          skip_empty_lines: true,
        });

        stream.pipe(parser);

        console.log(`processing ${fileName}`);

        switch (fileName) {
          case "calendar.txt":
            for await (const entry of parser) {
              this.sql.exec(
                `INSERT OR REPLACE INTO calendar
                (service_id, monday, tuesday, wednesday, thursday, friday, saturday, sunday, start_date, end_date)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
                entry.service_id,
                Number.parseInt(entry.monday, 10),
                Number.parseInt(entry.tuesday, 10),
                Number.parseInt(entry.wednesday, 10),
                Number.parseInt(entry.thursday, 10),
                Number.parseInt(entry.friday, 10),
                Number.parseInt(entry.saturday, 10),
                Number.parseInt(entry.sunday, 10),
                entry.start_date,
                entry.end_date,
              );
            }
            break;

          // сюда вставляем операторы для других таблиц
        }
      }

      // Обновляем метку времени последнего обновления
      const now = Temporal.Now.instant();
      this.sql.exec(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
        "last_gtfs_update",
        now.toString(),
      );
    } catch (err) {
      console.error(err);
      throw err instanceof Error ? err : new Error(String(err));
    }
  }

  /**
   * Возвращаем все линии (маршруты).
   */
  async handleGetAllLines(): Promise<Response> {
    // ...
  }

  /**
   * Возвращаем станции для указанного id линии.
   * Используем таблицу поездок, чтобы найти поездок для маршрута, а затем все остановки.
   */
  async handleGetStationsForLine(params: URLSearchParams): Promise<Response> {
    // ...
  }

  /**
   * Возвращает информацию о станции по указанному stationId.
   */
  async handleGetStation(params: URLSearchParams): Promise<Response> {
    // ...
  }

  /**
   * Возвращает ближайшие прибытия для указанной станции.
   * Требует stationId, опциональное направление (север или юг),
   * и опциональный максимум (по умолчанию 10).
   */
  async handleGetUpcomingArrivals(params: URLSearchParams): Promise<Response> {
    // ...
  }

  /**
   * Проверяет, активен ли сегодня указанный сервис (по service_id).
   * Преобразует сохранённые строки (YYYYMMDD) в Temporal.PlainDate для сравнений.
   */
  async isServiceActiveToday(serviceId: string): Promise<boolean> {
    // ...
  }
}

Затем я обновил свой сайт Astro, чтобы он начал получать данные из DO, и это сработало. Всё по-прежнему было медленным при первом запросе (так как DO заполняет базу данных), но примерно в течение часа все последующие запросы выполнялись быстро. Миссия выполнена? Я загрузил свой DO в Cloudflare, чтобы протестировать его.


Беда.

Что ж, давайте разбираться, как снизить время CPU. Что отнимает так много времени? Я добавил к инициализатору базы данных console.log().


Понятно. Очевидно, таблица stop_times (в которой хранится список всех остановок за день для каждого поезда на каждом типе маршрута) содержит 2,2 миллиона строк.

Ну, SQLite должна быть очень быстрой, так что виновато в этом, вероятно, чтение строк из файла ZIP. Давайте посмотрим, сможем ли мы ускорить распаковку.

Я попробовал заменить yauzl на fflate, но безуспешно. Что, если использовать WebAssembly? Я создал простой проект wasm-pack и добавил в него zip и csv.

#[wasm_bindgen]
pub fn unpack_csv_archive(
    data: Uint8Array,
    row_chunk_len: usize,
    callback: Function,
) -> Result<(), wasm_bindgen::JsError> {
    // вызываем unpack_csv_archive_inner после преобразования аргументов

    Ok(())
}

pub fn unpack_csv_archive_inner<
    F: FnMut(&str, &js_sys::Array, &js_sys::Array) -> anyhow::Result<()>,
>(
    buf: &[u8],
    row_chunk_len: usize,
    mut callback: F,
) -> anyhow::Result<()> {
    let mut ar = zip::ZipArchive::new(Cursor::new(buf)).context("could not create zip archive")?;

    for i in 0..ar.len() {
        let file = ar
            .by_index(i)
            .with_context(|| format!("could not open file at index {i}"))?;

        let file_name = file.name().to_owned();

        let mut reader = csv::ReaderBuilder::new().from_reader(file);
        let headers = reader.headers()?;
        let headers_arr: js_sys::Array = headers.iter().map(JsString::from_str).try_collect()?;

        let mut chunk = vec![];

        for record in reader.into_records() {
            let record = record?;

            let record_arr: js_sys::Array = record.iter().map(JsString::from_str).try_collect()?;

            chunk.push(record_arr);

            if chunk.len() >= row_chunk_len {
                // наш обратный вызов написан на JS
                // он содержит цикл for..of loop, который
                // вставляет все только что полученные строки
                callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?;
                chunk = vec![];
            }
        }

        if chunk.len() > 0 {
            callback(&file_name, &headers_arr, &js_sys::Array::from_iter(chunk))?;
        }
    }

    Ok(())
}

Получилось быстрее, но ненамного.


Похоже, пора перестать гадать и заняться профилированием.


Благодаря профилированию выяснилось, что основная часть времени выполнения тратится на js_sys::function::apply (это наш обратный вызов exec() SQL) и на реализацию Deserializecsv. Как ускорить вставки SQLite? Нужно использовать транзакции.

В текущем виде код предназначен для вставки блоков по 50000 строк за раз. Я обёртываю каждый блок в вызов ctx.storage.transaction() и выполняю его заново.

Предыдущая локальная реализация требовала для заполнения базы данных примерно 23 секунд при работе локального Wrangler. После добавления транзакций база данных заполняется, а воркер возвращает ответ за 4 секунды!


Я развернул новую версию в Cloudflare, и всё заработало!

Теперь у нас есть Durable Object, который, по сути, является работающим по запросу индексированной репликой для чтения текущего графика MTA. Класс.

▍ Обновления в реальном времени


Проект был бы неполным без обновлений в реальном времени. В среднем поезда приходят вовремя лишь в 80% случаев. Тем, кому не повезло жить рядом с поездом F, приходится пользоваться поездом, в декабре приходившим вовремя в 64% случаев.

То есть веб-сайт, просто показывающий график без текущей информации, бесполезен и даже вреден, потому что из-за него вы можете прождать на станции полчаса, хотя могли бы просто взять Citi Bike.

Фиды реального времени GTFS передаются в формате Protobuf. Из-за названия «real-time» я сначала думал, что они потоково передают ответы по долго открытому сокет-соединению, но на самом деле это всего лишь HTTP-опросы.

Фиды реального времени достаточно просты: в них содержится список сообщений о текущем состоянии системы. В него включены и сообщения обновлённого времени прибытия и отправления для определённого маршрута на определённой остановке. После парсинга фидов реального времени я просто перезаписываю время прибытия и отправления из статической таблицы обновлёнными данными.

type RealtimeStatusGroup =
	| "ACE"
	| "BDFM"
	| "G"
	| "JZ"
	| "NQRW"
	| "L"
	| "1234567"
	| "SIR";

async getRealtimeStatus(group: RealtimeStatusGroup): Promise<FeedMessage> {
  let endpoint: string;

  switch (group) {
    case "ACE":
      endpoint =
        "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace";
      break;
    // По неизвестным мне причинам MTA разбивает
    //  обновления реального времени на семь отдельных фидов
  }

  const response = await fetch(new Request(endpoint));
  // FeedMessage — это класс, который я сгенерировал из заголовка protobuf 
  // GTFS реального времени при помощи @protobuf-ts/plugin
  return FeedMessage.fromBinary(await response.bytes());
}

async updateRealtimeStatus(group: RealtimeStatusGroup) {
  const status = await this.getRealtimeStatus(group);
  const activeServices = await this.getActiveServiceIds();

  await this.ctx.storage.transaction(async () => {
    for (const msg of status.entity) {
      if (msg.tripUpdate) {
        if (!msg.tripUpdate.trip) {
          console.warn("got trip update without trip", msg);
          continue;
        }

        // MTA обозначает поездки в фиде реального времени суффиксом ID поездки,
        // например "082500_A..N54R"
        const tripIdSuffix = msg.tripUpdate.trip.tripId;

        const tripIds = this.sql
          .exec<{ trip_id: string; service_id: string }>(
            "SELECT trip_id, service_id FROM trips WHERE trip_id LIKE $1",
            `%${tripIdSuffix}`,
          )
          .toArray()
          .filter((r) => activeServices.includes(r.service_id))
          .map((r) => r.trip_id);

        const zeroTime = Temporal.PlainTime.from("00:00:00");

        for (const stopTimeUpdate of msg.tripUpdate!.stopTimeUpdate) {
          if (!stopTimeUpdate.stopId) continue;

          // Здесь код довольно некрасивый — Temporal API очень многословный,
          // особенно при взаимодействии с системами, которые его не используют, 
          // зато он сильно упрощает сериализацию и арифметику с датами,
          // временем, datetime и интервалами
          const newArrival = stopTimeUpdate.arrival?.time
            ? Temporal.Instant.fromEpochSeconds(
                Number(stopTimeUpdate.arrival.time),
              )
                .toZonedDateTime({
                  timeZone: "America/New_York",
                  calendar: "gregory",
                })
                .toPlainTime()
                .since(zeroTime)
                .total("seconds") | 0
            : null;

          const newDeparture = /* ... */;

          this.sql.exec(
            `
            UPDATE stop_times 
            SET arrival_total_seconds = IFNULL($1, arrival_total_seconds), 
              departure_total_seconds = IFNULL($2, departure_total_seconds), 
              is_realtime_updated = 1
            WHERE trip_id IN (${tripIds.map((t) => `"${t}"`).join(",")}) AND stop_id == $4
          `,
            newArrival,
            newDeparture,
            stopTimeUpdate.stopId,
          );
        }
      }
    }
  });

  console.log("updated realtime status for ", group);
}

▍ Неожиданное открытие


Написав работающий код, я занялся его улучшением. В процессе улучшения я заметил, что Durable Object на самом деле имеют два метода для создания транзакций: transaction и transactionSync. В своём загрузчике stop_times я вызывал transaction, имеющий сигнатуру transaction(cb: () => Promise<void>): Promise<void>.

Это неидеальное решение, поскольку:

  • Мой обратный вызов не возвращает Promise. Хоть пока мой код работает, позже Cloudflare может решить, что это ошибка, или создать какой-нибудь малозаметный баг, который потом сведёт меня с ума.
  • Передача ошибок поломана. Если возвращённый transaction promise будет отклонён, то программа молча проигнорирует это, что тоже сведёт меня с ума, ведь мне будет сложно выявлять баги в процессе загрузки.

Поэтому я попробовал перейти на transactionSync, но загрузчик снова стал чрезвычайно медленным!

Это версия, где используется This transactionSync. Похоже, на моём ноутбуке воркер Cloudflare работает примерно на 40% медленнее. Это значит, что 18 с на моём ноутбуке превысят лимит времени в 30 секунд в облаке. Так и произошло.

Версия, в которой используется transaction.

Я предполагаю, что отправляя кучу вызовов transaction, я случайно вызывал pipelining операций записи.

Что, по моему мнению, происходит с transactionSync

Что, по моему мнению, происходит с transaction

Примечание: предположительно, Cloudflare взимает оплату за время CPU, а не за фактическое время, то есть время, потраченное моим воркером на ожидание среды выполнения, вероятно, не будет учитываться ни для одного воркера

Хоть по описанным выше причинам этот код не обладает особой ��табильностью, это всего лишь хобби-проект, от которого ничего не зависит. Поэтому я решил, что это не «хак», а «оптимизация».

Если бы я захотел использовать эту методику в реальном проекте, то, вероятно, объединил бы все Promise и создал Promise.all для передачи ошибок.

▍ Заключение


Было бы проще поместить этот проект в мой инстанс Coolify в облаке. Там нет конкретных ограничений на время CPU. Кроме того, это позволило бы мне использовать MTAPI. Но я хотел изучить Durable Object. Если бы я делал всё обычным образом, то не научился бы ничему новому, и создал бы просто график движения поездов, а не высокооптимизированный Web ScaleTM serverless on-demand fault-tolerant график движения поездов, доступный 99,9% времени.

Моя любимая особенность Durable Object — их будильники. Я обнаружил, что планирование задач в распределённых средах/serverless-средах обычно чрезвычайно утомляет и склоняет к настройке отдельной очереди задач. Мне нравится, что будильники очень легко использовать, и они не требуют никаких дополнительных усилий.

График движения поездов можно смотреть на mta-trmnl.pages.dev.

Исходный код этого проекта выложен на GitHub: фронтенд, бэкенд.

Telegram-канал со скидками, розыгрышами призов и новостями IT 💻