RabbitMQ — это брокер сообщений, который отлично подходит для систем pub-sub в архитектуре микросервисов. На моей текущей работе мы используем RabbitMQ в нашем конвейере приема данных. Все сервисы написаны на Go, и все они проталкивают данные через сотни очередей RabbitMQ. Давайте посмотрим, как эффективно публиковать и подписываться на Rabbit, при этом GO как можно быстрее.

Краткий обзор кролика

Двумя основными объектами, о которых следует помнить при работе с Rabbit, являются ключи маршрутизации и очереди. Сервис публикует сообщение (в нашем случае JSON) для ключа маршрутизации. Затем RabbitMQ копирует это сообщение в каждую очередь, которая подписана на этот ключ маршрутизации.

Служба подписки (потребитель) может извлекать сообщения из очереди по одному. Стоит отметить, что очередь также может получать сообщения от нескольких ключей маршрутизации, но мы не будем углубляться в это здесь.

Подключение к Go

Во-первых, нет смысла изобретать велосипед. Мы будем использовать пакет amqp, предоставленный streadway, для обработки деталей подключения.

В большинстве наших проектов мы создаем небольшой пакет Rabbit в внутренней папке проекта. Он предоставляет только ту функциональность кролика, которая важна для нашего проекта.

// Conn -
type Conn struct {
	Channel *amqp.Channel
}

// GetConn -
func GetConn(rabbitURL string) (Conn, error) {
	conn, err := amqp.Dial(rabbitURL)
	if err != nil {
		return Conn{}, err
	}

	ch, err := conn.Channel()
	return Conn{
		Channel: ch,
	}, err
}

Структура Conn будет просто поддерживать соединение с Rabbit. Мы также представляем метод для получения нового соединения, используя только URI соединения. Например, amqp://имя пользователя:пароль@localhost

Издательский

Публикация довольно проста и является потокобезопасной. Мы выставляем функцию, которая публикует с помощью соединения, приложение просто предоставляет ключ маршрутизации и полезную нагрузку:

// Publish -
func (conn Conn) Publish(routingKey string, data []byte) error {
	return conn.Channel.Publish(
		// exchange - yours may be different
		"events",
		routingKey,
		// mandatory - we don't care if there I no queue
		false,
		// immediate - we don't care if there is no consumer on the queue
		false,
		amqp.Publishing{
			ContentType:  "application/json",
			Body:         data,
			DeliveryMode: amqp.Persistent,
		})
}

Здесь следует отметить несколько вещей.

Одна из целей этого пакета, который мы создаем для нашего приложения, — установить значения по умолчанию для более мощного пакета AMQP и контролировать, какие функции доступны для нашего приложения. Например, мы знаем, что наше приложение всегда будет использовать обмен "events", и мы знаем, что нам не нужны обязательные или немедленные флаги, установленные для нашего варианта использования.

Потребление

Потребление немного сложнее, чем публикация. Здесь мы используем простой шаблон, в котором приложение предоставляет функцию-обработчик, очередь, ключ маршрутизации, к которому привязана очередь, и сколько горутин должен запускать обработчик одновременно.

// StartConsumer -
func (conn Conn) StartConsumer(
	queueName,
	routingKey string,
	handler func(d amqp.Delivery) bool,
	concurrency int) error {

	// create the queue if it doesn't already exist
	_, err := conn.Channel.QueueDeclare(queueName, true, false, false, false, nil)
	if err != nil {
		return err
	}

	// bind the queue to the routing key
	err = conn.Channel.QueueBind(queueName, routingKey, "events", false, nil)
	if err != nil {
		return err
	}

	// prefetch 4x as many messages as we can handle at once
	prefetchCount := concurrency * 4
	err = conn.Channel.Qos(prefetchCount, 0, false)
	if err != nil {
		return err
	}

	msgs, err := conn.Channel.Consume(
		queueName, // queue
		"",        // consumer
		false,     // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		return err
	}

	// create a goroutine for the number of concurrent threads requested
	for i := 0; i < concurrency; i++ {
		fmt.Printf("Processing messages on thread %v...\n", i)
		go func() {
			for msg := range msgs {
				// if tha handler returns true then ACK, else NACK
				// the message back into the rabbit queue for
				// another round of processing
				if handler(msg) {
					msg.Ack(false)
				} else {
					msg.Nack(false, true)
				}
			}
			fmt.Println("Rabbit consumer closed - critical Error")
			os.Exit(1)
		}()
	}
	return nil
}

Примечательные предметы:

Если вам важна скорость, не бойтесь запускать параллелизм не менее 100. Предполагая, что ваш обработчик написан потокобезопасным способом, это хороший способ убедиться, что ваше приложение использует весь доступный ЦП, не ограничиваясь вводом-выводом.

Если обработчик вашего приложения работает очень быстро (возможно, сеть или диск не задействованы), вам может потребоваться изменить множитель предварительной выборки с 4 на более высокий. Счетчик предварительной выборки сообщает соединению Rabbit, сколько сообщений нужно получать с сервера за раз. Чем выше число, тем меньше время ожидания сетевых вызовов для каждого сообщения.

Наши программы эфемерны — мы не возражаем, если они просто перезапускаются время от времени, когда случаются плохие вещи. По этой причине, если потребитель кролика по какой-либо причине не работает, мы используем команду os.Exit(1). Наши журналы подхватывают его, и мы просто перезагружаемся. Если это не работает для вашего варианта использования, вы можете решить это более элегантно.

Тестирование пакета

func main() {
	conn, err := rabbit.GetConn("amqp://guest:guest@localhost")
	if err != nil {
		panic(err)
	}

	go func() {
		for {
			time.Sleep(time.Second)
			conn.Publish("test-key", []byte(`{"message":"test"}`))
		}
	}()

	err = conn.StartConsumer("test-queue", "test-key", handler, 2)

	if err != nil {
		panic(err)
	}

	forever := make(chan bool)
	<-forever
}

func handler(d amqp.Delivery) bool {
	if d.Body == nil {
		fmt.Println("Error, no message body!")
		return false
	}
	fmt.Println(string(d.Body))
	return true
}

Спасибо за чтение

Напишите мне в твиттере @wagslane, если у вас есть какие-либо вопросы или комментарии.

Следите за мной на Dev.to: wagslane

Пост Подключение к RabbitMQ в Golang впервые появился на Qvault.