Небольшое предисловие, или в чем же боль
В последнее время я активно работаю над приложениями, которые имеют модули работы с Bluetooth по не-очень-хорошо спроектированным протоколам с кастомными устройствами, что периодически добавляет мне интересных угу, как же проблем.
Поскольку я искренний фанат реактивности в приложениях, то такие проблемы приходилось решать собственными силами, поскольку решений в сети просто нет. Совсем. О получившейся архитектуре работы с Bluetooth-устройствами я и хотел бы вам рассказать.
Опасности на пути джедая
Первый важный момент, о котором должен помнить разработчик, при работе с Bluetooth – пакеты могут повреждаться по пути. А еще – они могут сопровождаться шумом. И это не один случай из миллиона, подобные явления могут встречаться довольно часто, и их нужно обрабатывать. Еще блютус может отключиться, или не подключиться, или сделать вид что подключился, но на самом то деле мы знаем, что это ничего не значит...
В качестве примера решения этих задач, спроектируем микро-фреймворк для процессинга эвентов, которые детерминируются по типам с помощью шапки (первые N байт) и валидируются с помощью какой-нибудь простенькой чек-суммы. Для того, чтобы не загромождать код, примем допущение, что шапка по протоколу имеет фиксированный размер. Все пакеты же разделим на два типа: с фиксированной длиной, и с динамической, передаваемой отдельным байтом.
Проектирование
Начнем с описания возможных эвентов в приложении. Итак, общая абстракция будет выглядеть примерно так, с учетом принятых ограничений:
sealed class Event {
val headSize: Int = 2
abstract val head: ByteArray
abstract fun isCorrupted(): Boolean
//To be continued
}
Далее, когда мы определили наборы постоянных свойств для всех пакетов, требуется как-либо формализовать условия, при которых мы:
- Посчитаем, что пакет принадлежит какому-либо типу
- Должны добавить в буффер байт, так как пока что пакет не собирается
- Должны грохнуть буффер, так как какие-либо условия для его сборки не выполнились (этот пункт нужен скорее для подстраховки, лучше добавить туда логи во время тестирования приложения, чтобы проверять полноту остальных условий)
- Пробуем собрать пакет из буффера и проверяем его валидность
Данные четыре условия приводят нас к интерфейсу следующего вида:
interface EventMatcher {
val headSize: Int
fun matches(packet: ByteBuffer): Boolean
fun create(packet: ByteBuffer): Event
fun shouldBuffer(packet: ByteBuffer): Boolean
fun shouldDrop(packet: ByteBuffer): Boolean
}
Создадим компонент, который будет предоставлять сказал бы, что удобный, но это оставлю на ваше усмотрение прокси-интерфейс к нашим матчерам для всех существующих типов, ничего выдающегося, код под катом:
class EventMatchersAdapter {
private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>()
fun register(event: KClass<out Event>, matcher: EventMatcher)
= apply { matchers.put(event, matcher) }
fun unregister(event: KClass<out Event>)
= apply { matchers.remove(event) }
fun knownEvents(): List<KClass<out Event>>
= matchers.keys.toList()
fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.matches(packet) ?: false
fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.shouldBuffer(packet) ?: false
fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean
= matchers[event]?.shouldDrop(packet) ?: false
fun create(packet: ByteBuffer, event: KClass<out Event>): Event?
= matchers[event]?.create(packet)
}
В пакетах опишем способ определения того, был данный пакет поврежден или нет. Это довольно удобный подход, который позволяет не сильно страдать из-за плохо спроектированного протокола, в котором инженеру вздумалось закинуть вам сотню способов проверки пакетов на корректность, для каждого по несколько.
data class A(override val head: ByteArray,
val payload: ByteArray,
val checksum: Byte): Event() {
companion object {
//(two bytes of head) + (2 bytes of payload) + (byte of checksum)
@JvmStatic val length = 5.toByte()
@JvmStatic val headValue = byteArrayOf(0x00, 0x00)
@JvmStatic val matcherValue = object: EventMatcher {
override val headSize: Int = 2
override fun matches(packet: ByteBuffer): Boolean {
if(packet.position() == 0) return true
if(packet.position() == 1) return packet[0] == headValue[0]
return packet[0] == headValue[0]
&& packet[1] == headValue[1]
}
override fun create(packet: ByteBuffer): A {
packet.rewind()
return A(
ByteArray(2, { packet.get() }),
ByteArray(2, { packet.get() }),
packet.get()
)
}
override fun shouldBuffer(packet: ByteBuffer): Boolean
= packet.position() < length
override fun shouldDrop(packet: ByteBuffer): Boolean
= packet.position() > length
}
}
override fun isCorrupted(): Boolean = checksumOf(payload) != checksum
override fun equals(other: Any?): Boolean {
if(other as? A == null) return false
other as A
return Arrays.equals(head, other.head)
&& Arrays.equals(payload, other.payload)
&& checksum == other.checksum
}
override fun hashCode(): Int {
var result = Arrays.hashCode(head)
result = result * 31 + Arrays.hashCode(payload)
result = result * 31 + checksum.hashCode()
return result
}
}
data class C(override val head: ByteArray,
val length: Byte,
val payload: ByteArray,
val checksum: Byte): Event() {
companion object {
@JvmStatic val headValue = byteArrayOf(0x01, 0x00)
@JvmStatic val matcherValue = object: EventMatcher {
override val headSize: Int = 2
override fun matches(packet: ByteBuffer): Boolean {
if(packet.position() == 0) return true
if(packet.position() == 1) return packet[0] == headValue[0]
return packet[0] == headValue[0]
&& packet[1] == headValue[1]
}
override fun create(packet: ByteBuffer): C {
packet.rewind()
val msb = packet.get()
val lsb = packet.get()
val length = packet.get()
return C(
byteArrayOf(msb, lsb),
length,
packet.take(3, length.toPositiveInt()),
packet.get()
)
}
override fun shouldBuffer(packet: ByteBuffer): Boolean
= when(packet.position()) {
in 0..2 -> true
else -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
}
override fun shouldDrop(packet: ByteBuffer): Boolean
= when(packet.position()) {
in 0..2 -> false
else -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum)
}
}
}
override fun isCorrupted(): Boolean = checksumOf(payload) != checksum
override fun equals(other: Any?): Boolean {
if(other as? C == null) return false
other as C
return Arrays.equals(head, other.head)
&& length == other.length
&& Arrays.equals(payload, other.payload)
&& checksum == other.checksum
}
override fun hashCode(): Int {
var result = Arrays.hashCode(head)
result = result * 31 + length.hashCode()
result = result * 31 + Arrays.hashCode(payload)
result = result * 31 + checksum.hashCode()
return result
}
}
Далее – от нас требуется описать сам алгоритм считывания пакетов, причем такой, который будет:
- Поддерживать несколько различных типов
- Разруливать повреждения пакетов за нас
- Будет дружить с Flowable
Реализация алгоритма скрытого за Subscriber интерфейсом:
class EventsBridge(private val adapter: EventMatchersAdapter,
private val emitter: FlowableEmitter<Event>,
private val bufferSize: Int = 128): DisposableSubscriber<Byte>() {
private val buffers: Map<KClass<out Event>, ByteBuffer>
= mutableMapOf<KClass<out Event>, ByteBuffer>()
.apply {
for(knownEvent in adapter.knownEvents()) {
put(knownEvent, ByteBuffer.allocateDirect(bufferSize))
}
}
.toMap()
override fun onError(t: Throwable) {
emitter.onError(t)
}
override fun onComplete() {
emitter.onComplete()
}
override fun onNext(t: Byte) {
for((key, value) in buffers) {
value.put(t)
adapter.knownEvents()
.filter { it == key }
.forEach {
if (adapter.matches(value, it)) {
when {
adapter.shouldDrop(value, it) -> {
value.clear()
}
!adapter.shouldBuffer(value, it) -> {
val event = adapter.create(value, it)
if (!emitter.isCancelled
&& event != null
&& !event.isCorrupted()) {
release()
emitter.onNext(event)
} else {
value.clear()
}
}
}
} else {
value.clear()
}
}
}
}
private fun release() {
for(buffer in buffers) buffer.value.clear()
}
}
Использование
Рассмотрим на примере прогонки unit-тестов:
@Test
fun test_single_fixedLength() {
val adapter = EventMatchersAdapter()
.register(Event.A::class, Event.A.matcherValue)
val packetA = generateCorrectPacketA()
val testSubscriber = TestSubscriber<Event>()
Flowable.create<Event>(
{ emitter ->
val bridge = EventsBridge(adapter, emitter)
Flowable.create<Byte>({ byteEmitter -> for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge)
},
BackpressureStrategy.BUFFER
)
.subscribe(testSubscriber)
testSubscriber.assertNoErrors()
testSubscriber.assertValue { event ->
event is Event.A && !event.isCorrupted()
}
}
@Test
fun test_multiple_dynamicLength_mixed_withNoise() {
val adapter = EventMatchersAdapter()
.register(Event.C::class, Event.C.matcherValue)
.register(Event.D::class, Event.D.matcherValue)
val packetC1 = generateCorrectPacketC()
val packetD1 = generateCorrectPacketD()
val packetD2 = generateCorruptedPacketD()
val packetC2 = generateCorruptedPacketC()
val testSubscriber = TestSubscriber<Event>()
val random = Random()
Flowable.create<Event>(
{ emitter ->
val bridge = EventsBridge(adapter, emitter)
Flowable.create<Byte>({ byteEmitter ->
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetC1) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetD1) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetD2) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
for(byte in packetC2) { byteEmitter.onNext(byte) }
for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) }
}, BackpressureStrategy.BUFFER).subscribe(bridge)
},
BackpressureStrategy.BUFFER
)
.subscribe(testSubscriber)
testSubscriber.assertNoErrors()
testSubscriber.assertValueCount(2)
}
private fun generateCorrectPacketB(): ByteArray {
val rnd = Random()
val payload = byteArrayOf(
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte()
)
return byteArrayOf(
Event.B.headValue[0],
Event.B.headValue[1],
payload[0],
payload[1],
payload[2],
payload[3],
checksumOf(payload)
)
}
private fun generateCorrectPacketC(): ByteArray {
val rnd = Random()
val payload = List(rnd.nextInt(16), { index ->
rnd.nextInt().toByte()
}).toByteArray()
return ByteArray(4 + payload.size, { index ->
when(index) {
0 -> Event.C.headValue[0]
1 -> Event.C.headValue[1]
2 -> payload.size.toByte()
in 3..(4 + payload.size - 2) -> payload[index - 3]
4 + payload.size - 1 -> checksumOf(payload)
else -> 0.toByte()
}
})
}
private fun generateCorruptedPacketB(): ByteArray {
val rnd = Random()
val payload = byteArrayOf(
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte(),
rnd.nextInt().toByte()
)
return byteArrayOf(
Event.B.headValue[0],
Event.B.headValue[1],
payload[0],
payload[1],
payload[2],
payload[3],
(checksumOf(payload) + 1.toByte()).toByte()
)
}
private fun generateCorruptedPacketC(): ByteArray {
val rnd = Random()
val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray()
return ByteArray(4 + payload.size, { index ->
when(index) {
0 -> Event.C.headValue[0]
1 -> Event.C.headValue[1]
2 -> payload.size.toByte()
in 3..(4 + payload.size - 2) -> payload[index - 3]
else -> (checksumOf(payload) + 1.toByte()).toByte()
}
})
}
inline fun checksumOf(data: ByteArray): Byte {
var result = 0x00.toByte()
for(b in data) {
result = (result + b).toByte()
}
return (result.inv() + 1.toByte()).toByte()
}
И зачем все это было нужно?
На этом примере, мне хотелось бы показать, как легко и непринужденно можно поддерживать модульность при обработке почти что произвольных событий, к слову, не обязательно пришедших из Bluetooth источника (никакого Bluetooth-зависимого кода пока-что не было), при этом избегая возможных повреждений пакетов и зашумления канала связи.
И что дальше?
Сделаем небольшую обертку над RxBluetooth, которая позволит нам в реактивном стиле работать с различными подключениями, слушая различные наборы эвентов.
Весь код условно можно разделить на три набора компонент: два сервиса и один репозиторий.
Сервисы будут у нас предоставлять подключение и работу с данными по подключению соответственно, а репозиторий – предоставлять абстракцию для работы с конкретными подключениями и выступать в роли неявного flyweight-а подключений.
Интерфейсы будут примерно следующими:
interface ConnectivityService {
fun sub(service: UUID): Observable<DataService>
}
interface DataService {
fun sub(): Flowable<Event>
fun write(data: ByteArray): Boolean
fun dispose()
}
interface DataRepository {
fun sub(serviceUUID: UUID): Flowable<Event>
fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
fun dispose()
}
И, соответственно, реализации под катом
class ConnectivityServiceImpl(private val bluetooth: RxBluetooth,
private val events: EventMatchersAdapter,
private val timeoutSeconds: Long = 15L): ConnectivityService {
override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) {
false -> Observable.empty()
else -> {
ensureBluetoothNotDiscovering()
bluetooth.startDiscovery()
bluetooth.observeDevices()
.filter { device -> device.uuids.contains(ParcelUuid(service)) }
.timeout(timeoutSeconds, TimeUnit.SECONDS)
.take(1)
.doOnNext { _ -> ensureBluetoothNotDiscovering() }
.doOnError { _ -> ensureBluetoothNotDiscovering() }
.doOnComplete { -> ensureBluetoothNotDiscovering() }
.flatMap { device -> bluetooth.observeConnectDevice(device, service) }
.map { connection -> DataServiceImpl(BluetoothConnection(connection), events) }
}
}
private fun ensureBluetoothNotDiscovering() {
if(bluetooth.isDiscovering) {
bluetooth.cancelDiscovery()
}
}
}
class DataServiceImpl constructor(private val connection: BluetoothConnection,
private val adapter: EventMatchersAdapter): DataService {
override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter ->
val underlying = EventsBridge(adapter = adapter, emitter = emitter)
emitter.setDisposable(object: MainThreadDisposable() {
override fun onDispose() {
if(!underlying.isDisposed) {
underlying.dispose()
}
}
})
connection.observeByteStream().subscribe(underlying)
}, BackpressureStrategy.BUFFER)
override fun write(data: ByteArray): Boolean
= connection.send(data)
override fun dispose()
= connection.closeConnection()
}
class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository {
private val services = ConcurrentHashMap<UUID, DataService>()
override fun sub(serviceUUID: UUID): Flowable<Event>
= serviceOf(serviceUUID)
.flatMap { service -> service.sub() }
override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean>
= serviceOf(serviceUUID)
.map { service -> service.write(data) }
override fun dispose() {
for((_, service) in services) {
service.dispose()
}
}
private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) {
when(this) {
null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER)
else -> Flowable.just(this)
}
}
}
И таким образом, в минимальное количество строк, мы получаем возможность делать то, что обычно растягивалось в жуткие цепочки вызовов, или коллбэк-хэлл примерно следующим образом:
repository.sub(UUID.randomUUID())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { event ->
when(event) {
is Event.A -> doSomeStuffA(event)
is Event.B -> doSomeStuffB(event)
is Event.C -> doSomeStuffC(event)
is Event.D -> doSomeStuffD(event)
}
}
11 строк для прослушки четырех событий от произвольного устройства, неплохо, не правда ли?)
Вместо заключения
Если у кого-то из читающих возникнет желание посмотреть на исходники – они лежат здесь.
Если кому-то захочется посмотреть, как впишутся другие правила для образования пакетов из сырых байт – пишите, попробуем добавить.
UPD: оформил в микро-фреймворк с опцинальными мостами в ReactiveX, корутины, а также чистой реализацией на Kotlin.