consumidor de kafka para detectar dinámicamente los temas agregados

6

Estoy usando KafkaConsumer para consumir mensajes del servidor Kafka (temas).

  • Funciona bien para temas creados antes de iniciar el código del consumidor ...

Pero el problema es que no funcionará si los temas se crearon dinámicamente (quiero decir después de que se inició el código del consumidor), pero la API dice que admitirá la creación dinámica de temas. Aquí está el enlace para su referencia.

Versión de Kafka utilizada: 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Aquí está el código JAVA ...

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

NOTA: Los nombres de mis temas coinciden con la expresión regular ... Y si reinicio el consumidor, comenzará a leer los mensajes enviados al tema ...

Cualquier ayuda es muy apreciada...

15

Hubo una respuesta a esto en los archivos de correo de apache kafka. Lo estoy copiando a continuación:

The consumer supports a configuration option "metadata.max.age.ms" which basically controls how often topic metadata is fetched. By default, this is set fairly high (5 minutes), which means it will take up to 5 minutes to discover new topics matching your regular expression. You can set this lower to discover topics quicker.

Entonces, en tus accesorios puedes:

props.put("metadata.max.age.ms", 5000);

Esto hará que su consumidor descubra nuevos temas cada 5 segundos.

2
  • 1
    también depende de cómo establezca la propiedad del consumidor "auto.offset.reset". si es "más reciente", seleccionará los mensajes más recientes / [no consumidos antes] de temas conocidos (después del inicio del consumidor), pero no los temas dinámicos. si lo configura como "más temprano" y también coloca consumer.seekToBeginning (consumer.assignment ()); antes de la encuesta: hágalo solo una vez, luego reconocerá temas dinámicos / nuevos, pero también obtendrá todos los registros desde el principio cada vezSasha Bond 31 oct 2018 a las 19:04
  • ¿Podemos forzar la solicitud de recuperación de metadatos de alguna manera? por ejemplo, consumer.fetchMeta () o algo así? andrii 20/07/20 a las 17:10
4

Puedes conectarte a Zookeeper. Mira el código de muestra . En esencia, creará un observador en el nodo Zookeeper /brokers/topics. Cuando se agregan nuevos niños aquí, se agrega un nuevo tema y su observador se activará.

Tenga en cuenta que la diferencia entre esta y la otra respuesta es que esta es un disparador donde la otra es una encuesta: esta será lo más cerca posible del tiempo real, la otra estará dentro del intervalo de sondeo en el mejor de los casos. .

4
  • Gracias por tu respuesta y ayuda ... básicamente quería usar la api de KafkaConsumer para lograr esto y lo resolví yo mismo ..siddu 26/03/2016 a las 19:47
  • @madlad vea mi respuesta a continuación. bhspencer 25 de agosto de 2016 a las 17:15
  • El enlace 'el código de muestra' no es válido, también la pregunta era sobre el consumo de mensajes, y no solo sobre el nuevo tema ... el nuevo tema estará disponible en consumr.listTopics (). keySet () de todos modosSasha Bond 31 oct 2018 a las 18:47
  • Enlace fijo: también se agregó una línea sobre la diferencia entre los dos enfoques. David Griffin 1/11/18 a las 17:24
2

Aquí está la solución que funcionó para mí usando la api KafkaConsumer. Aquí está el código Java para ello.

private static Consumer<Long, String> createConsumer(String topic) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "KafkaExampleConsumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    // Create the consumer using props.
    final Consumer<Long, String> consumer =
            new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

public static void runConsumer(String topic) throws InterruptedException {
    final Consumer<Long, String> consumer = createConsumer(topic);

    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records)
        System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
    consumer.close();
    //System.out.println("DONE");
}

usando esto podemos consumir el mensaje de temas creados dinámicamente.