Комментарии 7
Спасибо, интересно. Такие вопросы появились:
Может лучше стоило назвать параметр isOpen как нибудь более понятно, к примеру
val isConnectedOrConnecting: Boolean
?Может вообще в статусе
ConnectState
стоило сделать три состояния -Connected
,Connecting
,Disconnected
?
shouldBeOpened тогда уж.
Спасибо большое за комментарий! Как написал nin-jin shouldBeOpened
больше подходит по смыслу, но мне больше нравится идея с доп. состояниями для ConnectState.
На досуге перепишу, в комментариях скину чего получилось.

В "override val incoming()" разве вторая "if (isClosed){} " будет выполняется? По идее при первой уже должно в ретурн уходить.
Спасибо за вопрос. Я тоже надеялся, что так будет работать, но то что webSocketSession suspend функция заставило меня задуматься и проверить. Открытие сокета действительно требует время и при плохих условиях (частая смена сокета, плохой интернет, возможно еще что-то) сокет не будет закрыт, поэтому и потребовалась доп. проверка
Сегодня и я задался такой задачкой. И как истинный джун не полез искать костыли, а сделал свой.
Сам сокет выглядит вот так:
package org.example
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
abstract class WebSocketStream<I : Any, O>(
private val client: HttpClient,
private val workScope: CoroutineScope,
private val urlString: String,
val encoder: (O) -> String,
private val decoder: (String) -> I,
) {
companion object {
inline operator fun <reified I : Any, reified O> invoke(
client: HttpClient,
workScope: CoroutineScope,
urlString: String,
noinline encoder: (O) -> String = { defaultSerializer.encodeToString(it) },
noinline decoder: (String) -> I = { defaultSerializer.decodeFromString(it) }
): WebSocketStream<I, O> {
return object : WebSocketStream<I, O>(
client = client,
workScope = workScope,
urlString = urlString,
encoder = encoder,
decoder = decoder
){}
}
val defaultSerializer = Json {
ignoreUnknownKeys = true
allowSpecialFloatingPointValues = true
}
}
sealed interface Output<out O> {
data class Income<T : Any>(val data: T) : Output<T>
data class Something<T : Any>(val frame: Frame) : Output<T>
data class Error(val e: Throwable) : Output<Nothing>
}
sealed interface Status {
data object Connect : Status
data object Connected : Status
data object Disconnect : Status
}
private val _stream = MutableSharedFlow<Output<I>>()
val output = _stream.asSharedFlow()
private val input = Channel<String>()
private val _status = MutableStateFlow<Status>(Status.Disconnect)
val status = _status.asStateFlow()
private var job: Job? = null
fun connectWhile(
keepOpen: () -> Boolean
) {
job?.cancel()
job = workScope.launch {
while (keepOpen()) {
runCatching {
_status.value = Status.Connect
connectSuspend(keepOpen)
_status.value = Status.Disconnect
}.onFailure {
_stream.emit(Output.Error(it))
}
if (keepOpen()) {
delay(1000)
}
}
}
}
private suspend fun connectSuspend(keepOpen: () -> Boolean) {
client.webSocket(urlString) {
val connectJob = launch {
incoming.receiveAsFlow()
.onStart {
_status.value = Status.Connected
}.onEach { message ->
when (message) {
is Frame.Text -> {
val rad = message.readText()
try {
_stream.emit(Output.Income(decoder(rad)))
} catch (e: Exception) {
_stream.emit(Output.Error(e))
}
}
else -> {
_stream.emit(Output.Something(message))
}
}
}.launchIn(this)
input.consumeEach {
runCatching {
send(Frame.Text(it))
}
}
}
while (keepOpen()) delay(1000)
_status.value = Status.Disconnect
connectJob.cancel()
close(reason = CloseReason(CloseReason.Codes.NORMAL, "leave"))
}
}
private suspend fun send(data: String) {
return input.send(data)
}
fun send(
obj: O
) = workScope.launch { send(encoder(obj)) }
}
А пользоваться этим примерно так:
// Есть какой то класс оболочка на прием, а можно и без, всего то заменяем decoder
@Serializable
data class Data(
@SerialName("data")
val data: String
) : SerializedAny()
// Есть какой то класс оболочка на отправку, а можно и без, всего то заменяем encoder
@Serializable
data class Output(
@SerialName("data")
val data: String
)
private suspend fun open(client: HttpClient, text: String) {
// Создаем наш сокет
val stream = WebSocketStream<Data, Output>(
client = client,
workScope = CoroutineScope(Dispatchers.IO),
urlString = "ws://localhost:8078/ws"
)
// вешаем слушателей
CoroutineScope(Dispatchers.IO).launch {
stream.output
.onEach {
println("$text $it")
}.launchIn(this)
stream.status
.onEach {
println(it)
}.launchIn(this)
}
// флаг, что нужно продолжать работу
var keep = true
// сокет с некоторой переодчностью сам спрашивает, нужно ли ему работать дальше
// если связь оборвется, то будет пробовать переподключться
stream.connectWhile { keep }
// Тут мы отправляем данные, в ответ получаем Job. Можно ждать завершения посылки, а можно и не ждать
repeat(5) {
// job создается от workScope. Если workScope закрыть, то и производные закроются.
stream.send(Output(text)).join()
delay(1000)
}
// В конце говорим что продолжать не нужно и сокет сам отключится.
keep = false
}
После некоторых раздумий над отправкой данных и обработок ошибок при отправке решил немного доработать. Всего то заменить.
// добавить
private data class Input(
val data: String,
val statusReceiver: (SendStatus) -> Unit
)
..............
// заменить
private val input = Channel<Input>()
..............
// заменить
input.consumeEach { income ->
runCatching {
send(Frame.Text(income.data))
workScope.launch { income.statusReceiver(SendStatus.OK) }
}.onFailure {
workScope.launch { income.statusReceiver(SendStatus.Error(it)) }
}
}
....................
// заменить и лишний удалить.
fun send(
obj: O,
statusHandler: (SendStatus) -> Unit = {}
) = workScope.launch {
input.send(Input(encoder(obj), statusHandler))
}
Так отправляющий сможет не только отправить данные, но и обработать ошибку
Когда официальные гайды не такие и полезные или WebSocket на ktor