Очереди сообщений (2025)
Навроцкий Артем
Очереди сообщений (2025)
Навроцкий Артем
Зачем нужны очереди сообщений?
- Часто какую-то крупную задачу нужно выполнить за один шаг. В БД часто используют транзакции.
- В случае использования нескольких БД нужны распределённые транзакции.
- Согласованность требует согласованного восстановления, что не всегда желательно или хотя бы возможно.
- Очереди позволяют "отложить" какое-то действие, чтобы оно после пришло в согласованное состояние.
Публикация/подписка (push)
Клиент подписывается на события. Брокер отправляет в клиента события по мере поступления.
- Ниже задержка
- Либо брокер подключается к клиенту, либо постоянное подключение клиента к брокеру
Опрос брокера (pull)
Клиент периодически проверяет состояние очереди.
- Высокая задержка
- Потребитель опрашивает брокера
- Потребители сами контролируют нагрузку
- Пакетная обработка
Опрос с ожиданием (pull + long pulling)
Клиент периодически проверяет состояние очереди. Если сообщений нет, то запрос ждёт их какое-то время.
Позволяет получить плюсы pull и push подходов.
Гарантии доставки сообщений
Мир не идеален
- Сеть ненадежна
- Программы ненадежны
- Оборудование ненадежно
Не больше одного раза (at most once)
- Взять сообшение из очереди
- Удалить из очереди
- Отправить сообщение
- Забыть про него
Хотя бы один раз (at least once)
- Взять сообшение из очереди
- Отправить сообщение
- Дождаться подтверждения о получении
- Удалить сообщение
- Если подтверждения не было, перейти к п. 1
Хотя бы один раз (at least once)
На стороне брокера
- Взять сообшение из очереди
- Отправить сообщение
- Дождаться подтверждения о получении
- Удалить сообщение
- Если подтверждения не было, перейти к п. 1
Хотя бы один раз (at least once)
На стороне клиента
- Присвоить сообщениям уникальный монотонно возрастающий номер
- Потребитель запрашивает сообщения с пропущенными номерами
Потребитель должен быть ровно один.
Ровно один раз (exactly once)

Ровно один раз (exactly once)
Задача двух генералов
- Две армии, каждая руководимая своим генералом, готовятся к штурму города.
- Лагеря армий располагаются на двух холмах, разделённых долиной.
- Долина занята противником и любой из посыльных может быть перехвачен.
- Для успешного штурма генералы должны атаковать город одновременно.
- Если атакует одна армия, то она гарантированно уничтожается.
- Надо договорится о времени одновременной атаки.

Ровно один раз (exactly once)
Попытка достижения нужного результата
- Сообщения получают уникальный идентификатор
- Брокер отправляет сообщения пока не получит подтверждение
- Потребитель выполняет дедупликацию
Ограничение: потребитель должен быть один.
Ровно один раз (exactly once)
Удалось гарантировать доставку, но гарантировать обработку ровно один раз все равно невозможно:
- Есть список полученных сообщений
- Берем сообщение
- Выполняем работу
- Удаляем сообщение
Решает проблему идемпотентность выполняемой работы, но она не всегда достижима.
Идемпотентность
- Идемпотентность
- Свойство объекта или операции при повторном применении операции к объекту давать тот же результат, что и при
первом
Один брокер
| Масштабируемость | Доступность | Надежность |
|---|
| Отсутствует | Низкая | Низкая |
Несколько брокеров
| Масштабируемость | Доступность | Надежность |
|---|
| Есть | Высокая | Средняя |
Несколько брокеров + дублирование
| Масштабируемость | Доступность | Надежность |
|---|
| Есть | Высокая | Высокая |
Требуется идемпотентность или дедупликация
Репликация
| Масштабируемость | Доступность | Надежность |
|---|
| Есть | Высокая | Высокая |
Проблему надёжности для схемы с несколькими брокерами можно решить через репликацию.
Загрузка нового видео пользователем
- Проверить видео модератором
- Если проверка пройдена, то:
- Закодировать с разным разрешением
- Обновить поисковые индексы
- Обновить ленты подписчиков

Что может пойти не так?
- Сервис модерации не доступен
- Прошли модерацию, но упал API Gateway
- Прошли модерацию, но недоступен Encoder, Indexer или User Feeds
- Запросы приходят чаще, чем один и больше сервисов способны их обработать
Что делать?
На стороне API Gateway:
- Входящий запрос сохранять в таблицу в виде запросов в сервисы, которые нужно сделать
- После успешного запроса в сервис удалять из базы соответствующую запись
- Периодически повторять запросы, которые не были успешно выполнены
Архитектура с использованием очередей
Архитектура с использованием очередей

