Sobes.tech
Назад к вопросам
Senior
2

Как реализовать паттерн request-reply на Kafka?

Компании, где спрашивали
прайм пейментс

Ответ от нейросети

sobes.tech AI

Паттерн request-reply на Kafka реализуется через использование двух топиков: один для запросов, другой — для ответов.

Основные шаги:

  1. Отправка запроса:

    • Клиент публикует сообщение в топик запросов.
    • В сообщении указывается уникальный идентификатор корреляции (correlationId) и имя топика для ответа (replyTo).
  2. Обработка запроса:

    • Сервис-получатель читает сообщение из топика запросов.
    • Выполняет необходимую обработку.
    • Отправляет ответ в топик, указанный в поле replyTo, сохраняя correlationId.
  3. Получение ответа:

    • Клиент слушает топик ответов.
    • По correlationId сопоставляет ответ с исходным запросом.

Пример кода (упрощённо):

// Отправка запроса
ProducerRecord<String, String> request = new ProducerRecord<>("request-topic", null, requestPayload);
request.headers().add("correlationId", correlationId.getBytes());
request.headers().add("replyTo", "response-topic".getBytes());
producer.send(request);

// Обработка запроса
ConsumerRecord<String, String> record = consumer.poll(...);
String replyTo = new String(record.headers().lastHeader("replyTo").value());
String correlationId = new String(record.headers().lastHeader("correlationId").value());
// ... обработка ...
ProducerRecord<String, String> response = new ProducerRecord<>(replyTo, null, responsePayload);
response.headers().add("correlationId", correlationId.getBytes());
producer.send(response);

// Получение ответа
ConsumerRecord<String, String> responseRecord = responseConsumer.poll(...);
String respCorrelationId = new String(responseRecord.headers().lastHeader("correlationId").value());
// Сопоставление с запросом по respCorrelationId

Важно обеспечить уникальность correlationId и корректно обрабатывать таймауты и повторные попытки.