在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。

目录

  • 一、本教程中说明的内容
  • 三、在ARM mbed中使用MQTT

一、本教程中说明的内容

先说说本文化的适用范围吧:

一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。

二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。

三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。

二、MQTT的使用

首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台: GitHub – paho.mqtt.embedded-c 。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。

在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。

由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。

在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。

将transport.h的内容精减到以下内容:

int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
int transport_open(char* host, int port);
int transport_close();

主要的工作有:

1、为了方便表示,删除了版权信息,有实际使用时请保留。

2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。

3、透传模块中使用不到socket,所以将与socket相关的参数去掉。

这些方法实现的主要功能是:

1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。

2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。

然后用transport.c来实现transport.h中声明的4个函数。

三、在ARM mbed中使用MQTT

首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。

首先介绍一个例子, HelloMQTT – a mercurial repository | mbed 。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。

如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:

int read(unsigned char* buffer, int len, int timeout);
int write(unsigned char* buffer, int len, int timeout);

这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。

对此,我写了驱动程序:

MQTTGRPSEthernet.h

#pragma once
#if !defined(MQTTGPRSETHERNET_H)
#define MQTTGPRSETHERNET_H
 
#define DEFAULT_GPRS_TIMEOUT 6000000  //6s
#define SERIAL_BUFFER_SIZE 256
 
#include "mbed.h"
 
class MQTTGPRSEthernet
{
public:
 MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200);
 ~MQTTGPRSEthernet();
 bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false);
 bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT);
 int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT);
 int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
 int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
 bool disconnect();
private:
 bool initNet();
 Serial eth;
 bool command(const char* cmd, const char* ack = "");
 bool connected = false;
 bool initialized = false;
 char* localIP;
 const char *_apn;
 const char *_passWord;
 const char *_userName;
};
#endif

MQTTGRPSEthernet.cpp

#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
 
#ifdef DEBUG
Serial pc(PB_10, PB_11, 115200);
#define LOG(args...)    pc.printf(args)
#define LOGC(args...)   pc.putc(args)
#else
#define LOG(args...)    (args)
#define LOGC(args...)   (args)
#endif // DEBUG
 
//还需要增加一个表示已经初始化成功的变量
//后续可能要增加表示6条连接的东东,如果host,port,连接状态等
MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate)
 : eth(tx, rx, baudrate){}
bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect)
{
 _apn = apn;
 _userName = userName;
 _passWord = passWord;
 command("ATE0");
 wait_ms(800);
 command("AT+CIPMUX=0");
 wait_ms(800);
 //检查 GPRS 附着状态
 while (!command("AT+CGATT?\r\n", "+CGATT: 1"))
 {
 LOG("GPRS NOT ATTACHED!\r\n");
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 command("AT");
 command("AT+CIPSHUT\r\n");
 }
 //Select multiple connection
 //单链路模式
 //command("AT+CIPMUX=0");
 //wait_ms(800);
 //透传模式
 command("AT+CIPMODE=1");
 wait_ms(800); 
 command("AT+CIPCCFG=5,2,1024,1,0,1460,50");
 wait_ms(800);
 // Set APN 
 command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord));
 wait_ms(800);
 LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord));
 uint32_t start = us_ticker_read();
 do
 {
 // Brings up wireless connection
 //建立无线链路(GPRS 或者 CSD)
 bool flag = command("AT+CIICR", "OK");
 // Get local IP address
 eth.printf("AT+CIFSR\r\n");
 char ip_addr_buf[32];
 
 if (read_line(ip_addr_buf) <= 0) {
 //LOG("failed to join network\r\n");
 initialized = false;
 }
 else if (StringHelper::CheckIP(ip_addr_buf))
 {
 LOG("IP ADDRESS:");
 LOG(ip_addr_buf);
 LOG("\r\n");
 localIP = ip_addr_buf;
 initialized = true;
 break; 
 }
 else
 {
 initialized = false;
 }
 if (flag == false && initialized == false && isReconnect == false)
 {
 //表明此时建立无线链路返回error,得不到IP地址
 //此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接
 //防止在数据状态
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 command("AT");
 command("AT+CIPSHUT\r\n");
 initialized = false;
 return initNet(_apn, _userName, _passWord, timeout, true);
 }
 
 } while (us_ticker_read() - start < timeout);
 //command("AT+CRPRXGET=0");
 return initialized;
}
bool MQTTGPRSEthernet::initNet()
{
 return initNet(_apn, _userName, _passWord);
}
bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout)
{
 uint32_t start = us_ticker_read();
 do
 {
 if (initialized == false)
 initNet();
 else
 break;
 } while (us_ticker_read() - start < timeout);
 start = us_ticker_read();
 do
 {
 //AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500”
 char strPort[10];
 itoa(port, strPort, 10);
 char response[64] = { 0, };
 int connectStart = us_ticker_read();
 LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
 eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
 
 do
 {
 read_line(response);
 if (strstr(response, "CONNECT") != NULL)
 {
 connected = true;
 return connected;
 }
 } while (us_ticker_read() - connectStart < timeout);
 } while (us_ticker_read() - start < timeout);
 connected = false;
 return connected;
}
//向SIM800C发送命令
//cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串.
//ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true
//此函数只能对应立即进行读取的情况。
//返回值:0,发送成功(得到了期待的应答结果)
//       1,发送失败
bool MQTTGPRSEthernet::command(const char* cmd, const char* ack)
{
 char response[64] = { 0, };
 if (StringHelper::EndWith(cmd, "\r\n"))
 {
 LOG(cmd);
 eth.printf(cmd);
 }
 else if (StringHelper::EndWith(cmd, "\r"))
 {
 LOG(StringHelper::Add(cmd, "\n"));
 eth.printf(StringHelper::Add(cmd, "\n")); 
 }
 else
 {
 LOG(StringHelper::Add(cmd, "\r\n"));
 eth.printf(StringHelper::Add(cmd, "\r\n"));
 } 
 read_line(response);
 
 if (strstr(response, ack) != NULL) {
 return true;
 } 
 return false;
}
int MQTTGPRSEthernet::read_line(char* buffer, int timeout)
{
 int bytes = 0;
 uint32_t start = us_ticker_read();
 
 while (true) {
 if (eth.readable()) {
 char ch = eth.getc();
 if ((ch == '\n' || ch == '\r') && bytes == 0)
 {
 //此时说明以\r\n开头,在无回显的情况下经常会出现
 //此时忽略空行\r\n
 continue;
 }
 if (ch == '\n') {
 if (bytes > 0 && buffer[bytes - 1] == '\r') {
 //表明读取到\r\n,此行结束
 bytes--;
 }
 if (bytes > 0) {
 buffer[bytes] = '\0';
 return bytes;
 }
 }
 else {
 buffer[bytes] = ch;
 bytes++;
 }
 }
 else {
 if ((uint32_t)(us_ticker_read() - start) > timeout) {
 return bytes;
 }
 }
 }
 //此时表示读到最后一个字节还没有读到\n
 //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
 return bytes;
}
int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout)
{
 int bytes = 0;
 uint32_t start = us_ticker_read();
 
 while (bytes < len) {
 if (eth.readable()) {
 char ch = eth.getc();
 buffer[bytes] = ch;
 bytes++;
 }
 else {
 if ((uint32_t)(us_ticker_read() - start) > timeout) {
 return bytes;
 }
 }
 }
 //此时表示读到最后一个字节还没有读到\n
 //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
 return bytes;
}
 
int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout)
{
 uint32_t start = us_ticker_read();
 while (!connected)
 {
 if ((us_ticker_read() - start) > timeout)
 {
 return -1;
 }
 initNet();
 }
 for (size_t i = 0; i < len; i++)
 {
 LOGC(buffer[i]);
 eth.putc(buffer[i]);
 } 
 return len;
}
bool MQTTGPRSEthernet::disconnect()
{
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 eth.printf("AT\r\n");
 eth.printf("AT+CIPSHUT\r\n");
 connected = false;
 return true;
}
 
