Sobes.tech
Назад к вопросам
Senior
8

Как бы вы реализовали веб-сервис, который синхронно отправляет сообщение в Kafka и ждёт ответа из другого топика?

Компании, где спрашивали
Позитив Технолоджис

Ответ от нейросети

sobes.tech AI

Для реализации веб-сервиса, который синхронно отправляет сообщение в Kafka и ждёт ответа из другого топика, можно использовать следующий подход:

  1. Отправка сообщения с уникальным корреляционным идентификатором (correlation ID). Это позволит связать запрос и ответ.

  2. Подписка на топик с ответами с фильтрацией по correlation ID.

  3. Использование механизма ожидания с таймаутом (например, CompletableFuture или CountDownLatch), чтобы блокировать поток до получения ответа или истечения времени ожидания.

Пример на Java с использованием Kafka Consumer и Producer:

public String sendAndReceive(String requestTopic, String responseTopic, String message, String correlationId) throws Exception {
    // Отправка сообщения с correlationId в заголовках
    ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, message);
    record.headers().add("correlationId", correlationId.getBytes(StandardCharsets.UTF_8));
    producer.send(record);

    // Создаём объект для ожидания ответа
    CompletableFuture<String> futureResponse = new CompletableFuture<>();

    // В consumer'е при получении сообщения из responseTopic проверяем correlationId
    consumer.subscribe(Collections.singletonList(responseTopic));
    new Thread(() -> {
        while (!futureResponse.isDone()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> rec : records) {
                String respCorrelationId = new String(rec.headers().lastHeader("correlationId").value(), StandardCharsets.UTF_8);
                if (correlationId.equals(respCorrelationId)) {
                    futureResponse.complete(rec.value());
                    return;
                }
            }
        }
    }).start();

    // Ожидаем ответ с таймаутом
    return futureResponse.get(10, TimeUnit.SECONDS);
}

Такой подход обеспечивает синхронное поведение поверх асинхронной системы Kafka, позволяя сервису ждать конкретный ответ, связанный с отправленным сообщением.