123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package mqtt
- import (
- "fmt"
- "log"
- "github.com/streadway/amqp"
- )
- // Defines our interface for connecting and consuming messages.
- type IMessagingClient interface {
- ConnectToBroker(connectionString string)
- Publish(msg []byte, exchangeName string, exchangeType string) error
- PublishOnQueue(msg []byte, queueName string) error
- Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
- SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
- Close()
- }
- // Real implementation, encapsulates a pointer to an amqp.Connection
- type MessagingClient struct {
- conn *amqp.Connection
- }
- func (m *MessagingClient) ConnectToRabbitBroker(connectionString string) {
- if connectionString == "" {
- panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
- }
- m = new(MessagingClient)
- var err error
- m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
- if err != nil {
- panic("Failed to connect to AMQP compatible broker at: " + connectionString)
- }
- }
- func (m *MessagingClient) Publish(body []byte, exchangeName string, exchangeType string) error {
- if m.conn == nil {
- panic("Tried to send message before connection was initialized. Don't do that.")
- }
- ch, err := m.conn.Channel() // Get a channel from the connection
- defer ch.Close()
- err = ch.ExchangeDeclare(
- exchangeName, // name of the exchange
- exchangeType, // type
- true, // durable
- false, // delete when complete
- false, // internal
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Exchange")
- queue, err := ch.QueueDeclare( // Declare a queue that will be created if not exists with some args
- "", // our queue name
- false, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil, // arguments
- )
- err = ch.QueueBind(
- queue.Name, // name of the queue
- exchangeName, // bindingKey
- exchangeName, // sourceExchange
- false, // noWait
- nil, // arguments
- )
- err = ch.Publish( // Publishes a message onto the queue.
- exchangeName, // exchange
- exchangeName, // routing key q.Name
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- Body: body, // Our JSON body as []byte
- })
- fmt.Printf("A message was sent: %v", body)
- return err
- }
- func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
- if m.conn == nil {
- panic("Tried to send message before connection was initialized. Don't do that.")
- }
- ch, err := m.conn.Channel() // Get a channel from the connection
- defer ch.Close()
- queue, err := ch.QueueDeclare( // Declare a queue that will be created if not exists with some args
- queueName, // our queue name
- false, // durable
- false, // delete when unused
- false, // exclusive
- false, // no-wait
- nil, // arguments
- )
- // Publishes a message onto the queue.
- err = ch.Publish(
- "", // exchange
- queue.Name, // routing key
- false, // mandatory
- false, // immediate
- amqp.Publishing{
- ContentType: "application/json",
- Body: body, // Our JSON body as []byte
- })
- fmt.Printf("A message was sent to queue %v: %v", queueName, body)
- return err
- }
- func (m *MessagingClient) Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error {
- if m == nil {
- panic("Tried to Subscribe before connection was initialized. Don't do that.")
- }
- ch, err := m.conn.Channel()
- failOnError(err, "Failed to open a channel")
- // defer ch.Close()
- err = ch.ExchangeDeclare(
- exchangeName, // name of the exchange
- exchangeType, // type
- true, // durable
- false, // delete when complete
- false, // internal
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Exchange")
- log.Printf("declared Exchange, declaring Queue (%s)", "")
- queue, err := ch.QueueDeclare(
- "", // name of the queue
- false, // durable
- false, // delete when usused
- false, // exclusive
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Queue")
- log.Printf("declared Queue (%d messages, %d consumers), binding to Exchange (key '%s')",
- queue.Messages, queue.Consumers, exchangeName)
- err = ch.QueueBind(
- queue.Name, // name of the queue
- exchangeName, // bindingKey
- exchangeName, // sourceExchange
- false, // noWait
- nil, // arguments
- )
- if err != nil {
- return fmt.Errorf("Queue Bind: %s", err)
- }
- msgs, err := ch.Consume(
- queue.Name, // queue
- consumerName, // consumer
- true, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- failOnError(err, "Failed to register a consumer")
- go consumeLoop(msgs, handlerFunc)
- return nil
- }
- func (m *MessagingClient) SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error {
- if m.conn == nil {
- panic("Tried to Subscribe Tpoic before connection was initialized. Don't do that.")
- }
- ch, err := m.conn.Channel()
- failOnError(err, "Failed to open a channel")
- log.Printf("Declaring Queue (%s)", queueName)
- queue, err := ch.QueueDeclare(
- queueName, // name of the queue
- false, // durable
- false, // delete when usused
- false, // exclusive
- false, // noWait
- nil, // arguments
- )
- failOnError(err, "Failed to register an Queue")
- msgs, err := ch.Consume(
- queue.Name, // queue
- consumerName, // consumer
- true, // auto-ack
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- failOnError(err, "Failed to register a consumer")
- go consumeLoop(msgs, handlerFunc)
- return nil
- }
- func (m *MessagingClient) Close() {
- if m.conn != nil {
- m.conn.Close()
- }
- }
- func consumeLoop(deliveries <-chan amqp.Delivery, handlerFunc func(d amqp.Delivery)) {
- for d := range deliveries {
- // Invoke the handlerFunc func we passed as parameter.
- handlerFunc(d)
- }
- }
|