Как стать автором
Обновить

Комментарии 7

Спасибо, интересно. Такие вопросы появились:

  • Может лучше стоило назвать параметр isOpen как нибудь более понятно, к примеру val isConnectedOrConnecting: Boolean ?

  • Может вообще в статусе ConnectState стоило сделать три состояния - Connected, Connecting, Disconnected ?

shouldBeOpened тогда уж.

Спасибо большое за комментарий! Как написал nin-jin shouldBeOpened больше подходит по смыслу, но мне больше нравится идея с доп. состояниями для ConnectState. На досуге перепишу, в комментариях скину чего получилось.

В "override val incoming()" разве вторая "if (isClosed){} " будет выполняется? По идее при первой уже должно в ретурн уходить.

Спасибо за вопрос. Я тоже надеялся, что так будет работать, но то что webSocketSession suspend функция заставило меня задуматься и проверить. Открытие сокета действительно требует время и при плохих условиях (частая смена сокета, плохой интернет, возможно еще что-то) сокет не будет закрыт, поэтому и потребовалась доп. проверка

Сегодня и я задался такой задачкой. И как истинный джун не полез искать костыли, а сделал свой.

Сам сокет выглядит вот так:

package org.example

import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

abstract class WebSocketStream<I : Any, O>(
    private val client: HttpClient,
    private val workScope: CoroutineScope,
    private val urlString: String,
    val encoder: (O) -> String,
    private val decoder: (String) -> I,
) {

    companion object {

        inline operator fun <reified I : Any, reified O> invoke(
            client: HttpClient,
            workScope: CoroutineScope,
            urlString: String,
            noinline encoder: (O) -> String = { defaultSerializer.encodeToString(it) },
            noinline decoder: (String) -> I = { defaultSerializer.decodeFromString(it) }
        ): WebSocketStream<I, O> {
            return object : WebSocketStream<I, O>(
                client = client,
                workScope = workScope,
                urlString = urlString,
                encoder = encoder,
                decoder = decoder
            ){}
        }

        val defaultSerializer = Json {
            ignoreUnknownKeys = true
            allowSpecialFloatingPointValues = true
        }

    }


    sealed interface Output<out O> {
        data class Income<T : Any>(val data: T) : Output<T>
        data class Something<T : Any>(val frame: Frame) : Output<T>
        data class Error(val e: Throwable) : Output<Nothing>
    }


    sealed interface Status {
        data object Connect : Status
        data object Connected : Status
        data object Disconnect : Status
    }

    private val _stream = MutableSharedFlow<Output<I>>()
    val output = _stream.asSharedFlow()

    private val input = Channel<String>()

    private val _status = MutableStateFlow<Status>(Status.Disconnect)
    val status = _status.asStateFlow()

    private var job: Job? = null


    fun connectWhile(
        keepOpen: () -> Boolean
    ) {
        job?.cancel()
        job = workScope.launch {
            while (keepOpen()) {
                runCatching {
                    _status.value = Status.Connect
                    connectSuspend(keepOpen)
                    _status.value = Status.Disconnect
                }.onFailure {
                    _stream.emit(Output.Error(it))
                }
                if (keepOpen()) {
                    delay(1000)
                }
            }
        }
    }

    private suspend fun connectSuspend(keepOpen: () -> Boolean) {
        client.webSocket(urlString) {

            val connectJob = launch {

                incoming.receiveAsFlow()
                    .onStart {
                        _status.value = Status.Connected
                    }.onEach { message ->
                        when (message) {
                            is Frame.Text -> {
                                val rad = message.readText()
                                try {
                                    _stream.emit(Output.Income(decoder(rad)))
                                } catch (e: Exception) {
                                    _stream.emit(Output.Error(e))
                                }
                            }

                            else -> {
                                _stream.emit(Output.Something(message))
                            }
                        }
                    }.launchIn(this)

                input.consumeEach {
                    runCatching {
                        send(Frame.Text(it))
                    }
                }

            }

            while (keepOpen()) delay(1000)
            _status.value = Status.Disconnect
            connectJob.cancel()
            close(reason = CloseReason(CloseReason.Codes.NORMAL, "leave"))
        }

    }

    private suspend fun send(data: String) {
        return input.send(data)
    }

    fun send(
        obj: O
    ) = workScope.launch { send(encoder(obj)) }


}

А пользоваться этим примерно так:

// Есть какой то класс оболочка на прием, а можно и без, всего то заменяем decoder
@Serializable
data class Data(
    @SerialName("data")
    val data: String
) : SerializedAny()

// Есть какой то класс оболочка на отправку, а можно и без, всего то заменяем encoder
@Serializable
data class Output(
    @SerialName("data")
    val data: String
)

private suspend fun open(client: HttpClient, text: String) {
    // Создаем наш сокет
  
     val stream = WebSocketStream<Data, Output>(
        client = client,
        workScope = CoroutineScope(Dispatchers.IO),
        urlString = "ws://localhost:8078/ws"
    )

    // вешаем слушателей
    CoroutineScope(Dispatchers.IO).launch {
        stream.output
            .onEach {
                println("$text $it")
            }.launchIn(this)

        stream.status
            .onEach {
                println(it)
            }.launchIn(this)
    }

    // флаг, что нужно продолжать работу
    var keep = true

    // сокет с некоторой переодчностью сам спрашивает, нужно ли ему работать дальше
    // если связь оборвется, то будет пробовать переподключться
    stream.connectWhile { keep }

    // Тут мы отправляем данные, в ответ получаем Job. Можно ждать завершения посылки, а можно и не ждать
    repeat(5) {
        // job создается от workScope. Если workScope закрыть, то и производные закроются.
        stream.send(Output(text)).join()
        delay(1000)
    }

    // В конце говорим что продолжать не нужно и сокет сам отключится.
    keep = false
}

После некоторых раздумий над отправкой данных и обработок ошибок при отправке решил немного доработать. Всего то заменить.

    // добавить
    private data class Input(
        val data: String,
        val statusReceiver: (SendStatus) -> Unit
    )

..............
   // заменить
   private val input = Channel<Input>()
..............
  // заменить
  input.consumeEach { income ->
      runCatching {
          send(Frame.Text(income.data))
          workScope.launch { income.statusReceiver(SendStatus.OK) }
      }.onFailure {
          workScope.launch { income.statusReceiver(SendStatus.Error(it)) }
      }
  }
....................
    // заменить и лишний удалить.
    fun send(
        obj: O,
        statusHandler: (SendStatus) -> Unit = {}
    ) = workScope.launch {
        input.send(Input(encoder(obj), statusHandler))
    }

Так отправляющий сможет не только отправить данные, но и обработать ошибку

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации