Pull to refresh

Реактивная работа с Bluetooth в реальных условиях

Reading time 11 min
Views 11K

Небольшое предисловие, или в чем же боль


В последнее время я активно работаю над приложениями, которые имеют модули работы с Bluetooth по не-очень-хорошо спроектированным протоколам с кастомными устройствами, что периодически добавляет мне интересных угу, как же проблем.


Поскольку я искренний фанат реактивности в приложениях, то такие проблемы приходилось решать собственными силами, поскольку решений в сети просто нет. Совсем. О получившейся архитектуре работы с Bluetooth-устройствами я и хотел бы вам рассказать.


Опасности на пути джедая


Первый важный момент, о котором должен помнить разработчик, при работе с Bluetooth – пакеты могут повреждаться по пути. А еще – они могут сопровождаться шумом. И это не один случай из миллиона, подобные явления могут встречаться довольно часто, и их нужно обрабатывать. Еще блютус может отключиться, или не подключиться, или сделать вид что подключился, но на самом то деле мы знаем, что это ничего не значит...


В качестве примера решения этих задач, спроектируем микро-фреймворк для процессинга эвентов, которые детерминируются по типам с помощью шапки (первые N байт) и валидируются с помощью какой-нибудь простенькой чек-суммы. Для того, чтобы не загромождать код, примем допущение, что шапка по протоколу имеет фиксированный размер. Все пакеты же разделим на два типа: с фиксированной длиной, и с динамической, передаваемой отдельным байтом.


Проектирование


Начнем с описания возможных эвентов в приложении. Итак, общая абстракция будет выглядеть примерно так, с учетом принятых ограничений:


sealed class Event {

    val headSize: Int = 2

    abstract val head: ByteArray

    abstract fun isCorrupted(): Boolean

    //To be continued
}

Далее, когда мы определили наборы постоянных свойств для всех пакетов, требуется как-либо формализовать условия, при которых мы:


  1. Посчитаем, что пакет принадлежит какому-либо типу
  2. Должны добавить в буффер байт, так как пока что пакет не собирается
  3. Должны грохнуть буффер, так как какие-либо условия для его сборки не выполнились (этот пункт нужен скорее для подстраховки, лучше добавить туда логи во время тестирования приложения, чтобы проверять полноту остальных условий)
  4. Пробуем собрать пакет из буффера и проверяем его валидность

Данные четыре условия приводят нас к интерфейсу следующего вида:


interface EventMatcher {

    val headSize: Int

    fun matches(packet: ByteBuffer): Boolean

    fun create(packet: ByteBuffer): Event

    fun shouldBuffer(packet: ByteBuffer): Boolean

    fun shouldDrop(packet: ByteBuffer): Boolean
}

Создадим компонент, который будет предоставлять сказал бы, что удобный, но это оставлю на ваше усмотрение прокси-интерфейс к нашим матчерам для всех существующих типов, ничего выдающегося, код под катом:


Прокси-матчер
class EventMatchersAdapter {

    private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>()

    fun register(event: KClass<out Event>, matcher: EventMatcher)
            = apply { matchers.put(event, matcher) }

    fun unregister(event: KClass<out Event>)
            = apply { matchers.remove(event) }

    fun knownEvents(): List<KClass<out Event>>
            = matchers.keys.toList()

    fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean
            = matchers[event]?.matches(packet) ?: false

    fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean
            = matchers[event]?.shouldBuffer(packet) ?: false

    fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean
            = matchers[event]?.shouldDrop(packet) ?: false

    fun create(packet: ByteBuffer, event: KClass<out Event>): Event?
            = matchers[event]?.create(packet)
}

В пакетах опишем способ определения того, был данный пакет поврежден или нет. Это довольно удобный подход, который позволяет не сильно страдать из-за плохо спроектированного протокола, в котором инженеру вздумалось закинуть вам сотню способов проверки пакетов на корректность, для каждого по несколько.


Пример пакета с фиксированной длиной
data class A(override val head: ByteArray,
                 val payload: ByteArray,
                 val checksum: Byte): Event() {

        companion object {

            //(two bytes of head) + (2 bytes of payload) + (byte of checksum)
            @JvmStatic val length = 5.toByte()

            @JvmStatic val headValue = byteArrayOf(0x00, 0x00)

            @JvmStatic val matcherValue = object: EventMatcher {
                override val headSize: Int = 2

                override fun matches(packet: ByteBuffer): Boolean {
                    if(packet.position() == 0) return true
                    if(packet.position() == 1) return packet[0] == headValue[0]

                    return packet[0] == headValue[0]
                            && packet[1] == headValue[1]
                }

                override fun create(packet: ByteBuffer): A {
                    packet.rewind()

                    return A(
                            ByteArray(2, { packet.get() }),
                            ByteArray(2, { packet.get() }),
                            packet.get()
                    )
                }

                override fun shouldBuffer(packet: ByteBuffer): Boolean
                        = packet.position() < length

                override fun shouldDrop(packet: ByteBuffer): Boolean
                        = packet.position() > length
            }
        }

        override fun isCorrupted(): Boolean = checksumOf(payload) != checksum

        override fun equals(other: Any?): Boolean {
            if(other as? A == null) return false

            other as A

            return Arrays.equals(head, other.head)
            && Arrays.equals(payload, other.payload)
            && checksum == other.checksum
        }

        override fun hashCode(): Int {
            var result = Arrays.hashCode(head)
            result = result * 31 + Arrays.hashCode(payload)
            result = result * 31 + checksum.hashCode()
            return result
        }
    }

Пример пакета с динамической длиной
data class C(override val head: ByteArray,
                 val length: Byte,
                 val payload: ByteArray,
                 val checksum: Byte): Event() {

        companion object {

            @JvmStatic val headValue = byteArrayOf(0x01, 0x00)

            @JvmStatic val matcherValue = object: EventMatcher {
                override val headSize: Int = 2

                override fun matches(packet: ByteBuffer): Boolean {
                    if(packet.position() == 0) return true
                    if(packet.position() == 1) return packet[0] == headValue[0]

                    return packet[0] == headValue[0]
                        && packet[1] == headValue[1]
                }

                override fun create(packet: ByteBuffer): C {
                    packet.rewind()

                    val msb = packet.get()
                    val lsb = packet.get()
                    val length = packet.get()

                    return C(
                            byteArrayOf(msb, lsb),
                            length,
                            packet.take(3, length.toPositiveInt()),
                            packet.get()
                    )
                }

                override fun shouldBuffer(packet: ByteBuffer): Boolean
                        = when(packet.position()) {
                    in 0..2 -> true
                    else -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
                }

                override fun shouldDrop(packet: ByteBuffer): Boolean
                        = when(packet.position()) {
                    in 0..2 -> false
                    else -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
                }
            }
        }

        override fun isCorrupted(): Boolean = checksumOf(payload) != checksum

        override fun equals(other: Any?): Boolean {
            if(other as? C == null) return false

            other as C

            return Arrays.equals(head, other.head)
                    && length == other.length
                    && Arrays.equals(payload, other.payload)
                    && checksum == other.checksum
        }

        override fun hashCode(): Int {
            var result = Arrays.hashCode(head)
            result = result * 31 + length.hashCode()
            result = result * 31 + Arrays.hashCode(payload)
            result = result * 31 + checksum.hashCode()
            return result
        }
    }

Далее – от нас требуется описать сам алгоритм считывания пакетов, причем такой, который будет:


  1. Поддерживать несколько различных типов
  2. Разруливать повреждения пакетов за нас
  3. Будет дружить с Flowable

Реализация алгоритма скрытого за Subscriber интерфейсом:


class EventsBridge(private val adapter: EventMatchersAdapter,
                            private val emitter: FlowableEmitter<Event>,
                            private val bufferSize: Int = 128): DisposableSubscriber<Byte>() {

    private val buffers: Map<KClass<out Event>, ByteBuffer>
            = mutableMapOf<KClass<out Event>, ByteBuffer>()
            .apply {
                for(knownEvent in adapter.knownEvents()) {
                    put(knownEvent, ByteBuffer.allocateDirect(bufferSize))
                }
            }
            .toMap()

    override fun onError(t: Throwable) {
        emitter.onError(t)
    }

    override fun onComplete() {
        emitter.onComplete()
    }

    override fun onNext(t: Byte) {
        for((key, value) in buffers) {
            value.put(t)

            adapter.knownEvents()
                    .filter { it == key }
                    .forEach {
                        if (adapter.matches(value, it)) {
                            when {
                                adapter.shouldDrop(value, it) -> {
                                    value.clear()
                                }
                                !adapter.shouldBuffer(value, it) -> {
                                    val event = adapter.create(value, it)
                                    if (!emitter.isCancelled
                                            && event != null
                                            && !event.isCorrupted()) {
                                        release()
                                        emitter.onNext(event)
                                    } else {
                                        value.clear()
                                    }
                                }
                            }
                        } else {
                            value.clear()
                        }
                    }
        }
    }

    private fun release() {
        for(buffer in buffers) buffer.value.clear()
    }
}

Использование


Рассмотрим на примере прогонки unit-тестов:


