Назад к вопросам
Senior
2
Как реализовать паттерн request-reply на Kafka?
Компании, где спрашивали
прайм пейментс
Ответ от нейросети
sobes.tech AI
Паттерн request-reply на Kafka реализуется через использование двух топиков: один для запросов, другой — для ответов.
Основные шаги:
-
Отправка запроса:
- Клиент публикует сообщение в топик запросов.
- В сообщении указывается уникальный идентификатор корреляции (correlationId) и имя топика для ответа (replyTo).
-
Обработка запроса:
- Сервис-получатель читает сообщение из топика запросов.
- Выполняет необходимую обработку.
- Отправляет ответ в топик, указанный в поле replyTo, сохраняя correlationId.
-
Получение ответа:
- Клиент слушает топик ответов.
- По correlationId сопоставляет ответ с исходным запросом.
Пример кода (упрощённо):
// Отправка запроса
ProducerRecord<String, String> request = new ProducerRecord<>("request-topic", null, requestPayload);
request.headers().add("correlationId", correlationId.getBytes());
request.headers().add("replyTo", "response-topic".getBytes());
producer.send(request);
// Обработка запроса
ConsumerRecord<String, String> record = consumer.poll(...);
String replyTo = new String(record.headers().lastHeader("replyTo").value());
String correlationId = new String(record.headers().lastHeader("correlationId").value());
// ... обработка ...
ProducerRecord<String, String> response = new ProducerRecord<>(replyTo, null, responsePayload);
response.headers().add("correlationId", correlationId.getBytes());
producer.send(response);
// Получение ответа
ConsumerRecord<String, String> responseRecord = responseConsumer.poll(...);
String respCorrelationId = new String(responseRecord.headers().lastHeader("correlationId").value());
// Сопоставление с запросом по respCorrelationId
Важно обеспечить уникальность correlationId и корректно обрабатывать таймауты и повторные попытки.