Estoy usando KafkaConsumer para consumir mensajes del servidor Kafka (temas).
- Funciona bien para temas creados antes de iniciar el código del consumidor ...
Pero el problema es que no funcionará si los temas se crearon dinámicamente (quiero decir después de que se inició el código del consumidor), pero la API dice que admitirá la creación dinámica de temas. Aquí está el enlace para su referencia.
Versión de Kafka utilizada: 0.9.0.1
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Aquí está el código JAVA ...
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Pattern r = Pattern.compile("siddu(\\d)*");
consumer.subscribe(r, new HandleRebalance());
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(partition.partition() + ": " +record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
NOTA: Los nombres de mis temas coinciden con la expresión regular ... Y si reinicio el consumidor, comenzará a leer los mensajes enviados al tema ...
Cualquier ayuda es muy apreciada...