rabbitmq.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package mqtt
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/streadway/amqp"
  6. )
  7. // Defines our interface for connecting and consuming messages.
  8. type IMessagingClient interface {
  9. ConnectToBroker(connectionString string)
  10. Publish(msg []byte, exchangeName string, exchangeType string) error
  11. PublishOnQueue(msg []byte, queueName string) error
  12. Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
  13. SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
  14. Close()
  15. }
  16. // Real implementation, encapsulates a pointer to an amqp.Connection
  17. type MessagingClient struct {
  18. conn *amqp.Connection
  19. }
  20. func (m *MessagingClient) ConnectToRabbitBroker(connectionString string) {
  21. if connectionString == "" {
  22. panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
  23. }
  24. m = new(MessagingClient)
  25. var err error
  26. m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
  27. if err != nil {
  28. panic("Failed to connect to AMQP compatible broker at: " + connectionString)
  29. }
  30. }
  31. func (m *MessagingClient) Publish(body []byte, exchangeName string, exchangeType string) error {
  32. if m.conn == nil {
  33. panic("Tried to send message before connection was initialized. Don't do that.")
  34. }
  35. ch, err := m.conn.Channel() // Get a channel from the connection
  36. defer ch.Close()
  37. err = ch.ExchangeDeclare(
  38. exchangeName, // name of the exchange
  39. exchangeType, // type
  40. true, // durable
  41. false, // delete when complete
  42. false, // internal
  43. false, // noWait
  44. nil, // arguments
  45. )
  46. failOnError(err, "Failed to register an Exchange")
  47. queue, err := ch.QueueDeclare( // Declare a queue that will be created if not exists with some args
  48. "", // our queue name
  49. false, // durable
  50. false, // delete when unused
  51. false, // exclusive
  52. false, // no-wait
  53. nil, // arguments
  54. )
  55. err = ch.QueueBind(
  56. queue.Name, // name of the queue
  57. exchangeName, // bindingKey
  58. exchangeName, // sourceExchange
  59. false, // noWait
  60. nil, // arguments
  61. )
  62. err = ch.Publish( // Publishes a message onto the queue.
  63. exchangeName, // exchange
  64. exchangeName, // routing key q.Name
  65. false, // mandatory
  66. false, // immediate
  67. amqp.Publishing{
  68. Body: body, // Our JSON body as []byte
  69. })
  70. fmt.Printf("A message was sent: %v", body)
  71. return err
  72. }
  73. func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
  74. if m.conn == nil {
  75. panic("Tried to send message before connection was initialized. Don't do that.")
  76. }
  77. ch, err := m.conn.Channel() // Get a channel from the connection
  78. defer ch.Close()
  79. queue, err := ch.QueueDeclare( // Declare a queue that will be created if not exists with some args
  80. queueName, // our queue name
  81. false, // durable
  82. false, // delete when unused
  83. false, // exclusive
  84. false, // no-wait
  85. nil, // arguments
  86. )
  87. // Publishes a message onto the queue.
  88. err = ch.Publish(
  89. "", // exchange
  90. queue.Name, // routing key
  91. false, // mandatory
  92. false, // immediate
  93. amqp.Publishing{
  94. ContentType: "application/json",
  95. Body: body, // Our JSON body as []byte
  96. })
  97. fmt.Printf("A message was sent to queue %v: %v", queueName, body)
  98. return err
  99. }
  100. func (m *MessagingClient) Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error {
  101. if m == nil {
  102. panic("Tried to Subscribe before connection was initialized. Don't do that.")
  103. }
  104. ch, err := m.conn.Channel()
  105. failOnError(err, "Failed to open a channel")
  106. // defer ch.Close()
  107. err = ch.ExchangeDeclare(
  108. exchangeName, // name of the exchange
  109. exchangeType, // type
  110. true, // durable
  111. false, // delete when complete
  112. false, // internal
  113. false, // noWait
  114. nil, // arguments
  115. )
  116. failOnError(err, "Failed to register an Exchange")
  117. log.Printf("declared Exchange, declaring Queue (%s)", "")
  118. queue, err := ch.QueueDeclare(
  119. "", // name of the queue
  120. false, // durable
  121. false, // delete when usused
  122. false, // exclusive
  123. false, // noWait
  124. nil, // arguments
  125. )
  126. failOnError(err, "Failed to register an Queue")
  127. log.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')",
  128. queue.Messages, queue.Consumers, exchangeName)
  129. err = ch.QueueBind(
  130. queue.Name, // name of the queue
  131. exchangeName, // bindingKey
  132. exchangeName, // sourceExchange
  133. false, // noWait
  134. nil, // arguments
  135. )
  136. if err != nil {
  137. return fmt.Errorf("Queue Bind: %s", err)
  138. }
  139. msgs, err := ch.Consume(
  140. queue.Name, // queue
  141. consumerName, // consumer
  142. true, // auto-ack
  143. false, // exclusive
  144. false, // no-local
  145. false, // no-wait
  146. nil, // args
  147. )
  148. failOnError(err, "Failed to register a consumer")
  149. go consumeLoop(msgs, handlerFunc)
  150. return nil
  151. }
  152. func (m *MessagingClient) SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error {
  153. if m.conn == nil {
  154. panic("Tried to Subscribe Tpoic before connection was initialized. Don't do that.")
  155. }
  156. ch, err := m.conn.Channel()
  157. failOnError(err, "Failed to open a channel")
  158. log.Printf("Declaring Queue (%s)", queueName)
  159. queue, err := ch.QueueDeclare(
  160. queueName, // name of the queue
  161. false, // durable
  162. false, // delete when usused
  163. false, // exclusive
  164. false, // noWait
  165. nil, // arguments
  166. )
  167. failOnError(err, "Failed to register an Queue")
  168. msgs, err := ch.Consume(
  169. queue.Name, // queue
  170. consumerName, // consumer
  171. true, // auto-ack
  172. false, // exclusive
  173. false, // no-local
  174. false, // no-wait
  175. nil, // args
  176. )
  177. failOnError(err, "Failed to register a consumer")
  178. go consumeLoop(msgs, handlerFunc)
  179. return nil
  180. }
  181. func (m *MessagingClient) Close() {
  182. if m.conn != nil {
  183. m.conn.Close()
  184. }
  185. }
  186. func consumeLoop(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
  187. for d := range deliveries {
  188. // Invoke the handlerFunc func we passed as parameter.
  189. handlerFunc(d)
  190. }
  191. }