段时间在使用MQTTnet,都说这个东西比较好,可是翻了翻网上没有例子给参考一下。
今天算是找到了,给高手的帖子做个宣传吧.
原网址如下:https://blog.csdn.net/chenlu5201314/article/details/94740765
由于GitHub上介绍的东西比较少,以我的水平真是不知道怎么用,先照葫芦画瓢,再看看怎么回事吧:
功能:
把订阅与发布做成一个类,还带有自动重连的功能
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client; //客户端需要用到
using MQTTnet.Client.Options; //具体连接时需要用到的属性,ID的名称,要连接Server的名称,接入时用到的账号和密码,掉线时是否重新清除原有名称,还有许多…
using MQTTnet.Packets; //这个没用上
using MQTTnet.Protocol; //这个也没用上
using MQTTnet.Client.Receiving; //接收
using MQTTnet.Client.Disconnecting; //断线
using MQTTnet.Client.Connecting; //连接
新建一个类:先写一下变量和一些字段
class HOSMQTT
{
private static MqttClient mqttClient = null;
private static IMqttClientOptions options = null;
private static bool runState = false;
private static bool running = false;
///
private static string ServerUrl = "182.61.51.85";
///
private static int Port = ;
///
private static string Password = "ruichi8888";
///
private static string UserId = "admin";
///
private static string Topic = "China/Hunan/Yiyang/Nanxian";
///
private static bool Retained = false;
///
private static int QualityOfServiceLevel = ;
}
先看一下Start方法
public static void Start()
{
try
{
runState = true;
Thread thread = new Thread(Work); //原帖中是这样写的 Thread thread = new Thread(new ThreadStart( Work));
thread.IsBackground = true;
thread.Start();
}
catch (Exception ex)
{
Console.WriteLine( "启动客户端出现问题:" + ex.ToString());
}
}
没进入正题之前,先普及一下基本知识
具体请看下面的连接
https://www.cnblogs.com/rosesmall/p/8358348.html
进入整体,介绍连接方法 Work
private static void Work()
{
running = true;
Console.WriteLine("Work >>Begin");
try
{
var factory = new MqttFactory(); //声明一个MQTT客户端的标准步骤 的第一步
mqttClient = factory.CreateMqttClient() as MqttClient; //factory.CreateMqttClient()实际是一个接口类型(IMqttClient),这里是把他的类型变了一下
options = new MqttClientOptionsBuilder() //实例化一个MqttClientOptionsBulider
.WithTcpServer(ServerUrl, Port)
.WithCredentials(UserId, Password)
.WithClientId("XMan")
.Build();
mqttClient.ConnectAsync(options); //连接服务器
//下面这些东西是什么,为什么要这么写,直到刚才我还是不懂,不过在GitHub的网址我发现了出处.
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action
while (runState)
{
Thread.Sleep();
}
}
catch(Exception exp)
{
Console.WriteLine(exp);
}
Console.WriteLine("Work >>End");
running = false;
runState = false;
}
先来看看MqttClient 类里面都有什么东西
需要实现的接口,如何实现,说重点!
在GitHub上有个地方进去看看就知道了‘
这个页面的最下方写着如何实现 https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide
private void Something()
{
mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage);
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
}
private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e)
{
}
private async void OnConnected(MqttClientConnectedEventArgs e)
{
}
private async void OnDisconnected(MqttClientDisconnectedEventArgs e)
{
}
在开始Connected方法之前有必要看一下关于同步和异步的知识,
现学现卖简单说一下:
Task就是异步的调用,就在不影响主线程运行的另一个线程,但是他能像线程池一样更高效的利用现有的空闲线程
async必须用来修饰Task ,void,或者Task
记住要是Task 实现异步功能,必须用 async 修饰,且async 与await成对出现。
详见下面大神写的大作:https://www.cnblogs.com/doforfuture/p/6293926.html
下面是什么意思?
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func
MqttClientConnectedHandlerDelegate 这个实例实现了mqttClient.ConnectedHandler接口
new Func
使用Func委托传入MqttClientConnectedEventArgs类型的参数,返回的类型是Task,Task是一个类,这个类没有返回值,如果有返回值就是Task
是委托就要带一个方法取实现,这个方法就是Connected。
这句话的意思是,用MqttClientConnectedHandlerDelegate实现接口,同时使用委托取调用Connected的方法,并且给这个方法传入一个MqttClientConnectedEventArgs参数,
这个委托的返回值是Task(就是不需要返回类型的异步调用),这也就定义了Connected的类型必须是async Task。
好了来看下 Connected,这个函数什么意思
就是与服务器连接之后要干什么,订阅一个Topic,或几个Topic。连接之前已经连接了Connectasync(),如果断线还会重连,后面会提到。
这个就连接之后需要做的事----订阅!
private static async Task Connected(MqttClientConnectedEventArgs e)
{
try
{
List<TopicFilter> listTopic = new List<TopicFilter>();
if (listTopic.Count() <= )
{
var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build();
listTopic.Add(topicFilterBulder);
Console.WriteLine("Connected >>Subscribe " + Topic);
} await mqttClient.SubscribeAsync(listTopic.ToArray());
Console.WriteLine("Connected >>Subscribe Success");
}
catch (Exception exp)
{
Console.WriteLine(exp.Message);
}
}
TopicFilter是一个Topic详细信息的类
掉线的发生时会执行这个函数
private static async Task Disconnected(MqttClientDisconnectedEventArgs e)
{
try
{
Console.WriteLine("Disconnected >>Disconnected Server");
await Task.Delay(TimeSpan.FromSeconds());
try
{
await mqttClient.ConnectAsync(options);
}
catch (Exception exp)
{
Console.WriteLine("Disconnected >>Exception " + exp.Message);
}
}
catch (Exception exp)
{
Console.WriteLine(exp.Message);
}
}
越写问题越多,这个为什么断线的时候会执行这个方法,这不是事件,只是接口!
怎么实现的?看了一下源码,一时只看了大概,这些功能的绑定都是在ConnectAsync的时候就完成了!
下面接收到消息的时候
/// <summary>
/// 接收消息触发事件
/// </summary>
/// <param name="e"></param>
private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
try
{
string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString();
string Retained = e.ApplicationMessage.Retain.ToString();
Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";");
Console.WriteLine("MessageReceived >>Msg: " + text);
}
catch (Exception exp)
{
Console.WriteLine(exp.Message);
}
}
最后就是发布:一般会选择0,如果选择其他的情况在订阅端不在的时候,服务器可能会崩溃
///
/// 发布主题
/// 发布内容
///
public static void Publish( string Topic,string Message)
{
try
{
if (mqttClient == null)
return;
if (mqttClient.IsConnected == false)
mqttClient.ConnectAsync(options);
if (mqttClient.IsConnected == false)
{
Console.WriteLine("Publish >>Connected Failed! ");
return;
}
Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";");
Console.WriteLine("Publish >>Message: " + Message);
MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder()
.WithTopic(Topic)
.WithPayload(Message).WithRetainFlag(Retained);
if (QualityOfServiceLevel == )
{
mamb = mamb.WithAtMostOnceQoS();
}
else if (QualityOfServiceLevel == )
{
mamb = mamb.WithAtLeastOnceQoS();
}
else if (QualityOfServiceLevel == )
{
mamb = mamb.WithExactlyOnceQoS();
}
mqttClient.PublishAsync(mamb.Build());
}
catch (Exception exp)
{
Console.WriteLine("Publish >>" + exp.Message);
}
}
重点补充一下:
这两天验证的时候发现,对于MQTT服务器来说客户端的用户名必须是唯一的,
举例:同一台电脑上,两个程序同时发布(publish)到一个MQTT服务器,必须设置两个不同的ClientId,否则只有一个能连接上。
Paho的使用:
具体说明也可以看这位高手的:
https://blog.csdn.net/weixin_42133779/article/details/80226633
只是有一点需要强调以下:
Paho的目录不要太深,之前我就是三层文件夹下面,结果无法运行。一直提示 error code=13
没办法把他放在D盘的根目录下就可以了
纸上得来终觉浅,要改造成自己想要的些东西,还要花些功夫!不过这已经很好了!谢谢各位高手的贡献
手机扫一扫
移动阅读更方便
你可能感兴趣的文章