Простенький тест на работу для одного типа пакета
@Test
    fun test_single_fixedLength() {
        val adapter = EventMatchersAdapter()
                .register(Event.A::class, Event.A.matcherValue)

        val packetA = generateCorrectPacketA()
        val testSubscriber = TestSubscriber<Event>()

        Flowable.create<Event>(
                { emitter ->
                    val bridge = EventsBridge(adapter, emitter)
                    Flowable.create<Byte>({ byteEmitter -> for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge)
                },
                BackpressureStrategy.BUFFER
        )
                .subscribe(testSubscriber)

        testSubscriber.assertNoErrors()
        testSubscriber.assertValue { event ->
            event is Event.A && !event.isCorrupted()
        }
    }

Тест с кучей шума, несколькими типами пакетов
@Test
    fun test_multiple_dynamicLength_mixed_withNoise() {
        val adapter = EventMatchersAdapter()
                .register(Event.C::class, Event.C.matcherValue)
                .register(Event.D::class, Event.D.matcherValue)

        val packetC1 = generateCorrectPacketC()
        val packetD1 = generateCorrectPacketD()
        val packetD2 = generateCorruptedPacketD()
        val packetC2 = generateCorruptedPacketC()

        val testSubscriber = TestSubscriber<Event>()

        val random = Random()

        Flowable.create<Event>(
                { emitter ->
                    val bridge = EventsBridge(adapter, emitter)
                    Flowable.create<Byte>({ byteEmitter ->
                        for(b in 0..100)      { byteEmitter.onNext(random.nextInt().toByte()) }
                        for(byte in packetC1) { byteEmitter.onNext(byte) }
                        for(b in 0..100)      { byteEmitter.onNext(random.nextInt().toByte()) }
                        for(byte in packetD1) { byteEmitter.onNext(byte) }
                        for(b in 0..100)      { byteEmitter.onNext(random.nextInt().toByte()) }
                        for(byte in packetD2) { byteEmitter.onNext(byte) }
                        for(b in 0..100)      { byteEmitter.onNext(random.nextInt().toByte()) }
                        for(byte in packetC2) { byteEmitter.onNext(byte) }
                        for(b in 0..100)      { byteEmitter.onNext(random.nextInt().toByte()) }
                    }, BackpressureStrategy.BUFFER).subscribe(bridge)
                },
                BackpressureStrategy.BUFFER
        )
                .subscribe(testSubscriber)

        testSubscriber.assertNoErrors()
        testSubscriber.assertValueCount(2)
    }

Генерация пакетов для тестов
private fun generateCorrectPacketB(): ByteArray {
        val rnd = Random()

        val payload = byteArrayOf(
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte()
        )

        return byteArrayOf(
                Event.B.headValue[0],
                Event.B.headValue[1],
                payload[0],
                payload[1],
                payload[2],
                payload[3],
                checksumOf(payload)
        )
    }

    private fun generateCorrectPacketC(): ByteArray {
        val rnd = Random()

        val payload = List(rnd.nextInt(16), { index ->
            rnd.nextInt().toByte()
        }).toByteArray()

        return ByteArray(4 + payload.size, { index ->
            when(index) {
                0 -> Event.C.headValue[0]
                1 -> Event.C.headValue[1]
                2 -> payload.size.toByte()
                in 3..(4 + payload.size - 2) -> payload[index - 3]
                4 + payload.size - 1 -> checksumOf(payload)
                else -> 0.toByte()
            }
        })
    }

private fun generateCorruptedPacketB(): ByteArray {
        val rnd = Random()

        val payload = byteArrayOf(
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte(),
                rnd.nextInt().toByte()
        )

        return byteArrayOf(
                Event.B.headValue[0],
                Event.B.headValue[1],
                payload[0],
                payload[1],
                payload[2],
                payload[3],
                (checksumOf(payload) + 1.toByte()).toByte()
        )
    }

    private fun generateCorruptedPacketC(): ByteArray {
        val rnd = Random()

        val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray()

        return ByteArray(4 + payload.size, { index ->
            when(index) {
                0 -> Event.C.headValue[0]
                1 -> Event.C.headValue[1]
                2 -> payload.size.toByte()
                in 3..(4 + payload.size - 2) -> payload[index - 3]
                else -> (checksumOf(payload) + 1.toByte()).toByte()
            }
        })
    }

Простенькая чексумма, использованная для тестирования
inline fun checksumOf(data: ByteArray): Byte {
    var result = 0x00.toByte()
    for(b in data) {
        result = (result + b).toByte()
    }

    return (result.inv() + 1.toByte()).toByte()
}

И зачем все это было нужно?


На этом примере, мне хотелось бы показать, как легко и непринужденно можно поддерживать модульность при обработке почти что произвольных событий, к слову, не обязательно пришедших из Bluetooth источника (никакого Bluetooth-зависимого кода пока-что не было), при этом избегая возможных повреждений пакетов и зашумления канала связи.


И что дальше?


Сделаем небольшую обертку над RxBluetooth, которая позволит нам в реактивном стиле работать с различными подключениями, слушая различные наборы эвентов.


Весь код условно можно разделить на три набора компонент: два сервиса и один репозиторий.
Сервисы будут у нас предоставлять подключение и работу с данными по подключению соответственно, а репозиторий – предоставлять абстракцию для работы с конкретными подключениями и выступать в роли неявного flyweight-а подключений.


Интерфейсы будут примерно следующими:


interface ConnectivityService {

    fun sub(service: UUID): Observable<DataService>
}

interface DataService {

    fun sub(): Flowable<Event>

    fun write(data: ByteArray): Boolean

    fun dispose()
}

interface DataRepository {

    fun sub(serviceUUID: UUID): Flowable<Event>

    fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>

    fun dispose()
}

И, соответственно, реализации под катом


ConnectivityServiceImpl
class ConnectivityServiceImpl(private val bluetooth: RxBluetooth,
                              private val events: EventMatchersAdapter,
                              private val timeoutSeconds: Long = 15L): ConnectivityService {

    override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) {
        false -> Observable.empty()
        else -> {
            ensureBluetoothNotDiscovering()
            bluetooth.startDiscovery()
            bluetooth.observeDevices()
                    .filter { device -> device.uuids.contains(ParcelUuid(service)) }
                    .timeout(timeoutSeconds, TimeUnit.SECONDS)
                    .take(1)
                    .doOnNext { _ -> ensureBluetoothNotDiscovering() }
                    .doOnError { _ -> ensureBluetoothNotDiscovering() }
                    .doOnComplete {  -> ensureBluetoothNotDiscovering() }
                    .flatMap { device -> bluetooth.observeConnectDevice(device, service) }
                    .map { connection -> DataServiceImpl(BluetoothConnection(connection), events) }
        }
    }

    private fun ensureBluetoothNotDiscovering() {
        if(bluetooth.isDiscovering) {
            bluetooth.cancelDiscovery()
        }
    }
}

