
Всем доброго дня! С вами Анна Жаркова, ведущий мобильный разработчик компании Usetech. Продолжаем рассматривать способы многопоточный работы в Kotlin Native.
В предыдущей части мы посмотрели некоторые нюансы работы с корутинами, как работать с Worker и AtomicReference.
Еще одним возможным API для работы с многопоточностью является DetachedObjectGraph.
//код Supranatural concurrency override fun <T, V> execute(jobInput: T, job: (T) -> V): Future<V> { val deferred = DeferredFuture<V>() val detached = DetachedObjectGraph { Triple(jobInput, job, deferred).toImmutable() }.asCPointer() dispatch_async_f(dispatch_get_main_queue(), detached, staticCFunction { it: COpaquePointer? -> initRuntimeIfNeeded() val attached = DetachedObjectGraph<Triple<T, (T) -> V, DeferredFuture<V>>>(it).attach() val result = attached.second(attached.first) attached.third.setValue(result) }) return deferred }
Чем оно примечательно, так тем, что это способ передавать объекты между потоками без заморозки. За счет чего это достигается:
1. Входные параметры, ссылки на блоки и коллбэки, которые надо вызвать и передать в другом потоке, кладутся в качестве общего содержимого вовнутрь DetachedObjectGraph.
2. Затем получаем указатель на открепленный подграф с нашим объектом с помощью asCPointer. И уже внутри нужного потока вызываем staticCFunction, где в качестве параметра работаем с нашим указателем.
3. Для того, чтобы извлечь из графа параметры, надо его прикрепить с помощью команды attach, извлечь упакованные данные и преобразовать нужным способом.
4. А вот вернуть коллбэк нужно вне этого потока.
Важно! Attach открепленного объекта можно вызвать один раз. Иначе можно получить исключение IllegalStateException: Illegal transfer state. Поэтому ссылка на граф должна зануляться после окончания работы блока.
Также при упаковке объекта полезно указать TransferMode.SAFE. Но в API DetachedObjectGraph этот параметр используется по умолчанию.
Для удобства можно сделать обертку, которая позволит работать с изменяемыми элементами в разных потоках, соединив и worker, и AtomicReference, и DetachedGraphObject:
class SharedDetachedObject<T:Any>(producer: () -> T) { private val adog : AtomicReference<DetachedObjectGraph<Any>?> private val lock = Any() init { val detachedObjectGraph = DetachedObjectGraph { producer() as Any }.freeze() adog = AtomicReference(detachedObjectGraph.freeze()) } fun <R> access(block: (T) -> R): R = trySynchronized(lock){ val holder = FreezableAtomicReference<Any?>(null) val producer = { grabAccess(holder, block) as Any } adog.value = DetachedObjectGraph(TransferMode.SAFE, producer).freeze() val result = holder.value!! holder.value = null result as R } private fun <R> grabAccess(holder:FreezableAtomicReference<Any?>, block: (T) -> R):T{ val attach = adog.value!!.attach() val t = attach as T holder.value = block(t) return t } }
Красиво и сложно, местами даже слишком.
Теперь рассмотрим, как наладить коммуникацию между контекстами и частями нативного запроса нашего клиента с помощью корутин. Для этого нам доступен следующий функционал:
•Channels
•Flows
•CompletableDeffered
Начнем с CompletaleDeferred. Данный механизм позволит нам awaitable, результат работы которого мы можем вернуть вместо callback в suspend функции:
class DefferedResponseReader: NSObject(), NSURLSessionDataDelegateProtocol { private var chunks = ByteArray(0).atomic() private var rawResponse = CompletableDeferred<Response>() suspend fun awaitResponse(): Response { return rawResponse.await().share() } override fun URLSession( session: NSURLSession, task: NSURLSessionTask, didCompleteWithError: NSError? ) { val response = task.response as NSHTTPURLResponse completed(response.statusCode,didCompleteWithError as? Error) } fun completed(code: Long, error: Error?) { val content = chunks.value.string() if (!rawResponse.isCompleted) { NSLog("completed: %s",content) rawResponse.complete(Response(code, content, error)) clearChunks() } else { NSLog("already completed:") } }
Так как такой код мы можем вызвать только в корутине, то модернизируем и запрос:
class HttpDefferedEngine { val engineJob = SupervisorJob() val engineScope: CoroutineScope = CoroutineScope(defaultDispatcher + engineJob) suspend fun request(request: Request): Response { val reader = DefferedResponseReader() val urlSession = NSURLSession.sessionWithConfiguration( NSURLSessionConfiguration.defaultSessionConfiguration, responseReader,//.share(), delegateQueue = NSOperationQueue.currentQueue() ) /*....**/ val task = urlSession.share().dataTaskWithRequest(urlRequest) engineScope.launch { task?.resume() } val response = responseReader.awaitResponse() return response } }
Т.к заморозка вызывается до отправки в скоуп и вынесена на более низкий уровень взаимодействия, то проблема, с которой мы столкнулись ранее, у нас уже решена, и исключения не возникает.
Теперь избавимся от атомарных ссылок. Для этого мы можем использовать следующее API:
private var chunks = Channel<ByteArray>(UNLIMITED) //либо private var chunksFlow = MutableStateFlow(ByteArray(0))
В случае channel модифицируем получение данных и отправку в ответе так:
suspend fun awaitResponse(): Response { var array = ByteArray(0) var response = rawResponse.await() chunks.consumeEach { array += it } response.content = array.string() return response.share() // помним о заморозке } private fun updateChunks(data: NSData) { val bytes = data.toByteArray() scope.launch { chunks.send(bytes) } }
В случае flow вот так:
suspend fun awaitResponse(): Response { var chunks = ByteArray(0) chunksFlow.onEach { chunks += it }.launchIn(scope) val response = rawResponse.await() response.content = chunks.string() return response.share() } private fun updateChunks(data: NSData) { val bytes = data.toByteArray().share() chunksFlow.tryEmit(bytes) }
Кстати, MutableStateFlow является приемлемой альтернативой MutableLiveData, которую мы спокойно можем использовать в Kotlin Native.
При вызове на стороне общего модуля проблем у нас вообще не возникнет:
class MoviesListViewModel() : BaseViewModel(ioDispatcher) { private val service = MoviesService.instance val moviesList: MutableStateFlow<List<MoviesItem>> = MutableStateFlow(emptyList()) fun loadMovies() { scope.launch { val result = service.loadMovies() moviesList.value = result.content?.results ?: arrayListOf() } }
Но вот прямо на стороне iOS (нативного приложения) вызов флоу выглядит странновато:
flow.collect(collector: <#T##Kotlinx_coroutines_coreFlowCollector#>, completionHandler: <#T##(KotlinUnit?, Error?) -> Void#>)
Необходимо реализовать специальный коллектор, с помощью которого собирать приходящие значения:
class Collector<T>: Kotlinx_coroutines_coreFlowCollector { let callback:(T) -> Void init(callback: @escaping (T) -> Void) { self.callback = callback } func emit(value: Any?, completionHandler: @escaping (KotlinUnit?, Error?) -> Void) { // do whatever you what with the emitted value callback(value as! T) //Значения иногда теряются completionHandler(KotlinUnit(), nil) } }
Однако, если сконфигурировать обработку неверно, сигналы могут теряться.
Поэтому на стороне IOS полезно сделать вот такую обработку Flow с помощью расширения-обертки на стороне общего модуля:
class AnyFlow<T>(source: Flow<T>): Flow<T> by source { fun collect(onEach: (T) -> Unit, onCompletion: (cause: Throwable?) -> Unit): Cancellable { val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main) scope.launch { try { collect { onEach(it) } onCompletion(null) } catch (e: Throwable) { onCompletion(e) } } return object : Cancellable { override fun cancel() { scope.cancel() } } } }
Получаемый поток можно слушать, как обычный suspend на стороне iOS:
someFlow().collect { value-> // do with value } onCompletion { print("Completed") }
Получилось насыщенно. Какие-то вещи удобнее делать с помощью Kotlin Coroutines, какие-то проще делать без корутин, но нативным API Kotlin Native.
Остается сравнить с тем, что у нас появилось в новой модели управления памятью, о чем смотрите в следующей части.
И полезные ссылки:
Kotlin Multiplatform. Advanced multithreading by Anna Zharkova | KotLand
