Как я и говорил в прошлой части, тут мы попробуем сделать самый простой обмен при помощи MQTT.

Что же из себя представляет MQTT? В первую очередь - это протокол обмена сообщениями. Данные сообщения могут группироваться по древовидному признаку. Мы можем как отправлять их, так и получать, подписываясь на определенные группы в дереве или отдельные сообщения.
Добавляем в репозитории контроллера системы управления пакетами сервера:
deb http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main deb-src http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main
Обновляем список пакетов и устанавливаем брокер MQTT Mosquitto на контроллере:
apt-get update apt-get install mosquitto
На системе для разработки устанавливаем файлы для сборки и исходники libmosquitto-dev:
sudo apt install libmosquitto-dev apt source libmosquitto-dev
Переходим в созданную папку с исходниками и собираем библиотеку под компилятор для arm-linux-gnueabihf:
make WITH_TLS=no WITH_CJSON=no WITH_STATIC_LIBRARIES=yes
В папке ./lib/cpp/вы найдете все собранные библиотеки чтобы подложить в проект. Нам потребуется файл libmosquitto.a.
Давайте разберем, что же мы хотим получить не выходе? Разложим все по пунктам:
нам надо передавать адреса переменных;
будем прикреплять их за топиками;
можем подписываться на внешние сообщения, а можем и нет;
периодически переменные должны обновляться на брокере.
Опишем структуру хранения самой переменной для контроля изменения переменной, времени изменения и точности, до которой надо контролировать числа с плавающей запятой:
template<typename T> struct Value { Value(string topic,T old, T* current,int precision): topic{topic},old{old},current{current},precision{precision} { } ~Value() { } string topic; T old; T* current; uint64_t tFlag=timeSinceEpochMillisec(); int precision=2; };
Функция timeSinceEpochMillisec() была описана в предыдущей статье.
Мы будем разрабатывать класс, который необходимо наследовать от mosquittopp. Задаем Id для присоединения к брокеру на контроллере, хост, который у нас будет в нашем случае localhost, а также начальный топик device.
class MqttDevice:public mosquittopp::mosquittopp { public: MqttDevice(string Id,string mqtt_host,string startTag="device"); ~MqttDevice();
Очередь сообщений будем хранить списком:
list< tuple<string,string> > messages;
Для добавления переменных в классе организуем необходимые функции. Мы передаем топик и переменную.
void addSubscribed(string topic, volatile bool* value); void addSubscribed(string topic, volatile float* value); void addSubscribed(string topic, volatile uint16_t* value); void add(string topic,volatile bool* value); void add(string topic,volatile uint16_t* value); void add(string topic,volatile float* value,int precision=1); void add(string topic,volatile double* value,int precision=1);
Получим вот такой полный файл MqttDevice.h:
class MqttDevice:public mosquittopp::mosquittopp { public: MqttDevice(string Id,string mqtt_host,string startTag="device"); ~MqttDevice(); void setTag(string topic,string value); void message(string name, volatile double value); void message(string name, volatile float value); void message(string name, volatile bool value); void message(string name, string value); list< tuple<string,string> > messages; void on_connect(int rc); void on_message(const struct mosquitto_message* message); void on_subscribe(int mid,int qos_count,const int* granted_qos); void sendMessage(string topic, string& message); void sendMessage(string& topic,char* message, int msgLength); void addSubscribed(string topic, volatile bool* value); void addSubscribed(string topic, volatile float* value); void add(string topic,volatile bool* value); void add(string topic,volatile float* value,int precision=1); void add(string topic,volatile double* value,int precision=1); protected: uint64_t tBool=0,tFloat=0,tDouble=0; bool _exit; private: string mqtt_host; list<Value<volatile bool> > lwBool; list<Value<volatile float> > lwFloat; list<Value<volatile bool> > lrBool; list<Value<volatile float> > lrFloat; list<Value<volatile double> > lrDouble; struct mosquitto *mosq; string startTag; void mosq_loop(); thread _threadMosquitto; };
В конструкторе и деструкторе мы организуем основные работы по созданию потока и его остановке.
MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) { _exit=false; int rc=connect(mqtt_host.c_str(),mqtt_port,65535); printf("MqttDevice connect %d\n",rc); _threadMosquitto = thread(&MqttDevice::mosq_loop,this); } MqttDevice::~MqttDevice() { _exit=true; _threadMosquitto.join(); printf("MqttDevice delete\n"); }
Полный текст MqttDevice.cpp:
#define mqtt_port 1883 MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) { _exit=false; int rc=connect(mqtt_host.c_str(),mqtt_port,65535); printf("MqttDevice connect %d\n",rc); _threadMosquitto = thread(&MqttDevice::mosq_loop,this); } MqttDevice::~MqttDevice() { _exit=true; _threadMosquitto.join(); printf("MqttDevice delete\n"); } void MqttDevice::addSubscribed(string topic, volatile bool* value) { lwBool.push_back({topic,false,value,0}); if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str()); } void MqttDevice::addSubscribed(string topic, volatile float* value) { lwFloat.push_back({topic,0.0f,value,1}); if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str()); } void MqttDevice::add(string topic,volatile bool* value) { lrBool.push_back({topic,false,value,0}); } void MqttDevice::add(string topic,volatile float* value, int precision) { lrFloat.push_back({topic,0.0f,value,precision}); } void MqttDevice::add(string topic,volatile double* value, int precision) { lrDouble.push_back({topic,0.0,value,precision}); } void MqttDevice::on_connect(int rc) { int err=0; printf("[Mosquitto] Connect. rc=%d\n",rc); if(rc==0) { for(auto& v:lwBool) { err=subscribe(0,v.topic.c_str(),0); if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str()); else {reconnect();return;} nsleep(10); } for(auto& v:lwFloat) { err=subscribe(0,v.topic.c_str(),0); if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str()); else {reconnect();return;} nsleep(10); } } else printf("[Mosquitto] Connection failed. Aborting subscribing.\n"); } void MqttDevice::on_message(const struct mosquitto_message *message) { string tag=message->topic; string value=string((const char*)message->payload,message->payloadlen); setTag(tag,value); } void MqttDevice::mosq_loop() { int rc; lib_init(); printf("[Mosquitto] Start loop...\n"); while(!_exit) { uint64_t now = timeSinceEpochMillisec(); rc=loop(); if(rc) { printf("[Mosquitto] Disconnected. Trying to reconnect...\n"); nsleep(500); reconnect(); nsleep(500); continue; } while(messages.size()>0) { rc=publish(NULL,get<0>(messages.front()).c_str(),strlen(get<1>(messages.front()).c_str()),(const uint8_t*)get<1>(messages.front()).c_str(),0,true); if(rc) { printf("[Mosquitto] Error send message\n"); break; } else messages.pop_front(); } if(now-tBool>300) { tBool=now; for(auto &v:lrBool) { if(v.current==NULL) continue; if(v.old!=*v.current || now-v.tFlag>3000) { v.tFlag=now; v.old=*v.current; message(v.topic,*v.current); } } for(auto &v:lwBool) { if(v.old!=*v.current || now-v.tFlag>3000) { v.tFlag=now; v.old=*v.current; message(v.topic,*v.current); } } } if(now-tFloat>500) { tFloat=now; for(auto &v:lrFloat) { float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision); if(v.old!=r || now-v.tFlag>5000) { v.tFlag=now; v.old=r; message(v.topic,r); } } for(auto &v:lrDouble) { double r=round(*v.current*pow(10,v.precision))/pow(10,v.precision); if(v.old!=r || now-v.tFlag>5000) { v.tFlag=now; v.old=r; message(v.topic,r); } } for(auto &v:lwFloat) { float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision); if(v.old!=r || now-v.tFlag>5000) { v.tFlag=now; v.old=r; message(v.topic,r); } } } } lib_cleanup(); printf("[Mosquitto] Exit from mosquitto loop.\n"); } void MqttDevice::message(string name, volatile float value) { messages.push_back(make_tuple(name,to_string(value))); } void MqttDevice::message(string name, volatile double value) { messages.push_back(make_tuple(name,to_string(value))); } void MqttDevice::message(string name, volatile bool value) { messages.push_back(make_tuple(name,(value?"true":"false"))); } void MqttDevice::message(string name, string value) { messages.push_back(make_tuple(name,value)); } void MqttDevice::setTag(string topic,string value) { try { for(auto& v:lwBool) if(v.topic==topic) { *(v.current)=(value=="true"); v.old=*(v.current); return; } for(auto& v:lwFloat) if(v.topic==topic) { *(v.current)=(float)atof(value.c_str()); v.old=*(v.current); return; } } catch(exception& ex) { } }
Для передачи более сложных объектов автоматизации, мы можем реализовать функции:
void MqttDevice::add(ValveD* valve) { add(startTag+"/"+valve->name+"/SQL",valve->b_t1); add(startTag+"/"+valve->name+"/SQH",valve->b_t2); add(startTag+"/"+valve->name+"/out",valve->b_out); addSubscribed(startTag+"/"+valve->name+"/auto",valve->b_mode); addSubscribed(startTag+"/"+valve->name+"/man",valve->b_control); }
В данном примере передается указатель на объект отсечного клапана с двумя концевиками крайних положений, выходного сигнала, а также управления клапаном. Из управления есть сигнал в каком состоянии клапан auto. Если ручной режим auto=false, то управление с сигнала man. Если автоматический режим auto=true, то управление с внутреннего состояния, заданного программно по алгоритму.
Из прошлой статьи возьмем пример опроса модуля ввода/вывода и к нему добавим отправку полученных данных на брокер:
volatile bool b1=false,b2=false,b3=false; ModbusLine line1(modbus_new_rtu("/dev/ttyS2", 115200, 'N', 8, 1)); line1.addMDS_DIO_16BD(1,&b1,&b2,&b3); MqttDevice mqtt("line","localhost"); mqtt.addSubscribed("device/b1",&b1); mqtt.add("device/b2",&b2); mqtt.add("device/b3",&b3);
При подключении к брокеру мы получили 3 переменных. При изменении переменной b1 через брокер в программе получаем изменение в значении переменной.

Мы добились нужного нам результата, а добавление новых переменных не занимает много кода.
В следующей статье мы опробуем обмен между контроллерами и устройствами через RS-485 по Modbus аналогичным образом. Это будет полезно при межконтроллерном обмене и общении с сенсорными панелями.
