Как стать автором
Обновить

Изучаю Scala: Часть 4 — WebSocket

Время на прочтение5 мин
Количество просмотров4.8K

Привет, Хабр! На этот раз я по пробовал сделать простенький чат через ВебСокеты. За подробностями добро пожаловать под кат.

Содержание



Ссылки


  1. Исходники
  2. Образы docker image
  3. Tapir
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck

Собственно весь код находиться в одном объект ChatHub

class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
    .description("Подключает к общему чату")
    .in(
      stringBody
        .description("Сообщение которое будет отправлено пользователям в чате")
        .example("Привет!")
    )
    .out(
      stringBody
        .description("Сообщение которое кто-то написал в чат")
        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
    )
    //Заглушка которая всегда отвечает ошибкой. 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}

Тут надо сразу сказать про Topic — примитив синхронизации из Fs2 который позволяет сделать модель Publisher — Subscriber причем у вас может быть много Publisher и одновременно много Subscriber. Вообще в него лучшее отправлять сообщения через какой-то буфер вроде Queue потому что у него есть ограничения на количество сообщения в очереди и Publisher ждет пока все Subscriber не получат сообщения в свою очередь сообщений и если она переполнена то может и зависнуть.

val topic: Topic[F, WebSocketFrame],

Тут еще я считаю количество сообщений которые были переданы в чат как номер каждого сообщения. Так как это мне нужно делать из разных потоков я использовал аналог Atomic который тут называется Ref и гарантирует атомарность операции.

  private val ref: Ref[F, Int]

Обработка потока сообщений от пользователей.

  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//Достаем из фрейма текстовое сообщение и фильтруем фреймы. 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//Атомарно увеличиваем наш счетчик с сохранением нового значения и добавления его значения к тексту сообщения пользователя.
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//Каждое пришедшее сообщение отправляем в топик
    .through(topic.publish)

Собственно сама логика создания сокета.

private def logic(): F[Response[F]] = {
//Откуда получать данные для клиента.
    val toClient: Stream[F, WebSocketFrame] =
//Просто подписываемся на данные которые будут приходить в топик
      topic.subscribe(1000)
//Что будем делать с данными которые приходить от клиента
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//Просто отправляем данные в топик после обработки
      handle
//Создаем веб сокет с созданными ранее генератором и потребителем данных.
    WebSocketBuilder[F].build(toClient, fromClient)
  }

Связываем наш сокет с роутом на сервере (ws://localhost:8080/chat)

def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

Собственно на этом все. Дальше уже можно запускать сервер с этим роутом. Мне еще захотелось какую ни какую документацию сделать. Вообще для документирования WebSocket и прочего основанного на событиях взаимодействия вроде RabbitMQ AMPQ есть AsynAPI но под Tapir там нет ничего поэтому просто сделал для Swagger описание эндпойнта как GET запрос. Работать он конечно не будет. Точнее 501 ошибку будет возвращать зато будет отображаться в Swagger

  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("Подключится к общему чату. Например по такому адресу: ws://localhost:8080/chat")
    .description("Подключает к общему чату")
    .in(
      stringBody
        .description("Сообщение которое будет отправлено пользователям в чате")
        .example("Привет!")
    )
    .out(
      stringBody
        .description("Сообщение которое кто-то написал в чат")
        .example("6 : Сообщение от клиента с Id подключения f518a53d: Привет!")
    )

В самом сваггере это выглядит вот так



Подключаем наш чат к нашему серверу API

    todosController = new TodosController()
    imagesController = new ImagesController()
//Создаем объект нашего чата
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//Добавляем его эндпойнт в документацию Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//Добавляем его маршрут в список маршрутов приложения
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource

Подключаемся к чату крайне простым скриптом.

    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`Сообщение от клиента с Id подключения ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>

На этом собственно все. Надеюсь кому-то кто тоже изучает скала будет интересна эта статья а может даже полезна.
Теги:
Хабы:
Всего голосов 8: ↑4 и ↓40
Комментарии1

Публикации

Истории

Работа

Scala разработчик
10 вакансий

Ближайшие события

2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
11 – 13 декабря
Международная конференция по AI/ML «AI Journey»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань