赞
踩
首先,确保您已安装了Go并设置了GOPATH。
使用以下命令安装eclipse/paho.mqtt.golang库:
go get -u github.com/eclipse/paho.mqtt.golang
创建一个名为main.go的文件
package main import ( "fmt" "log" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) func onMessageReceived(client mqtt.Client, message mqtt.Message) { fmt.Printf("接收topic: %s\nMessage: %s\n", message.Topic(), message.Payload()) // 在这里将消息转发回业务平台,您可以根据需要修改此部分 } func main() { opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883") opts.SetClientID("emqx_NTkxOD123213") opts.SetUsername("cs") opts.SetPassword("123456") opts.SetDefaultPublishHandler(onMessageReceived) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) os.Exit(1) } // 订阅主题 if token := client.Subscribe("/v1/test/2", 0, nil); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) os.Exit(1) } // 发送代码指令 token := client.Publish("/v1/test/3", 0, false, "holle") token.Wait() time.Sleep(3 * time.Second) client.Disconnect(250) }
转换为一个持续运行的应用程序,将main函数中的最后一个client.Disconnect(250)行删除,并添加一个无限循环来保持程序运行。
package main import ( "fmt" "log" "os" "os/signal" "syscall" mqtt "github.com/eclipse/paho.mqtt.golang" ) func onMessageReceived(client mqtt.Client, message mqtt.Message) { fmt.Printf("接收topic: %s\nMessage: %s\n", message.Topic(), message.Payload()) // 在这里将消息转发回业务平台,您可以根据需要修改此部分 } func main() { opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883") opts.SetClientID("emqx_NTkxOD123213") opts.SetUsername("cs") opts.SetPassword("123456") opts.SetDefaultPublishHandler(onMessageReceived) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) os.Exit(1) } // 订阅主题 if token := client.Subscribe("/v1/test/2", 0, nil); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) os.Exit(1) } // 发送代码指令 token := client.Publish("/v1/test/3", 0, false, "holle") token.Wait() // 处理系统信号,以便在接收到SIGINT或SIGTERM时优雅地关闭程序 signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) <-signalChan fmt.Println("Received signal, shutting down...") client.Disconnect(250) }
现在,该程序将持续运行,直到接收到中断信号(如通过按Ctrl+C或发送SIGTERM信号)。在接收到信号后,它将断开与MQTT服务器的连接并关闭。将此应用程序部署到服务器上,并将其配置为在后台运行即可。
opts.SetDefaultPublishHandler(onMessageReceived)
会设置一个默认处理器,当没有为特定主题设置处理器时,这个处理器会被调用。
client.Subscribe(topic, qos, handler)
订阅一个主题,并为这个主题设置一个处理器。如果你传递 nil 作为处理器,那么默认处理器会被调用。
client.Subscribe("/v1/test/2", 0, func(client paho.Client, message paho.Message) {
fmt.Printf("特定主题收到消息: 主题 = %s, 信息 = %s\n", message.Topic(), message.Payload())
})
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。