- Убрали состояние из API Gateway
- Получили гарантии доставки
- Добавили эластичность
- Уменьшили связанность
- Сократили задержки
- Избежали ситуации когда один сервис может перегрузить другой сервис
- Отказались от повторов в случае ошибок
- Появилась возможность легко горизонтально масштабироваться
Популярные очереди сообщений
Популярные очереди сообщений
On-premises
- RabbitMQ
- Apache Kafka
- NATS
- ZeroMQ
На основе СУБД
Облачные
- Amazon SQS
- Yandex MQ
- CloudAMQP (RabbitMQ)
RabbitMQ
- pub/sub брокер
- Протоколы: AMQP, MQTT, STOMP
- Приоритеты, отложенные задачи
- Сильно деградирует под нагрузкой
- Доставленные сообщения удаляются
RabbitMQ
- Exchange могут соединяться с другими Exchange или с собственно очередями (queue) образуя маршруты (route)
- Очереди могут хранить сообщения только в памяти или обеспечивать их сохранность (durable)
- Producer шлет сообщения в Exchange, пройдя по маршруту сообщение оказывается в очереди
- Потребитель (consumer) подписывается на конкретную очередь и начинает получать сообщения из нее
- Для очереди можно установить максимальный размер или TTL для сообщений
- Очереди могут реплицироваться (Quorum Queues)
RabbitMQ
Типы Exchange
- Direct
- Прямая отправка сообщений в одну или несколько очередей с совпадающим значением ключа
маршрутизации (routing key)
- Topic
- Сообщение отправляется в очереди по значению ключа маршрутизации (routing key),
заданного по шаблону
- Fanout
- Все сообщения отправляются во все очереди независимо от ключа маршрутизации
- Headers
- Маршрутизация по атрибутам, заданным в заголовке сообщения
Масштабирование RabbitMQ
Масштабирование в основном вертикальное, но возможно очереди распределить между узлами

Kafka
- Производительная
- Хорошо горизонтально масштабируется
- Оптимизирована под пакетную обработку
- Гарантии последовательной обработки
- Отсутствие приоритетов сообщений
- Возможность повторного получения сообщений
- Ограниченное количество потребителей
Kafka
- Логический топик делится на партиции
- Разные партиции могут находится на разных серверах
- Для продюсера есть стратегии записи в партиции по кругу (round robin) и по хешу от ключа сообщения, таким
образом если у сообщения есть ключ, то сообщения с одним ключом попадают в одну партицию
- Партиции реплицируются между узлами кластера
- Потребители обьединяются в группы (consumer group), гарантируется, что из одной партиции читает один член
группы
- Разные группы потребителей могут читать одни и те же сообщения
- У каждого члена группы свое смещение (offset) в партиции, которое указывает на последнее полученное
сообщение
Масштабирование Kafka

- Кластер состоит из узлов - брокеров
- Партиция - основа горизонтальной масштабируемости
- Партиции распределяются между брокерами с указанным replication factor
- Партиция физически состоит из файлов-сегментов, которые реплицируются между брокерами
- У каждой партиции есть лидер
- Клиенты (продьюсеры и потребители) получают информацию о том, кто лидер партиции от любого брокера в
кластере
Что выбрать?
Коммуникация между сервисами
- Высокая пропускная способность, масштабируемость
- Потеря сообщений некритична
NATS
Сложный роутинг
А точно нужно?
Во всех остальных случаях
Kafka
Dead letter queue (repair queue)

- Отбросить
- Повторить с номера 6
- Переместить в другую очередь
CQRS
Command and Query Responsibility Segregation
Подход к проектированию системы, который разделяет операции чтения и записи данных на две отдельные модели.
- Queries ‐ методы, которые возвращают результат, не изменяя состояние объекта
- Commands ‐ методы, которые изменяют состояние объекта
CQRS
- Сложное взаимодействие
- Много запросов

CQRS
- Простота
- Производительность
- Масштабирование

Transactional outbox

- Получили сообщение
- Создали во внутренней базе запись со статусом проверки
- Проверка прошла
- Отправили сообщение в следующую очередь
- Удалили запись
Transactional outbox

- Получили сообщение
- Создали во внутренней базе запись со статусом проверки
- Проверка прошла
- В транзакции удалили запись и сделали запись в другой таблице
- Отдельный процесс читает вторую таблицу, отправляет сообщения и удаляет записи (at least once)
Диагностика очередей
Мониторинг очередей
- Количество необработанных сообщений. Нормальное состояние - очередь пустая
- Время
- Полное время обработки сообщения (QoS)
- Время обработки потребителем
- Количество повторов и отказов
- Количество сообщений
Логирование сообщений
В крайнем случае можно будет восстановить данные распарсив логи.
Проектируем ленту новостей
- Пользователи могут размещать посты (текст и медиафайлы)
- Другие пользователи, подписанные на этого пользователя должны увидеть в своей ленте новостей эти посты
- Лента формируется от новых постов к старым
- Когда пользователь размещает новый пост кго подписчики должны получать push-уведомление