mqtt應(yīng)用于進(jìn)程間通信
前言
上一篇分享了:《簡(jiǎn)單認(rèn)識(shí)認(rèn)識(shí)mqtt及mosquitto》,但也只是分享了mqtt的一些概念及mosquitto的一些介紹。然后就有讀者來(lái)催更了:
這一篇我們就來(lái)分享mqtt應(yīng)用于進(jìn)程間通信的實(shí)例。我們沿用往期文章《例說(shuō)嵌入式實(shí)用知識(shí)之JSON數(shù)據(jù)》的綜合demo來(lái)改造改造。那個(gè)綜合demo的功能是這樣子的:
這是以socket來(lái)作為進(jìn)程間通信的方式,并且這個(gè)demo是基于Windows寫(xiě)的,需要包含Windows特定的頭文件。
本篇筆記我們把上面這個(gè)綜合demo改為:
我們用mqtt來(lái)作為進(jìn)程間通信的方式,在Linux下進(jìn)程測(cè)試。
先貼代碼:
json_print進(jìn)程源碼
「json_print.c:」
左右滑動(dòng)查看全部代碼>>>
/*
-?程序功能:?組JSON格式數(shù)據(jù)包并發(fā)送(MQTT發(fā)布者客戶端程序)
-?編譯命令:?gcc?cJSON.c?json_print.c?-L?../mosquitto/build/lib?-lmosquitto?-o?json_print
-?導(dǎo)出mosquitto動(dòng)態(tài)庫(kù):?export?LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
-?作者:ZhengN
-?公眾號(hào):嵌入式大雜燴
*/
#include?
#include?
#include?
#include?"cJSON.h"
#include?"../mosquitto/lib/mosquitto.h"
#define??STU_NAME_LEN??32
/*?學(xué)生結(jié)構(gòu)體?*/
typedef?struct?_Student
{
????char?name[STU_NAME_LEN];??//?名字??????
????int?num;??????????????????//?學(xué)號(hào)??????
????int?c_score;??????????????//?C語(yǔ)言分?jǐn)?shù)
}StudentDef,?*pStudentDef;
/*?內(nèi)部函數(shù)聲明?*/
static?StudentDef?StudentData_Prepare(void);
static?char?*StudentsData_Packet(pStudentDef?_Stu);
static?void?StudentData_Send(const?char?*_data);
static?void?MqttClientInit(void);
static?void?MqttClientClean(void);
bool?clean_session?=?true;
struct?mosquitto?*mosq?=?NULL;
/********************************************************************************************************
**?函數(shù):?main
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
int?main(void)
{
????StudentDef?stu?=?{0};
????char?*stu_data?=?NULL;
????int?stu_count?=?0;
????int?i?=?0;
????/*?MQTT客戶端初始化?*/
????MqttClientInit();
????/*?需要登記的學(xué)生總?cè)藬?shù)?*/
????printf("Please?input?number?of?student:?");
????scanf("%d",?&stu_count);
????while?(i++?????{
????????/*?準(zhǔn)備數(shù)據(jù)?*/
????????stu?=?StudentData_Prepare();
????????/*?JSON格式數(shù)據(jù)組包?*/
????????stu_data?=?StudentsData_Packet(&stu);
????????/*?發(fā)送數(shù)據(jù)?*/
????????StudentData_Send(stu_data);
????}
????/*?回收操作?*/
????MqttClientClean();
????return?0;
}
/********************************************************************************************************
**?函數(shù):?StudentData_Prepare,?準(zhǔn)備組包需要的數(shù)據(jù)
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?說(shuō)明:?
**?返回:?獲取得到的數(shù)據(jù)
********************************************************************************************************/
static?StudentDef?StudentData_Prepare(void)
{
????char?name[STU_NAME_LEN]?=?{0};
????int?num?=?0;
????int?c_score?=?0;
????StudentDef?stu;
????/*?名字?*/
????printf("Please?input?name:?");
????scanf("%s",?name);
????if?(strlen(name)?????{
????????strncpy((char*)&stu.name,?name,?strlen(name)+1);
????}
????else
????{
????????printf("The?name?is?too?long\n");
????}
????
????/*?學(xué)號(hào)?*/
????printf("Please?input?num?(0~100):?");
????scanf("%d",?&num);
????stu.num?=?num;
????/*?C語(yǔ)言分?jǐn)?shù)?*/
????printf("Please?input?c_score?(0~100):?");
????scanf("%d",?&c_score);
????stu.c_score?=?c_score;
????return?stu;
}
/********************************************************************************************************
**?函數(shù):?StudentsData_Packet,?JSON格式數(shù)據(jù)組包
**------------------------------------------------------------------------------------------------------
**?參數(shù):?_Stu:組student json數(shù)據(jù)包需要的數(shù)據(jù)
**?說(shuō)明:?
**?返回:?JSON格式的字符串?
********************************************************************************************************/
static?char?*StudentsData_Packet(pStudentDef?_Stu)
{
????char?*res_string?=?NULL;????//?返回值
????cJSON?*name?=?NULL;?????????//?名字
????cJSON?*num?=?NULL;??????????//?學(xué)號(hào)
????cJSON?*c_score?=?NULL;??????//?C語(yǔ)言分?jǐn)?shù)
????/*?創(chuàng)建一個(gè)JSON對(duì)象,{}擴(kuò)起來(lái)?*/
????cJSON?*obj?=?cJSON_CreateObject();
????if?(obj?==?NULL)
????{
????????goto?end;
????}
????/*?創(chuàng)建?"name":?"xxx"?鍵值對(duì)?*/
????name?=?cJSON_CreateString(_Stu->name);
????if?(name?==?NULL)
????{
????????goto?end;
????}
????cJSON_AddItemToObject(obj,?"name",?name);
????/*?創(chuàng)建?"num":?207?鍵值對(duì)?*/
????num?=?cJSON_CreateNumber(_Stu->num);
????if?(name?==?NULL)
????{
????????goto?end;
????}
????cJSON_AddItemToObject(obj,?"num",?num);
????
????/*?創(chuàng)建?"c_score":?95?鍵值對(duì)?*/
????c_score?=?cJSON_CreateNumber(_Stu->c_score);
????if?(name?==?NULL)
????{
????????goto?end;
????}
????cJSON_AddItemToObject(obj,?"c_score",?c_score);?
????res_string?=?cJSON_Print(obj);??????????//?呈現(xiàn)為JSON格式?
????//?res_string?=?cJSON_PrintUnformatted(obj);???//?呈現(xiàn)為無(wú)格式
????if?(res_string?==?NULL)
????{
????????fprintf(stderr,?"Failed?to?print?monitor.\n");
????}
/*?異常情況統(tǒng)一Delete(free)?*/
end:
????cJSON_Delete(obj);
????return?res_string;
}
/********************************************************************************************************
**?函數(shù):?StudentData_Send,?JSON格式字符串?dāng)?shù)據(jù)組包發(fā)送
**------------------------------------------------------------------------------------------------------
**?參數(shù):?_data:要發(fā)送的數(shù)據(jù)
**?說(shuō)明:?
**?返回:?JSON格式的字符串?
********************************************************************************************************/
static?void?StudentData_Send(const?char?*_data)
{
????printf("%s:?%s\n\n",?__FUNCTION__,?_data);
????/*?發(fā)布消息?*/
????mosquitto_publish(mosq,?NULL,?"test_topic",?strlen(_data)+1,?(const?char*)_data,?0,?0);
}
/********************************************************************************************************
**?函數(shù):?MqttClientInit,?MQTT客戶端初始化
**------------------------------------------------------------------------------------------------------
**?參數(shù):?void
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
static?void?MqttClientInit(void)
{
????/*?libmosquitto?庫(kù)初始化?*/
????mosquitto_lib_init();
????/*?創(chuàng)建mosquitto客戶端?*/
????mosq?=?mosquitto_new(NULL,?clean_session,?NULL);
????if(NULL?==?mosq)
????{
????????printf("Create?mqtt?client?failed...\n");
????????mosquitto_lib_cleanup();
????????return;
????}
????/*?連接服務(wù)器?*/
????if(mosquitto_connect(mosq,?"localhost",?1883,?60))
????{
????????printf("Unable?to?connect...\n");
????????return;
????}
????/*?網(wǎng)絡(luò)消息處理線程?*/
????int?loop?=?mosquitto_loop_start(mosq);
????if(loop?!=?MOSQ_ERR_SUCCESS)
????{
????????printf("mosquitto?loop?error\n");
????????return;
????}
}
/********************************************************************************************************
**?函數(shù):?MqttClientClean,?MQTT客戶端清理操作
**------------------------------------------------------------------------------------------------------
**?參數(shù):?void
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
static?void?MqttClientClean(void)
{
????mosquitto_destroy(mosq);
????mosquitto_lib_cleanup();
}
json_parse進(jìn)程源碼
「json_parse.c:」
左右滑動(dòng)查看全部代碼>>>
/*
-?程序功能:?接收J(rèn)SON數(shù)據(jù)并解析(MQTT訂閱者客戶端程序)
-?編譯命令:?gcc?cJSON.c?json_parse.c?-L?../mosquitto/build/lib?-lmosquitto?-o?json_parse
-?導(dǎo)出mosquitto動(dòng)態(tài)庫(kù):?export?LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
-?作者:ZhengN
-?公眾號(hào):嵌入式大雜燴
*/
#include?
#include?
#include?
#include?"cJSON.h"
#include?"../mosquitto/lib/mosquitto.h"
#define??STU_NAME_LEN??32
/*?學(xué)生結(jié)構(gòu)體?*/
typedef?struct?_Student
{
????char?name[STU_NAME_LEN];??//?名字??????
????int?num;??????????????????//?學(xué)號(hào)??????
????int?c_score;??????????????//?C語(yǔ)言分?jǐn)?shù)
}StudentDef,?*pStudentDef;
/*?內(nèi)部函數(shù)聲明?*/
static?void?StudentsData_Parse(pStudentDef?_Stu,?const?char?*_JsonStudnetData);
static?void?PrintParseResult(const?pStudentDef?_Stu);
static?void?SaveParseResult(const?pStudentDef?_Stu);
static?void?my_message_callback(struct?mosquitto?*mosq,?void?*userdata,?const?struct?mosquitto_message?*message);
static?void?my_connect_callback(struct?mosquitto?*mosq,?void?*userdata,?int?result);
/*?內(nèi)部全局變量?*/
static?FILE?*stu_fp?=?NULL;
/********************************************************************************************************
**?函數(shù):?main
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
bool?clean_session?=?true;
int?main(void)
{?????????
????struct?mosquitto?*mosq?=?NULL;
????/*?libmosquitto?庫(kù)初始化?*/
????mosquitto_lib_init();
????/*?創(chuàng)建mosquitto客戶端?*/
????mosq?=?mosquitto_new(NULL,?clean_session,?NULL);
????if(NULL?==?mosq)
????{
????????printf("Create?mqtt?client?failed...\n");
????????mosquitto_lib_cleanup();
????????return?1;
????}
????/*?綁定連接、消息接收回調(diào)函數(shù)?*/
????mosquitto_connect_callback_set(mosq,?my_connect_callback);
????mosquitto_message_callback_set(mosq,?my_message_callback);
????/*?連接服務(wù)器?*/
????if(mosquitto_connect(mosq,?"localhost",?1883,?60))
????{
????????printf("Unable?to?connect...\n");
????????return?1;
????}
????/*?循環(huán)處理網(wǎng)絡(luò)消息?*/
????mosquitto_loop_forever(mosq,?-1,?1);
????/*?回收操作?*/
????mosquitto_destroy(mosq);
????mosquitto_lib_cleanup();
????return?0;
}
/********************************************************************************************************
**?函數(shù):?StudentsData_Parse,?JOSN格式學(xué)生期末數(shù)據(jù)解析
**------------------------------------------------------------------------------------------------------
**?參數(shù):?_JsonStudnetData:JSON數(shù)據(jù)???_Stu:保存解析出的有用數(shù)據(jù)
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
static?void?StudentsData_Parse(pStudentDef?_Stu,?const?char?*_JsonStudnetData)
{
????cJSON?*student_json?=?NULL;???//?student_json操作對(duì)象,可代表?{}?擴(kuò)起來(lái)的內(nèi)容
????cJSON?*name?=?NULL;?????????????
????cJSON?*num?=?NULL;
????cJSON?*c_score?=?NULL;
????
????/*?開(kāi)始解析?*/
????student_json?=?cJSON_Parse(_JsonStudnetData);
????if?(NULL?==?student_json)
????{
????????const?char?*error_ptr?=?cJSON_GetErrorPtr();
????????if?(error_ptr?!=?NULL)
????????{
????????????fprintf(stderr,?"Error?before:?%s\n",?error_ptr);
????????}
????????goto?end;
????}
????/*?解析獲取name得值?*/
????name?=?cJSON_GetObjectItemCaseSensitive(student_json,?"name");
????if?(cJSON_IsString(name)?&&?(name->valuestring?!=?NULL))
????{
????????memset(&_Stu->name,?0,?STU_NAME_LEN*sizeof(char));
????????memcpy(&_Stu->name,?name->valuestring,?strlen(name->valuestring));
????}
????/*?解析獲取num的值?*/
????num?=?cJSON_GetObjectItemCaseSensitive(student_json,?"num");
????if?(cJSON_IsNumber(num))
????{
????????_Stu->num?=?num->valueint;
????}
????/*?解析獲取c_score的值?*/
????c_score?=?cJSON_GetObjectItemCaseSensitive(student_json,?"c_score");
????if?(cJSON_IsNumber(c_score))
????{
????????_Stu->c_score?=?c_score->valueint;
????}
end:
????cJSON_Delete(student_json);
}
/********************************************************************************************************
**?函數(shù):?PrintParseResult,?打印輸出解析結(jié)果
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
static?void?PrintParseResult(const?pStudentDef?_Stu)
{
????printf("name:?%s,?num:?%d,?c_score:?%d\n\n",?_Stu->name,?_Stu->num,?_Stu->c_score);
}
/********************************************************************************************************
**?函數(shù):?SaveParseResult,?保存解析結(jié)果
**------------------------------------------------------------------------------------------------------
**?參數(shù):?_Stu:需要保存的數(shù)據(jù)
**?說(shuō)明:?
**?返回:?
********************************************************************************************************/
static?void?SaveParseResult(const?pStudentDef?_Stu)
{
????char?write_buf[512]?=?{0};
????static?int?stu_count?=?0;
????/*?以可在文件末尾追加內(nèi)容的方式打開(kāi)文件?*/
?if((stu_fp?=?fopen("ParseResult.txt",?"a+"))?==?NULL)
?{
??printf("Open?file?error!\n");
??return?exit(EXIT_FAILURE);
?}?
????/*?按指定格式寫(xiě)入文件?*/
????snprintf(write_buf,?512,?"name:?%s,?num:?%d,?c_score:?%d\n",?_Stu->name,?_Stu->num,?_Stu->c_score);
????size_t?len?=?fwrite((char*)write_buf,?1,?strlen(write_buf),?stu_fp);
????/*?文件位置指針偏移?*/
????fseek(stu_fp,?len?*?stu_count,?SEEK_SET);
????stu_count++;
????/*?關(guān)閉文件?*/
????fclose(stu_fp);
}
/********************************************************************************************************
**?函數(shù):?my_message_callback,?消息接收回調(diào)函數(shù)
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?返回:?
********************************************************************************************************/
static?void?my_message_callback(struct?mosquitto?*mosq,?void?*userdata,?const?struct?mosquitto_message?*message)
{
????StudentDef?stu?=?{0};
????if(message->payloadlen)
????{
????????printf("%s?%s\n",?message->topic,?(char*)message->payload);
????????/*?解析JSON數(shù)據(jù)?*/
????????StudentsData_Parse(&stu,?(const?char*)message->payload);??
????????/*?打印輸出解析結(jié)果?*/
????????PrintParseResult(&stu);??
????????/*?保存數(shù)據(jù)到文件?*/
????????SaveParseResult(&stu);?
????}
????else
????{
????????printf("%s?(null)\n",?message->topic);
????}
????fflush(stdout);
}
/********************************************************************************************************
**?函數(shù):?my_connect_callback,?連接回調(diào)函數(shù)
**------------------------------------------------------------------------------------------------------
**?參數(shù):?
**?返回:?
********************************************************************************************************/
static?void?my_connect_callback(struct?mosquitto?*mosq,?void?*userdata,?int?result)
{
????if(!result)
????{
????????/*?訂閱test_topic主題的消息?*/
????????mosquitto_subscribe(mosq,?NULL,?"test_topic",?0);
????}
????else
????{
????????fprintf(stderr,?"Connect?failed\n");
????}
}
編譯運(yùn)行
1、編譯生成json_parse、json_print程序:
左右滑動(dòng)查看全部代碼>>>
gcc?cJSON.c?json_parse.c?-L?../mosquitto/build/lib?-lmosquitto?-o?json_parse
gcc?cJSON.c?json_print.c?-L?../mosquitto/build/lib?-lmosquitto?-o?json_print????
這里用到鏈接動(dòng)態(tài)庫(kù)的方式生成可執(zhí)行程序。關(guān)于動(dòng)態(tài)鏈接與靜態(tài)鏈接,可查看往期筆記:《靜態(tài)鏈接與動(dòng)態(tài)鏈接補(bǔ)充(Linux)》、《什么是動(dòng)態(tài)鏈接與靜態(tài)鏈接?》。
2、執(zhí)行json_parse、json_print程序:
執(zhí)行這兩個(gè)程序會(huì)報(bào)錯(cuò):
./json_parse:?error?while?loading?shared?libraries:?libmosquitto.so.1:?cannot?open?shared?object?file:?No?such?file?or?directory
./json_print:?error?while?loading?shared?libraries:?libmosquitto.so.1:?cannot?open?shared?object?file:?No?such?file?or?directory
這是因?yàn)?不能找到共享庫(kù)文件libmosquitto.so.1
,加載失敗。
因?yàn)橐话闱闆r下Linux會(huì)在/usr/lib
路徑中搜索需要用到的庫(kù),而libmosquitto.so.1
庫(kù)并不在這個(gè)路徑下。
解決方法有兩種:一種就是把這個(gè)文件拷貝至/usr/lib
路徑下,但是一般不允許這樣做,一般用戶也不允許往這個(gè)路徑里拷貝東西。另一種就是把libmosquitto.so.1
庫(kù)所在路徑增加為動(dòng)態(tài)庫(kù)的搜索路徑,命令為:
左右滑動(dòng)查看全部代碼>>>
export?LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
關(guān)于這方面的說(shuō)明可以閱讀往期筆記:《靜態(tài)鏈接與動(dòng)態(tài)鏈接補(bǔ)充(Linux)》
按照上訴方法添加動(dòng)態(tài)庫(kù)搜索路徑之后就可以正常運(yùn)行這兩個(gè)程序:
ParseResult.txt文本里得到:
實(shí)驗(yàn)成功!
以上就是本次的分享,代碼寫(xiě)得比較倉(cāng)促,如有錯(cuò)誤,歡迎指出,謝謝!由于準(zhǔn)備demo花了挺多時(shí)間,包括注釋也寫(xiě)了很多。
所以本篇文章就不做過(guò)多的說(shuō)明,感興趣的朋友可以結(jié)合本篇文章的demo及mosquitto/client/pub_client.c
、mosquitto/client/sub_client.c
這兩個(gè)源文件。
本篇文章的demo:
可在本公眾號(hào)聊天界面回復(fù)關(guān)鍵詞:json_mqtt_demo
,即可獲取,若無(wú)法獲取可聯(lián)系我進(jìn)行獲取。
推薦資料:https://blog.csdn.net/Dancer__Sky/article/details/77855249
最近加群人有點(diǎn)多,兩個(gè)群都以加滿,現(xiàn)建③群,感興趣可自行加入:
猜你喜歡
簡(jiǎn)單認(rèn)識(shí)認(rèn)識(shí)mqtt及mosquitto
什么是Linux內(nèi)核空間與用戶空間?
1024G 嵌入式資源大放送!包括但不限于C/C++、單片機(jī)、Linux等。在公眾號(hào)聊天界面回復(fù)1024,即可免費(fèi)獲??!
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!