当前位置:   article > 正文

C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction)
 {
     //订阅
 }
  • 1
  • 2
  • 3
  • 4

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;
  • 1

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{
    
    lock (_topicActionsLock)
    {
        if (!_topicActions.ContainsKey(topic))
        {
            _topicActions.Add(topic, topicAction);
            if (_mqttClient.IsConnected)
            {
                _mqttClient.SubscribeAsync(topic);
            }
        }
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{
    while (!_mqttClient.IsConnected)
    {
        try
        {
            Console.WriteLine($"正在连接……");
            _mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            Task.Delay(1000).Wait();
            Console.WriteLine("连接mqtt服务器失败");
        }
    }
    lock (_topicActionsLock)
    {
        foreach (var item in _topicActions)
        {
            _mqttClient.SubscribeAsync(item.Key);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 添加接收消息事件
 //客户端接收消息事件
 _mqttClient.ApplicationMessageReceivedAsync +=
     MqttClient_ApplicationMessageReceivedAsync;
     
     

  /// <summary>
  /// 接收消息
  /// </summary>
  /// <param name="args"></param>
  /// <returns></returns>
  private async Task MqttClient_ApplicationMessageReceivedAsync(
      MqttApplicationMessageReceivedEventArgs args
  )
  {
      try
      {
          Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
          if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
          {
              _topicActions[args.ApplicationMessage.Topic]
                  .Invoke(
                      args.ApplicationMessage.Topic,
                      Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
                  );
          }
      }
      catch (Exception ex)
      {
          Console.WriteLine(ex.Message);
      }
  }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

完整Mqtt代码

 public class MqttControllor
 {
     private MqttConfig _config;
     private string _clientId;
     MqttClientOptions _clientOptions;

     private IMqttClient _mqttClient;

     private readonly object _topicActionsLock = new object();
     private Dictionary<string, Action<string, string>> _topicActions;

     public MqttControllor(MqttConfig config, bool isAutoConnect = true)
     {
         _topicActions = new Dictionary<string, Action<string, string>>();

         _config = config;
         _clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;
         MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder()
             .WithTcpServer(_config.Ip, _config.Port)
             .WithCredentials(_config.Username, _config.Password)
             .WithClientId(_clientId);

         _clientOptions = optionsBuilder.Build();

         _mqttClient = new MqttFactory().CreateMqttClient();

         // 客户端连接关闭事件
         _mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
         //客户端接收消息事件
         _mqttClient.ApplicationMessageReceivedAsync +=
             MqttClient_ApplicationMessageReceivedAsync;
         if (isAutoConnect)
         {
             Task.Run(() =>
             {
                 MqttConnect();
             });
         }
     }

     /// <summary>
     /// 接收消息
     /// </summary>
     /// <param name="args"></param>
     /// <returns></returns>
     private async Task MqttClient_ApplicationMessageReceivedAsync(
         MqttApplicationMessageReceivedEventArgs args
     )
     {
         try
         {
             Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");
             if (_topicActions.ContainsKey(args.ApplicationMessage.Topic))
             {
                 _topicActions[args.ApplicationMessage.Topic]
                     .Invoke(
                         args.ApplicationMessage.Topic,
                         Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
                     );
             }
         }
         catch (Exception ex)
         {
             Console.WriteLine(ex.Message);
         }
     }

     private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
     {
         Console.WriteLine($"客户端已断开与服务端的连接……");
         //断开重连
         _mqttClient = new MqttFactory().CreateMqttClient();
         MqttConnect();
         return Task.CompletedTask;
     }

     public void MqttConnect()
     {
         while (!_mqttClient.IsConnected)
         {
             try
             {
                 Console.WriteLine($"正在连接……");
                 _mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();
             }
             catch (Exception ex)
             {
                 Task.Delay(1000).Wait();
                 Console.WriteLine("连接mqtt服务器失败");
             }
         }
         Console.WriteLine($"客户端已连接到服务端……");
         //连接成功,订阅主题
         lock (_topicActionsLock)
         {
             foreach (var item in _topicActions)
             {
                 _mqttClient.SubscribeAsync(item.Key);
             }
         }
     }

     /// <summary>
     /// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
     /// </summary>
     /// <param name="topic"></param>
     /// <param name="topicAction"></param>
     public void SubscribeTopic(string topic, Action<string, string> topicAction)
     {
         lock (_topicActionsLock)
         {
             if (!_topicActions.ContainsKey(topic))
             {
                 _topicActions.Add(topic, topicAction);
                 if (_mqttClient.IsConnected)
                 {
                     _mqttClient.SubscribeAsync(topic);
                 }
             }
         }
     }

     /// <summary>
     /// 推送消息
     /// </summary>
     /// <param name="topic">主题</param>
     /// <param name="data">消息内容</param>
     /// <param name="qsLevel"></param>
     /// <param name="retain"></param>
     public void Publish(string topic, string data, int qsLevel = 0, bool retain = false)
     {
         qsLevel = Math.Clamp(qsLevel, 0, 2);

         if (!_mqttClient.IsConnected)
         {
             throw new Exception("mqtt未连接");
         }
         var message = new MqttApplicationMessage
         {
             Topic = topic,
             PayloadSegment = Encoding.UTF8.GetBytes(data),
             QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,
             Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
         };

         _mqttClient.PublishAsync(message);
     }
 }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/1020837
推荐阅读
相关标签
  

闽ICP备14008679号