mqttadmin.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "rtzh_elec_temperature/conf"
  8. "rtzh_elec_temperature/logger"
  9. "rtzh_elec_temperature/tools"
  10. "strconv"
  11. "strings"
  12. "time"
  13. //import the Paho Go MQTT library
  14. MQTT "github.com/eclipse/paho.mqtt.golang"
  15. "github.com/streadway/amqp"
  16. //"golang.org/x/net/proxy"
  17. )
  18. //数据消息通道
  19. var MqttMessageChan = make(chan string, 1000)
  20. var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  21. mqttDeal := new(ReceiveDataMessage)
  22. mqttDeal.Receive(msg.Topic(), string(msg.Payload()))
  23. }
  24. //接收rabbitmq的消息
  25. //接收到消息后通过messageChan通道传递给数据库处理过程
  26. var frabbit = func(msg amqp.Delivery) {
  27. msgStr := fmt.Sprintf("%s|%s", msg.RoutingKey, string(msg.Body))
  28. fmt.Println(tools.NowTime() + " ==== received data:" + msgStr)
  29. MqttMessageChan <- msgStr
  30. }
  31. var (
  32. url string
  33. topic string
  34. cnffile string
  35. )
  36. //连接RabbitMQ时,topic属性值为router key
  37. //仅RabbitMQ需要指定exchangename属性
  38. type Config struct {
  39. Mqtt string `json:"mqttname"`
  40. Host string `json:"host"`
  41. Port int `json:"port,string"`
  42. User string `json:"user"`
  43. Pwd string `json:"pwd"`
  44. Topic string `json:"topic"`
  45. ExchangeName string `json:"exchangename"`
  46. Clientid string `json:"clientid"`
  47. ExtShell_webtest string `json:"extshell_webtest"`
  48. }
  49. var rabbitMqClient *MessagingClient
  50. var Mqttserverconn MQTT.Client
  51. func failOnError(err error, msg string) {
  52. if err != nil {
  53. fmt.Println("%s: %s", msg, err)
  54. os.Exit(1)
  55. }
  56. }
  57. func Start() {
  58. cnffile = "conf/mqtt.cnf"
  59. fmt.Println(tools.NowTime()+" 加载MQTT配置文件", cnffile)
  60. fileHanlder, err := os.Open(cnffile)
  61. if err != nil {
  62. fmt.Println(tools.NowTime()+" 配置文件mqtt.cnf:", err)
  63. return
  64. }
  65. txt, _ := ioutil.ReadAll(fileHanlder)
  66. fileHanlder.Close()
  67. txtStr := string(txt)
  68. if txtStr == "" {
  69. fmt.Println(tools.NowTime() + " 配置文件mqtt.cnf配置不正确,请正确配置")
  70. return
  71. }
  72. cfgdata := Config{}
  73. err = json.Unmarshal(txt, &cfgdata)
  74. if err != nil {
  75. fmt.Println(tools.NowTime()+" MQTT配置文件解析失败:", err)
  76. return
  77. }
  78. if cfgdata.Topic == "" {
  79. fmt.Println(tools.NowTime() + " 注意:当前未订阅任何主题,是否需要修复?")
  80. }
  81. if cfgdata.Mqtt == "" {
  82. cfgdata.Mqtt = "MQTT"
  83. }
  84. fmt.Println(tools.NowTime() + " 正在连接" + cfgdata.Mqtt + "服务器" + cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
  85. if cfgdata.Mqtt == "rabbitmq" {
  86. RabbitMQ(cfgdata)
  87. } else {
  88. Mqtt(cfgdata)
  89. }
  90. //go PublishMessage()
  91. /*
  92. go func() {
  93. //向通道模拟写入数据
  94. for i := 0; i < 10; i++ {
  95. msg := "msg id:" + strconv.Itoa(i)
  96. messageChan <- msg
  97. fmt.Println(NowTime() + " " + strconv.Itoa(len(messageChan)))
  98. time.Sleep(1 * time.Second)
  99. }
  100. }()
  101. */
  102. }
  103. func RabbitMQ(cfgdata Config) {
  104. if cfgdata.ExchangeName == "" {
  105. fmt.Println("exchangename属性未配置")
  106. os.Exit(1)
  107. }
  108. //rabbitMqClient = new(MessagingClient)
  109. connstr := fmt.Sprintf("amqp://%s:%s@%s:%s", cfgdata.User, cfgdata.Pwd, cfgdata.Host, strconv.Itoa(cfgdata.Port))
  110. rbconn, err := amqp.Dial(fmt.Sprintf("%s/", connstr))
  111. failOnError(err, "rabbitMQ["+connstr+"] 连接失败")
  112. //rabbitMqClient.ConnectToRabbitBroker(connstr)
  113. RabbitMQSubscribe(rbconn, cfgdata.ExchangeName, "topic", cfgdata.Topic, "", frabbit)
  114. }
  115. func RabbitMQSubscribe(mconn *amqp.Connection, exchangeName string, exchangeType string, routerkey string, consumerName string, handlerFunc func(amqp.Delivery)) error {
  116. if mconn == nil {
  117. panic("Tried to Subscribe before connection was initialized. Don't do that.")
  118. }
  119. ch, err := mconn.Channel()
  120. failOnError(err, "Failed to open a channel")
  121. // defer ch.Close()
  122. err = ch.ExchangeDeclare(
  123. exchangeName, // name of the exchange
  124. exchangeType, // type
  125. true, // durable
  126. false, // delete when complete
  127. false, // internal
  128. false, // noWait
  129. nil, // arguments
  130. )
  131. failOnError(err, "Failed to register an Exchange")
  132. fmt.Printf("declared Exchange, declaring Queue (%s)", "")
  133. queue, err := ch.QueueDeclare(
  134. "", // name of the queue
  135. false, // durable
  136. false, // delete when usused
  137. false, // exclusive
  138. false, // noWait
  139. nil, // arguments
  140. )
  141. failOnError(err, "Failed to register an Queue")
  142. fmt.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')",
  143. queue.Messages, queue.Consumers, exchangeName)
  144. for _, v := range strings.Split(routerkey, ",") {
  145. err = ch.QueueBind(
  146. queue.Name, // name of the queue
  147. v, // bindingKey
  148. exchangeName, // sourceExchange
  149. false, // noWait
  150. nil, // arguments
  151. )
  152. if err != nil {
  153. return fmt.Errorf("Queue Bind: %s", err)
  154. }
  155. }
  156. msgs, err := ch.Consume(
  157. queue.Name, // queue
  158. consumerName, // consumer
  159. true, // auto-ack
  160. false, // exclusive
  161. false, // no-local
  162. false, // no-wait
  163. nil, // args
  164. )
  165. failOnError(err, "Failed to register a consumer")
  166. go consumeLoop2(msgs, handlerFunc)
  167. return nil
  168. }
  169. func consumeLoop2(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
  170. for d := range deliveries {
  171. // Invoke the handlerFunc func we passed as parameter.
  172. handlerFunc(d)
  173. }
  174. }
  175. func Mqtt(cfgdata Config) MQTT.Client {
  176. opt := MQTT.NewClientOptions().AddBroker(cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
  177. cfgdata.Clientid = fmt.Sprintf("client-%s-%d", conf.GlobalConfig["appname"], time.Now().Unix())
  178. opt.ConnectTimeout = 10 * time.Second
  179. opt.SetClientID(cfgdata.Clientid)
  180. opt.SetDefaultPublishHandler(f)
  181. opt.SetUsername(cfgdata.User)
  182. opt.SetPassword(cfgdata.Pwd)
  183. Mqttserverconn = MQTT.NewClient(opt)
  184. var errcnt = 10
  185. for {
  186. if token := Mqttserverconn.Connect(); token.Wait() && token.Error() != nil {
  187. logger.Logger.Error("MQTT服务器连接失败:" + token.Error().Error())
  188. time.Sleep(1 * time.Second)
  189. errcnt = errcnt - 1
  190. if errcnt == 0 {
  191. panic(token.Error())
  192. }
  193. } else {
  194. break
  195. }
  196. }
  197. logger.Logger.Println("MQTT服务器连接成功...")
  198. if cfgdata.Topic != "" {
  199. topiclist := strings.Split(cfgdata.Topic, ",")
  200. for _, v := range topiclist {
  201. errcnt = 5
  202. for {
  203. if token := Mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil {
  204. logger.Logger.Println(v + " 主题订阅失败:" + token.Error().Error())
  205. time.Sleep(1 * time.Second)
  206. errcnt = errcnt - 1
  207. if errcnt == 0 {
  208. os.Exit(1)
  209. }
  210. } else {
  211. break
  212. }
  213. }
  214. logger.Logger.Debug("主题" + v + "订阅成功")
  215. }
  216. }
  217. return Mqttserverconn
  218. }
  219. func PublishMessage(topic string, text string) {
  220. if Mqttserverconn == nil {
  221. return
  222. }
  223. logger.Logger.Debug("发布数据" + text + "到主题" + topic)
  224. token := Mqttserverconn.Publish(topic, 0, false, text)
  225. if !token.Wait() {
  226. logger.Logger.Error(token.Error())
  227. }
  228. }