Как я и говорил в прошлой части, тут мы попробуем сделать самый простой обмен при помощи MQTT.

Что же из себя представляет MQTT? В первую очередь - это протокол обмена сообщениями. Данные сообщения могут группироваться по древовидному признаку. Мы можем как отправлять их, так и получать, подписываясь на определенные группы в дереве или отдельные сообщения.

Добавляем в репозитории контроллера системы управления пакетами сервера:

deb http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main
deb-src http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main

Обновляем список пакетов и устанавливаем брокер MQTT Mosquitto на контроллере:

apt-get update
apt-get install mosquitto

На системе для разработки устанавливаем файлы для сборки и исходники libmosquitto-dev:

sudo apt install libmosquitto-dev
apt source libmosquitto-dev

Переходим в созданную папку с исходниками и собираем библиотеку под компилятор для arm-linux-gnueabihf:

make WITH_TLS=no WITH_CJSON=no WITH_STATIC_LIBRARIES=yes

В папке ./lib/cpp/вы найдете все собранные библиотеки чтобы подложить в проект. Нам потребуется файл libmosquitto.a.

Давайте разберем, что же мы хотим получить не выходе? Разложим все по пунктам:

  • нам надо передавать адреса переменных;

  • будем прикреплять их за топиками;

  • можем подписываться на внешние сообщения, а можем и нет;

  • периодически переменные должны обновляться на брокере.

Опишем структуру хранения самой переменной для контроля изменения переменной, времени изменения и точности, до которой надо контролировать числа с плавающей запятой:

template<typename T>
struct Value {
    Value(string topic,T old, T* current,int precision):
        topic{topic},old{old},current{current},precision{precision} { }
    ~Value() { }
    string topic;
    T old;
    T* current;
    uint64_t tFlag=timeSinceEpochMillisec();
    int precision=2;
};

Функция timeSinceEpochMillisec() была описана в предыдущей статье.

Мы будем разрабатывать класс, который необходимо наследовать от mosquittopp. Задаем Id для присоединения к брокеру на контроллере, хост, который у нас будет в нашем случае localhost, а также начальный топик device.

class MqttDevice:public mosquittopp::mosquittopp
{
public:
    MqttDevice(string Id,string mqtt_host,string startTag="device");
    ~MqttDevice();

Очередь сообщений  будем хранить списком:

    list< tuple<string,string> > messages;

Для добавления переменных в классе организуем необходимые функции. Мы передаем топик и переменную.

    void addSubscribed(string topic, volatile bool* value);
    void addSubscribed(string topic, volatile float* value);
    void addSubscribed(string topic, volatile uint16_t* value);
    void add(string topic,volatile bool* value);
    void add(string topic,volatile uint16_t* value);
    void add(string topic,volatile float* value,int precision=1);
    void add(string topic,volatile double* value,int precision=1);

Получим вот такой полный файл MqttDevice.h:

class MqttDevice:public mosquittopp::mosquittopp {
public:
    MqttDevice(string Id,string mqtt_host,string startTag="device");
    ~MqttDevice();
    void setTag(string topic,string value);
    void message(string name, volatile double value);
    void message(string name, volatile float value);
    void message(string name, volatile bool value);
    void message(string name, string value);
    list< tuple<string,string> > messages;
    void on_connect(int rc);
    void on_message(const struct mosquitto_message* message);
    void on_subscribe(int mid,int qos_count,const int* granted_qos);
    void sendMessage(string topic, string& message);
    void sendMessage(string& topic,char* message, int msgLength);
    void addSubscribed(string topic, volatile bool* value);
    void addSubscribed(string topic, volatile float* value);
    void add(string topic,volatile bool* value);
    void add(string topic,volatile float* value,int precision=1);
    void add(string topic,volatile double* value,int precision=1);
protected:
    uint64_t tBool=0,tFloat=0,tDouble=0;
    bool _exit;
private:
    string mqtt_host;
    list<Value<volatile bool> > lwBool;
    list<Value<volatile float> > lwFloat;
    list<Value<volatile bool> > lrBool;
    list<Value<volatile float> > lrFloat;
    list<Value<volatile double> > lrDouble;
    struct mosquitto *mosq;
    string startTag;
    void mosq_loop();
    thread _threadMosquitto;
};

В конструкторе и деструкторе мы организуем основные работы по созданию потока и его остановке.

MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) {
    _exit=false;
    int rc=connect(mqtt_host.c_str(),mqtt_port,65535);
    printf("MqttDevice connect %d\n",rc);
    _threadMosquitto = thread(&MqttDevice::mosq_loop,this);
}
MqttDevice::~MqttDevice() {
    _exit=true;
    _threadMosquitto.join();
    printf("MqttDevice delete\n");
}

Полный текст MqttDevice.cpp:

#define mqtt_port 1883

MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) {
    _exit=false;
    int rc=connect(mqtt_host.c_str(),mqtt_port,65535);
    printf("MqttDevice connect %d\n",rc);
    _threadMosquitto = thread(&MqttDevice::mosq_loop,this);
}
MqttDevice::~MqttDevice() {
    _exit=true;
    _threadMosquitto.join();
    printf("MqttDevice delete\n");
}
void MqttDevice::addSubscribed(string topic, volatile bool* value) {
    lwBool.push_back({topic,false,value,0});
    if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str());
}
void MqttDevice::addSubscribed(string topic, volatile float* value) {
    lwFloat.push_back({topic,0.0f,value,1});
    if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str());
}
void MqttDevice::add(string topic,volatile bool* value) {
    lrBool.push_back({topic,false,value,0});
}
void MqttDevice::add(string topic,volatile float* value, int precision) {
    lrFloat.push_back({topic,0.0f,value,precision});
}
void MqttDevice::add(string topic,volatile double* value, int precision) {
    lrDouble.push_back({topic,0.0,value,precision});
}
void MqttDevice::on_connect(int rc) {
    int err=0;    
    printf("[Mosquitto] Connect. rc=%d\n",rc);
    if(rc==0) {
        for(auto& v:lwBool) {
            err=subscribe(0,v.topic.c_str(),0);
            if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str());
            else {reconnect();return;}
            nsleep(10);
        }
        for(auto& v:lwFloat) {
            err=subscribe(0,v.topic.c_str(),0);
            if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str());
            else {reconnect();return;}
            nsleep(10);
        }
    }
    else
        printf("[Mosquitto] Connection failed. Aborting subscribing.\n");
}
void MqttDevice::on_message(const struct mosquitto_message *message) {
    string tag=message->topic;
    string value=string((const char*)message->payload,message->payloadlen);
    setTag(tag,value);
}
void MqttDevice::mosq_loop() {
    int rc;
    lib_init();
    printf("[Mosquitto] Start loop...\n");
    while(!_exit) {
        uint64_t now = timeSinceEpochMillisec();  
        rc=loop();
        if(rc) {
            printf("[Mosquitto] Disconnected. Trying to reconnect...\n");
            nsleep(500);
            reconnect();
            nsleep(500);
            continue;
        }
        while(messages.size()>0) {
            rc=publish(NULL,get<0>(messages.front()).c_str(),strlen(get<1>(messages.front()).c_str()),(const uint8_t*)get<1>(messages.front()).c_str(),0,true);
            if(rc) {
                printf("[Mosquitto] Error send message\n");
                break;
            }
            else
                messages.pop_front();
        }
        if(now-tBool>300) {
            tBool=now;
            for(auto &v:lrBool) {
                if(v.current==NULL) continue;
                if(v.old!=*v.current || now-v.tFlag>3000) {
                    v.tFlag=now;
                    v.old=*v.current;
                    message(v.topic,*v.current);
                }
            }
            for(auto &v:lwBool) {
                if(v.old!=*v.current || now-v.tFlag>3000) {
                    v.tFlag=now;
                    v.old=*v.current;
                    message(v.topic,*v.current);
                }
            }
        }
        if(now-tFloat>500) {
            tFloat=now;
            for(auto &v:lrFloat) {
                float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
                if(v.old!=r || now-v.tFlag>5000) {
                    v.tFlag=now;
                    v.old=r;
                    message(v.topic,r);
                }
            }
            for(auto &v:lrDouble) {
                double r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
                if(v.old!=r || now-v.tFlag>5000) {
                    v.tFlag=now;
                    v.old=r;
                    message(v.topic,r);
                }
            }
            for(auto &v:lwFloat) {
                float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
                if(v.old!=r || now-v.tFlag>5000) {
                    v.tFlag=now;
                    v.old=r;
                    message(v.topic,r);
                }
            }
        }
    }
    lib_cleanup();
    printf("[Mosquitto] Exit from mosquitto loop.\n");
}
void MqttDevice::message(string name, volatile float value) {
    messages.push_back(make_tuple(name,to_string(value)));
}
void MqttDevice::message(string name, volatile double value) {
    messages.push_back(make_tuple(name,to_string(value)));
}
void MqttDevice::message(string name, volatile bool value) {
    messages.push_back(make_tuple(name,(value?"true":"false")));
}
void MqttDevice::message(string name, string value) {
    messages.push_back(make_tuple(name,value));
}
void MqttDevice::setTag(string topic,string value) {
    try {
        for(auto& v:lwBool)
            if(v.topic==topic) {
                *(v.current)=(value=="true");
                v.old=*(v.current);
                return;
            }
        for(auto& v:lwFloat)
            if(v.topic==topic) {
                *(v.current)=(float)atof(value.c_str());
                v.old=*(v.current);
                return;
            }
    }
    catch(exception& ex) { }
}

Для передачи более сложных объектов автоматизации, мы можем реализовать функции:

void MqttDevice::add(ValveD* valve) {
    add(startTag+"/"+valve->name+"/SQL",valve->b_t1);
    add(startTag+"/"+valve->name+"/SQH",valve->b_t2);
    add(startTag+"/"+valve->name+"/out",valve->b_out);
    addSubscribed(startTag+"/"+valve->name+"/auto",valve->b_mode);
    addSubscribed(startTag+"/"+valve->name+"/man",valve->b_control);
}

В данном примере передается указатель на объект отсечного клапана с двумя концевиками крайних положений, выходного сигнала, а также управления клапаном. Из управления есть сигнал в каком состоянии клапан auto. Если ручной режим auto=false, то управление с сигнала man. Если автоматический режим auto=true, то управление с внутреннего состояния, заданного программно по алгоритму.

Из прошлой статьи возьмем пример опроса модуля ввода/вывода и к нему добавим отправку полученных данных на брокер:

volatile bool b1=false,b2=false,b3=false;
ModbusLine line1(modbus_new_rtu("/dev/ttyS2", 115200, 'N', 8, 1));
line1.addMDS_DIO_16BD(1,&b1,&b2,&b3);  

MqttDevice mqtt("line","localhost");
mqtt.addSubscribed("device/b1",&b1);
mqtt.add("device/b2",&b2);
mqtt.add("device/b3",&b3);

При подключении к брокеру мы получили 3 переменных. При изменении переменной b1 через брокер в программе получаем изменение в значении переменной.

Смотрим в MQTT Explorer результаты работы программы

Мы добились нужного нам результата, а добавление новых переменных не занимает много кода.

В следующей статье мы опробуем обмен между контроллерами и устройствами через RS-485 по Modbus аналогичным образом. Это будет полезно при межконтроллерном обмене и общении с сенсорными панелями.