mqttadmin.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package mqtt
  2. import (
  3. "scd_check_tools/conf"
  4. "scd_check_tools/logger"
  5. "scd_check_tools/tools"
  6. "encoding/json"
  7. "fmt"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. //import the Paho Go MQTT library
  14. MQTT "github.com/eclipse/paho.mqtt.golang"
  15. "github.com/streadway/amqp"
  16. //"golang.org/x/net/proxy"
  17. )
  18. //数据消息通道
  19. var MqttMessageChan = make(chan string, 1000)
  20. var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  21. mqttDeal := new(ReceiveDataMessage)
  22. mqttDeal.Receive(msg.Topic(), string(msg.Payload()))
  23. }
  24. //接收rabbitmq的消息
  25. //接收到消息后通过messageChan通道传递给数据库处理过程
  26. var frabbit = func(msg amqp.Delivery) {
  27. msgStr := fmt.Sprintf("%s|%s", msg.RoutingKey, string(msg.Body))
  28. fmt.Println(tools.NowTime() + " ==== received data:" + msgStr)
  29. MqttMessageChan <- msgStr
  30. }
  31. var (
  32. url string
  33. topic string
  34. cnffile string
  35. )
  36. //连接RabbitMQ时,topic属性值为router key
  37. //仅RabbitMQ需要指定exchangename属性
  38. type Config struct {
  39. Mqtt string `json:"mqttname"`
  40. Host string `json:"host"`
  41. Port int `json:"port,string"`
  42. User string `json:"user"`
  43. Pwd string `json:"pwd"`
  44. Topic string `json:"topic"`
  45. ExchangeName string `json:"exchangename"`
  46. Clientid string `json:"clientid"`
  47. ExtShell_webtest string `json:"extshell_webtest"`
  48. }
  49. var rabbitMqClient *MessagingClient
  50. var mqttserverconn MQTT.Client
  51. func failOnError(err error, msg string) {
  52. if err != nil {
  53. fmt.Println("%s: %s", msg, err)
  54. os.Exit(1)
  55. }
  56. }
  57. func Start() {
  58. cnffile = "conf/mqtt.cnf"
  59. fmt.Println(tools.NowTime()+" 加载MQTT配置文件", cnffile)
  60. fileHanlder, err := os.Open(cnffile)
  61. if err != nil {
  62. fmt.Println(tools.NowTime()+" 配置文件mqtt.cnf:", err)
  63. return
  64. }
  65. txt, _ := ioutil.ReadAll(fileHanlder)
  66. fileHanlder.Close()
  67. txtStr := string(txt)
  68. if txtStr == "" {
  69. fmt.Println(tools.NowTime() + " 配置文件mqtt.cnf配置不正确,请正确配置")
  70. return
  71. }
  72. cfgdata := Config{}
  73. err = json.Unmarshal(txt, &cfgdata)
  74. if err != nil {
  75. fmt.Println(tools.NowTime()+" MQTT配置文件解析失败:", err)
  76. return
  77. }
  78. if cfgdata.Topic == "" {
  79. fmt.Println(tools.NowTime() + " 注意:当前未订阅任何主题,是否需要修复?")
  80. }
  81. if cfgdata.Mqtt == "" {
  82. cfgdata.Mqtt = "MQTT"
  83. }
  84. fmt.Println(tools.NowTime() + " 正在连接" + cfgdata.Mqtt + "服务器" + cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
  85. if cfgdata.Mqtt == "rabbitmq" {
  86. RabbitMQ(cfgdata)
  87. } else {
  88. Mqtt(cfgdata)
  89. }
  90. //go PublishMessage()
  91. /*
  92. go func() {
  93. //向通道模拟写入数据
  94. for i := 0; i < 10; i++ {
  95. msg := "msg id:" + strconv.Itoa(i)
  96. messageChan <- msg
  97. fmt.Println(NowTime() + " " + strconv.Itoa(len(messageChan)))
  98. time.Sleep(1 * time.Second)
  99. }
  100. }()
  101. */
  102. }
  103. func RabbitMQ(cfgdata Config) {
  104. if cfgdata.ExchangeName == "" {
  105. fmt.Println("exchangename属性未配置")
  106. os.Exit(1)
  107. }
  108. //rabbitMqClient = new(MessagingClient)
  109. connstr := fmt.Sprintf("amqp://%s:%s@%s:%s", cfgdata.User, cfgdata.Pwd, cfgdata.Host, strconv.Itoa(cfgdata.Port))
  110. rbconn, err := amqp.Dial(fmt.Sprintf("%s/", connstr))
  111. failOnError(err, "rabbitMQ["+connstr+"] 连接失败")
  112. //rabbitMqClient.ConnectToRabbitBroker(connstr)
  113. RabbitMQSubscribe(rbconn, cfgdata.ExchangeName, "topic", cfgdata.Topic, "", frabbit)
  114. }
  115. func RabbitMQSubscribe(mconn *amqp.Connection, exchangeName string, exchangeType string, routerkey string, consumerName string, handlerFunc func(amqp.Delivery)) error {
  116. if mconn == nil {
  117. panic("Tried to Subscribe before connection was initialized. Don't do that.")
  118. }
  119. ch, err := mconn.Channel()
  120. failOnError(err, "Failed to open a channel")
  121. // defer ch.Close()
  122. err = ch.ExchangeDeclare(
  123. exchangeName, // name of the exchange
  124. exchangeType, // type
  125. true, // durable
  126. false, // delete when complete
  127. false, // internal
  128. false, // noWait
  129. nil, // arguments
  130. )
  131. failOnError(err, "Failed to register an Exchange")
  132. fmt.Printf("declared Exchange, declaring Queue (%s)", "")
  133. queue, err := ch.QueueDeclare(
  134. "", // name of the queue
  135. false, // durable
  136. false, // delete when usused
  137. false, // exclusive
  138. false, // noWait
  139. nil, // arguments
  140. )
  141. failOnError(err, "Failed to register an Queue")
  142. fmt.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')",
  143. queue.Messages, queue.Consumers, exchangeName)
  144. for _, v := range strings.Split(routerkey, ",") {
  145. err = ch.QueueBind(
  146. queue.Name, // name of the queue
  147. v, // bindingKey
  148. exchangeName, // sourceExchange
  149. false, // noWait
  150. nil, // arguments
  151. )
  152. if err != nil {
  153. return fmt.Errorf("Queue Bind: %s", err)
  154. }
  155. }
  156. msgs, err := ch.Consume(
  157. queue.Name, // queue
  158. consumerName, // consumer
  159. true, // auto-ack
  160. false, // exclusive
  161. false, // no-local
  162. false, // no-wait
  163. nil, // args
  164. )
  165. failOnError(err, "Failed to register a consumer")
  166. go consumeLoop2(msgs, handlerFunc)
  167. return nil
  168. }
  169. func consumeLoop2(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
  170. for d := range deliveries {
  171. // Invoke the handlerFunc func we passed as parameter.
  172. handlerFunc(d)
  173. }
  174. }
  175. func Mqtt(cfgdata Config) MQTT.Client {
  176. opt := MQTT.NewClientOptions().AddBroker(cfgdata.Host + ":" + strconv.Itoa(cfgdata.Port))
  177. if cfgdata.Clientid == "" {
  178. cfgdata.Clientid = "mqtt-receive-" + conf.GlobalConfig["appname"]
  179. }
  180. opt.ConnectTimeout = 10 * time.Second
  181. opt.SetClientID(cfgdata.Clientid)
  182. opt.SetDefaultPublishHandler(f)
  183. opt.SetUsername(cfgdata.User)
  184. opt.SetPassword(cfgdata.Pwd)
  185. mqttserverconn = MQTT.NewClient(opt)
  186. for {
  187. if token := mqttserverconn.Connect(); token.Wait() && token.Error() != nil {
  188. fmt.Println(tools.NowTime() + " MQTT服务器连接失败:" + token.Error().Error())
  189. time.Sleep(5 * time.Second)
  190. } else {
  191. break
  192. }
  193. }
  194. fmt.Println(tools.NowTime() + " MQTT服务器连接成功...")
  195. if cfgdata.Topic != "" {
  196. topiclist := strings.Split(cfgdata.Topic, ",")
  197. for _, v := range topiclist {
  198. errcnt := 5
  199. for {
  200. if token := mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil {
  201. fmt.Println(tools.NowTime() + v + " 主题订阅失败:" + token.Error().Error())
  202. time.Sleep(5 * time.Second)
  203. errcnt = errcnt - 1
  204. if errcnt == 0 {
  205. break
  206. }
  207. } else {
  208. fmt.Println(tools.NowTime() + " 主题" + v + "订阅成功")
  209. break
  210. }
  211. }
  212. }
  213. }
  214. return mqttserverconn
  215. }
  216. func PublishMessage(topic string, text string) {
  217. if mqttserverconn == nil {
  218. return
  219. }
  220. logger.Logger.Debug(" 发布数据" + text + "到主题" + topic)
  221. token := mqttserverconn.Publish(topic, 0, false, text)
  222. token.Wait()
  223. }