Небольшое предисловие, или в чем же боль
В последнее время я активно работаю над приложениями, которые имеют модули работы с Bluetooth по не-очень-хорошо спроектированным протоколам с кастомными устройствами, что периодически добавляет мне интересных угу, как же проблем.
Поскольку я искренний фанат реактивности в приложениях, то такие проблемы приходилось решать собственными силами, поскольку решений в сети просто нет. Совсем. О получившейся архитектуре работы с Bluetooth-устройствами я и хотел бы вам рассказать.
Опасности на пути джедая
Первый важный момент, о котором должен помнить разработчик, при работе с Bluetooth – пакеты могут повреждаться по пути. А еще – они могут сопровождаться шумом. И это не один случай из миллиона, подобные явления могут встречаться довольно часто, и их нужно обрабатывать. Еще блютус может отключиться, или не подключиться, или сделать вид что подключился, но на самом то деле мы знаем, что это ничего не значит...
В качестве примера решения этих задач, спроектируем микро-фреймворк для процессинга эвентов, которые детерминируются по типам с помощью шапки (первые N байт) и валидируются с помощью какой-нибудь простенькой чек-суммы. Для того, чтобы не загромождать код, примем допущение, что шапка по протоколу имеет фиксированный размер. Все пакеты же разделим на два типа: с фиксированной длиной, и с динамической, передаваемой отдельным байтом.
Проектирование
Начнем с описания возможных эвентов в приложении. Итак, общая абстракция будет выглядеть примерно так, с учетом принятых ограничений:
sealed class Event { val headSize: Int = 2 abstract val head: ByteArray abstract fun isCorrupted(): Boolean //To be continued }
Далее, когда мы определили наборы постоянных свойств для всех пакетов, требуется как-либо формализовать условия, при которых мы:
- Посчитаем, что пакет принадлежит какому-либо типу
- Должны добавить в буффер байт, так как пока что пакет не собирается
- Должны грохнуть буффер, так как какие-либо условия для его сборки не выполнились (этот пункт нужен скорее для подстраховки, лучше добавить туда логи во время тестирования приложения, чтобы проверять полноту остальных условий)
- Пробуем собрать пакет из буффера и проверяем его валидность
Данные четыре условия приводят нас к интерфейсу следующего вида:
interface EventMatcher { val headSize: Int fun matches(packet: ByteBuffer): Boolean fun create(packet: ByteBuffer): Event fun shouldBuffer(packet: ByteBuffer): Boolean fun shouldDrop(packet: ByteBuffer): Boolean }
Создадим компонент, который будет предоставлять сказал бы, что удобный, но это оставлю на ваше усмотрение прокси-интерфейс к нашим матчерам для всех существующих типов, ничего выдающегося, код под катом:
class EventMatchersAdapter { private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>() fun register(event: KClass<out Event>, matcher: EventMatcher) = apply { matchers.put(event, matcher) } fun unregister(event: KClass<out Event>) = apply { matchers.remove(event) } fun knownEvents(): List<KClass<out Event>> = matchers.keys.toList() fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.matches(packet) ?: false fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.shouldBuffer(packet) ?: false fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.shouldDrop(packet) ?: false fun create(packet: ByteBuffer, event: KClass<out Event>): Event? = matchers[event]?.create(packet) }
В пакетах опишем способ определения того, был данный пакет поврежден или нет. Это довольно удобный подход, который позволяет не сильно страдать из-за плохо спроектированного протокола, в котором инженеру вздумалось закинуть вам сотню способов проверки пакетов на корректность, для каждого по несколько.
data class A(override val head: ByteArray, val payload: ByteArray, val checksum: Byte): Event() { companion object { //(two bytes of head) + (2 bytes of payload) + (byte of checksum) @JvmStatic val length = 5.toByte() @JvmStatic val headValue = byteArrayOf(0x00, 0x00) @JvmStatic val matcherValue = object: EventMatcher { override val headSize: Int = 2 override fun matches(packet: ByteBuffer): Boolean { if(packet.position() == 0) return true if(packet.position() == 1) return packet[0] == headValue[0] return packet[0] == headValue[0] && packet[1] == headValue[1] } override fun create(packet: ByteBuffer): A { packet.rewind() return A( ByteArray(2, { packet.get() }), ByteArray(2, { packet.get() }), packet.get() ) } override fun shouldBuffer(packet: ByteBuffer): Boolean = packet.position() < length override fun shouldDrop(packet: ByteBuffer): Boolean = packet.position() > length } } override fun isCorrupted(): Boolean = checksumOf(payload) != checksum override fun equals(other: Any?): Boolean { if(other as? A == null) return false other as A return Arrays.equals(head, other.head) && Arrays.equals(payload, other.payload) && checksum == other.checksum } override fun hashCode(): Int { var result = Arrays.hashCode(head) result = result * 31 + Arrays.hashCode(payload) result = result * 31 + checksum.hashCode() return result } }
data class C(override val head: ByteArray, val length: Byte, val payload: ByteArray, val checksum: Byte): Event() { companion object { @JvmStatic val headValue = byteArrayOf(0x01, 0x00) @JvmStatic val matcherValue = object: EventMatcher { override val headSize: Int = 2 override fun matches(packet: ByteBuffer): Boolean { if(packet.position() == 0) return true if(packet.position() == 1) return packet[0] == headValue[0] return packet[0] == headValue[0] && packet[1] == headValue[1] } override fun create(packet: ByteBuffer): C { packet.rewind() val msb = packet.get() val lsb = packet.get() val length = packet.get() return C( byteArrayOf(msb, lsb), length, packet.take(3, length.toPositiveInt()), packet.get() ) } override fun shouldBuffer(packet: ByteBuffer): Boolean = when(packet.position()) { in 0..2 -> true else -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum) } override fun shouldDrop(packet: ByteBuffer): Boolean = when(packet.position()) { in 0..2 -> false else -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum) } } } override fun isCorrupted(): Boolean = checksumOf(payload) != checksum override fun equals(other: Any?): Boolean { if(other as? C == null) return false other as C return Arrays.equals(head, other.head) && length == other.length && Arrays.equals(payload, other.payload) && checksum == other.checksum } override fun hashCode(): Int { var result = Arrays.hashCode(head) result = result * 31 + length.hashCode() result = result * 31 + Arrays.hashCode(payload) result = result * 31 + checksum.hashCode() return result } }
Далее – от нас требуется описать сам алгоритм считывания пакетов, причем такой, который будет:
- Поддерживать несколько различных типов
- Разруливать повреждения пакетов за нас
- Будет дружить с Flowable
Реализация алгоритма скрытого за Subscriber интерфейсом:
class EventsBridge(private val adapter: EventMatchersAdapter, private val emitter: FlowableEmitter<Event>, private val bufferSize: Int = 128): DisposableSubscriber<Byte>() { private val buffers: Map<KClass<out Event>, ByteBuffer> = mutableMapOf<KClass<out Event>, ByteBuffer>() .apply { for(knownEvent in adapter.knownEvents()) { put(knownEvent, ByteBuffer.allocateDirect(bufferSize)) } } .toMap() override fun onError(t: Throwable) { emitter.onError(t) } override fun onComplete() { emitter.onComplete() } override fun onNext(t: Byte) { for((key, value) in buffers) { value.put(t) adapter.knownEvents() .filter { it == key } .forEach { if (adapter.matches(value, it)) { when { adapter.shouldDrop(value, it) -> { value.clear() } !adapter.shouldBuffer(value, it) -> { val event = adapter.create(value, it) if (!emitter.isCancelled && event != null && !event.isCorrupted()) { release() emitter.onNext(event) } else { value.clear() } } } } else { value.clear() } } } } private fun release() { for(buffer in buffers) buffer.value.clear() } }
Использование
Рассмотрим на примере прогонки unit-тестов:
@Test fun test_single_fixedLength() { val adapter = EventMatchersAdapter() .register(Event.A::class, Event.A.matcherValue) val packetA = generateCorrectPacketA() val testSubscriber = TestSubscriber<Event>() Flowable.create<Event>( { emitter -> val bridge = EventsBridge(adapter, emitter) Flowable.create<Byte>({ byteEmitter -> for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge) }, BackpressureStrategy.BUFFER ) .subscribe(testSubscriber) testSubscriber.assertNoErrors() testSubscriber.assertValue { event -> event is Event.A && !event.isCorrupted() } }
@Test fun test_multiple_dynamicLength_mixed_withNoise() { val adapter = EventMatchersAdapter() .register(Event.C::class, Event.C.matcherValue) .register(Event.D::class, Event.D.matcherValue) val packetC1 = generateCorrectPacketC() val packetD1 = generateCorrectPacketD() val packetD2 = generateCorruptedPacketD() val packetC2 = generateCorruptedPacketC() val testSubscriber = TestSubscriber<Event>() val random = Random() Flowable.create<Event>( { emitter -> val bridge = EventsBridge(adapter, emitter) Flowable.create<Byte>({ byteEmitter -> for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetC1) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetD1) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetD2) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetC2) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } }, BackpressureStrategy.BUFFER).subscribe(bridge) }, BackpressureStrategy.BUFFER ) .subscribe(testSubscriber) testSubscriber.assertNoErrors() testSubscriber.assertValueCount(2) }
private fun generateCorrectPacketB(): ByteArray { val rnd = Random() val payload = byteArrayOf( rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte() ) return byteArrayOf( Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], checksumOf(payload) ) } private fun generateCorrectPacketC(): ByteArray { val rnd = Random() val payload = List(rnd.nextInt(16), { index -> rnd.nextInt().toByte() }).toByteArray() return ByteArray(4 + payload.size, { index -> when(index) { 0 -> Event.C.headValue[0] 1 -> Event.C.headValue[1] 2 -> payload.size.toByte() in 3..(4 + payload.size - 2) -> payload[index - 3] 4 + payload.size - 1 -> checksumOf(payload) else -> 0.toByte() } }) } private fun generateCorruptedPacketB(): ByteArray { val rnd = Random() val payload = byteArrayOf( rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte() ) return byteArrayOf( Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], (checksumOf(payload) + 1.toByte()).toByte() ) } private fun generateCorruptedPacketC(): ByteArray { val rnd = Random() val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray() return ByteArray(4 + payload.size, { index -> when(index) { 0 -> Event.C.headValue[0] 1 -> Event.C.headValue[1] 2 -> payload.size.toByte() in 3..(4 + payload.size - 2) -> payload[index - 3] else -> (checksumOf(payload) + 1.toByte()).toByte() } }) }
inline fun checksumOf(data: ByteArray): Byte { var result = 0x00.toByte() for(b in data) { result = (result + b).toByte() } return (result.inv() + 1.toByte()).toByte() }
И зачем все это было нужно?
На этом примере, мне хотелось бы показать, как легко и непринужденно можно поддерживать модульность при обработке почти что произвольных событий, к слову, не обязательно пришедших из Bluetooth источника (никакого Bluetooth-зависимого кода пока-что не было), при этом избегая возможных повреждений пакетов и зашумления канала связи.
И что дальше?
Сделаем небольшую обертку над RxBluetooth, которая позволит нам в реактивном стиле работать с различными подключениями, слушая различные наборы эвентов.
Весь код условно можно разделить на три набора компонент: два сервиса и один репозиторий.
Сервисы будут у нас предоставлять подключение и работу с данными по подключению соответственно, а репозиторий – предоставлять абстракцию для работы с конкретными подключениями и выступать в роли неявного flyweight-а подключений.
Интерфейсы будут примерно следующими:
interface ConnectivityService { fun sub(service: UUID): Observable<DataService> } interface DataService { fun sub(): Flowable<Event> fun write(data: ByteArray): Boolean fun dispose() } interface DataRepository { fun sub(serviceUUID: UUID): Flowable<Event> fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean> fun dispose() }
И, соответственно, реализации под катом
class ConnectivityServiceImpl(private val bluetooth: RxBluetooth, private val events: EventMatchersAdapter, private val timeoutSeconds: Long = 15L): ConnectivityService { override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) { false -> Observable.empty() else -> { ensureBluetoothNotDiscovering() bluetooth.startDiscovery() bluetooth.observeDevices() .filter { device -> device.uuids.contains(ParcelUuid(service)) } .timeout(timeoutSeconds, TimeUnit.SECONDS) .take(1) .doOnNext { _ -> ensureBluetoothNotDiscovering() } .doOnError { _ -> ensureBluetoothNotDiscovering() } .doOnComplete { -> ensureBluetoothNotDiscovering() } .flatMap { device -> bluetooth.observeConnectDevice(device, service) } .map { connection -> DataServiceImpl(BluetoothConnection(connection), events) } } } private fun ensureBluetoothNotDiscovering() { if(bluetooth.isDiscovering) { bluetooth.cancelDiscovery() } } }
class DataServiceImpl constructor(private val connection: BluetoothConnection, private val adapter: EventMatchersAdapter): DataService { override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter -> val underlying = EventsBridge(adapter = adapter, emitter = emitter) emitter.setDisposable(object: MainThreadDisposable() { override fun onDispose() { if(!underlying.isDisposed) { underlying.dispose() } } }) connection.observeByteStream().subscribe(underlying) }, BackpressureStrategy.BUFFER) override fun write(data: ByteArray): Boolean = connection.send(data) override fun dispose() = connection.closeConnection() }
class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository { private val services = ConcurrentHashMap<UUID, DataService>() override fun sub(serviceUUID: UUID): Flowable<Event> = serviceOf(serviceUUID) .flatMap { service -> service.sub() } override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean> = serviceOf(serviceUUID) .map { service -> service.write(data) } override fun dispose() { for((_, service) in services) { service.dispose() } } private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) { when(this) { null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER) else -> Flowable.just(this) } } }
И таким образом, в минимальное количество строк, мы получаем возможность делать то, что обычно растягивалось в жуткие цепочки вызовов, или коллбэк-хэлл примерно следующим образом:
repository.sub(UUID.randomUUID()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { event -> when(event) { is Event.A -> doSomeStuffA(event) is Event.B -> doSomeStuffB(event) is Event.C -> doSomeStuffC(event) is Event.D -> doSomeStuffD(event) } }
11 строк для прослушки четырех событий от произвольного устройства, неплохо, не правда ли?)
Вместо заключения
Если у кого-то из читающих возникнет желание посмотреть на исходники – они лежат здесь.
Если кому-то захочется посмотреть, как впишутся другие правила для образования пакетов из сырых байт – пишите, попробуем добавить.
UPD: оформил в микро-фреймворк с опцинальными мостами в ReactiveX, корутины, а также чистой реализацией на Kotlin.