MQTTGPRSEthernet::~MQTTGPRSEthernet()
{
 if (connected)
 {
 disconnect();
 }
}

主程序

#include "mbed.h"
 
#define APN         "uninet"
#define USERNAME    NULL
#define PASSWORD    NULL
#define INIT_TIMES  5            //初始化次数
 
#ifdef DEBUG
Serial pc2(PB_10, PB_11, 115200);
#define LOG(args...)    pc2.printf(args)
#else
#define LOG(args...)    (args)
#endif // DEBUG
 
#include "MQTTClient.h"
#include "MQTTmbed.h"
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
 
int arrivedcount = 0;
 
void messageArrived(MQTT::MessageData& md)
{ 
 MQTT::Message &message = md.message;
 LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
 LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload);
 ++arrivedcount;
}
 
int main(int argc, char* argv[])
{
 MQTTGPRSEthernet ethernet(PB_6, PB_7);
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if (ethernet.initNet(APN, USERNAME, PASSWORD))
 break;
 else 
 LOG("SIM CARD INIT FAILED!\r\n");
 } 
           
 MQTT::Client<MQTTGPRSEthernet, Countdown> client = MQTT::Client<MQTTGPRSEthernet, Countdown>(ethernet);
    
 char* hostname = "iot.eclipse.org";
 int port = 1883;
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if (ethernet.connect(hostname, port)) 
 break; 
 else
 LOG("CAN NOT CONNECT TO SERVER!\r\n");
 }
 
 char* topic = "tson-topic-18669888635";
 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
 data.MQTTVersion = 3;
 data.clientID.cstring = "tson-client-18669888635"; 
 int rc;
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if ((rc = client.connect(data)) != 0)
 //无法连接到MQTT服务器
 LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n"); 
 else 
 break; 
 }
 
 if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
 //无法订阅
 LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n");
 
 MQTT::Message message;
 
    // QoS 0
 char buf[100];
 sprintf(buf, "Hello World!  QoS 0 message from tson\r\n");
 message.qos = MQTT::QOS0;
 message.retained = false;
 message.dup = false;
 message.payload = (void*)buf;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 0)
 client.yield(100000);
        
 // QoS 1
 sprintf(buf, "Hello World!  QoS 1 message from tson\n");
 message.qos = MQTT::QOS1;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 1)
 client.yield(100000);
        
 // QoS 2
 sprintf(buf, "Hello World!  QoS 2 message from tson\n");
 message.qos = MQTT::QOS2;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 2)
 client.yield(100000);
    
 if ((rc = client.unsubscribe(topic)) != 0)
 printf("rc from unsubscribe was %d\n", rc);
    
 if ((rc = client.disconnect()) != 0)
 printf("rc from disconnect was %d\n", rc);
    
 ethernet.disconnect(); 
 return 0;
}

client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。

四、总结

其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。

—————————–可爱的分割线——————————————————

有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper

原创文章,转载请注明:转载自 TsonTec:测量解决方案提供者

本文链接地址: 在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

为你推荐:

相关软件推荐:

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

通灵芯片

通灵芯片

Daniel Hillis / 崔良沂 / 上海世纪出版集团 / 2009-1 / 19.80元

本书深入浅出地阐述了计算机科学中许多基本而重要的概念,包括布尔逻辑、有限自动机、编程语言、图灵机的普遍性、信息论、算法、并行计算、量子计算、神经网络、机器学习乃至自组织系统。 作者高屋建瓴式的概括,既不失深度,又妙趣横生,相信读者读后会有很多启发。 目录: 序言:石的奇迹 第一章 通用件 第二章 万能积木 第三章 程序设计 第四章 图灵机的普适性 第......一起来看看 《通灵芯片》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具