DataServiceImpl
class DataServiceImpl constructor(private val connection: BluetoothConnection,
                                  private val adapter: EventMatchersAdapter): DataService {

    override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter ->
        val underlying = EventsBridge(adapter = adapter, emitter = emitter)

        emitter.setDisposable(object: MainThreadDisposable() {
            override fun onDispose() {
                if(!underlying.isDisposed) {
                    underlying.dispose()
                }
            }
        })

        connection.observeByteStream().subscribe(underlying)
    }, BackpressureStrategy.BUFFER)

    override fun write(data: ByteArray): Boolean
         = connection.send(data)

    override fun dispose()
        = connection.closeConnection()
}

DataRepositoryImpl
class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository {

    private val services = ConcurrentHashMap<UUID, DataService>()

    override fun sub(serviceUUID: UUID): Flowable<Event>
            = serviceOf(serviceUUID)
            .flatMap { service -> service.sub() }

    override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
            = serviceOf(serviceUUID)
            .map { service -> service.write(data) }

    override fun dispose() {
        for((_, service) in services) {
            service.dispose()
        }
    }

    private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) {
        when(this) {
            null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER)
            else -> Flowable.just(this)
        }
    }
}

И таким образом, в минимальное количество строк, мы получаем возможность делать то, что обычно растягивалось в жуткие цепочки вызовов, или коллбэк-хэлл примерно следующим образом:


repository.sub(UUID.randomUUID())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe { event ->
                    when(event) {
                        is Event.A -> doSomeStuffA(event)
                        is Event.B -> doSomeStuffB(event)
                        is Event.C -> doSomeStuffC(event)
                        is Event.D -> doSomeStuffD(event)
                    }
                }

11 строк для прослушки четырех событий от произвольного устройства, неплохо, не правда ли?)


Вместо заключения


Если у кого-то из читающих возникнет желание посмотреть на исходники – они лежат здесь.


Если кому-то захочется посмотреть, как впишутся другие правила для образования пакетов из сырых байт – пишите, попробуем добавить.


UPD: оформил в микро-фреймворк с опцинальными мостами в ReactiveX, корутины, а также чистой реализацией на Kotlin.

Tags:
Hubs:
+12
Comments 6
Comments Comments 6

Articles