¿Qué determina la compensación del consumidor de Kafka?

209

Soy relativamente nuevo en Kafka. He experimentado un poco con él, pero algunas cosas no me quedan claras con respecto a la compensación del consumidor. Por lo que he entendido hasta ahora, cuando un consumidor comienza, la compensación desde la que comenzará a leer está determinada por la configuración auto.offset.reset(corríjame si me equivoco).

Ahora digamos, por ejemplo, que hay 10 mensajes (compensaciones de 0 a 9) en el tema, y ​​un consumidor consumió 5 de ellos antes de que cayera (o antes de que yo matara al consumidor). Luego diga que reinicio ese proceso de consumidor. Mis preguntas son:

  1. Si auto.offset.resetse establece en earliest, ¿siempre comenzará a consumir desde el desplazamiento 0?

  2. Si auto.offset.resetse establece en latest, ¿comenzará a consumir desde el desplazamiento 5?

  3. ¿El comportamiento con respecto a este tipo de escenarios es siempre determinista?

No dude en comentar si algo en mi pregunta no está claro.

315

Es un poco más complejo de lo que describiste.
La auto.offset.resetconfiguración se activa SOLO si su grupo de consumidores no tiene un desplazamiento válido comprometido en algún lugar (2 almacenamientos de desplazamiento admitidos ahora son Kafka y Zookeeper), y también depende del tipo de consumidor que utilice.

Si utiliza un consumidor de Java de alto nivel, imagine los siguientes escenarios:

  1. Tiene un consumidor en un grupo de consumidores group1que consumió 5 mensajes y murió. La próxima vez que inicie este consumidor, ni siquiera usará esa auto.offset.resetconfiguración y continuará desde el lugar donde murió porque solo obtendrá el desplazamiento almacenado del almacenamiento de desplazamiento (Kafka o ZK como mencioné).

  2. Tiene mensajes en un tema (como lo describió) y comienza un consumidor en un nuevo grupo de consumidores group2. No hay ningún desplazamiento almacenado en ningún lugar y esta vez la auto.offset.resetconfiguración decidirá si comenzar desde el principio del tema ( earliest) o desde el final del tema ( latest)

Una cosa más que afecta a qué valor de compensación corresponderá earliesty qué latestconfiguraciones es la política de retención de registros. Imagina que tienes un tema con retención configurado en 1 hora. Produce 5 mensajes y luego, una hora más tarde, publica 5 mensajes más. El latestdesplazamiento seguirá siendo el mismo que en el ejemplo anterior, pero earliestno podrá serlo 0porque Kafka ya eliminará estos mensajes y, por lo tanto, será el primer desplazamiento disponible 5.

Todo lo mencionado anteriormente no está relacionado SimpleConsumery cada vez que lo ejecute, decidirá por dónde empezar a usar la auto.offset.resetconfiguración.

Si utiliza la versión Kafka mayores de 0,9, usted tiene que reemplazar earliest, latestcon smallest, largest.

7
  • 4
    Muchas gracias por la respuesta. Entonces, en cuanto al consumidor de alto nivel, una vez que un consumidor tiene algo comprometido (ya sea en ZK o Kafka), auto.offset.reset¿no tiene ninguna importancia a partir de entonces? ¿El único significado de esa configuración es cuando no hay nada comprometido (e idealmente eso sería en la primera puesta en marcha del consumidor)? Asif Iqbal 4 de septiembre de 2015 a las 15:48
  • 3
    Exactamente como lo describisteserejja 5 de septiembre de 2015 a las 7:09
  • 1
    @serejja Hola, ¿qué tal si siempre tengo 1 consumidor por grupo y el escenario n. ° 1 de tu respuesta ocurre para mí? ¿Será lo mismo? ha9u63ar 5 de julio de 2017 a las 21:55
  • 1
    @ ha9u63ar no entendió bien tu pregunta. Si reinicia su consumidor en el mismo grupo, entonces sí, no usará auto.offset.resety continuará desde el desplazamiento comprometido. Si siempre usa un grupo de consumidores diferente (como generarlo al iniciar el consumidor), entonces el consumidor siempre respetaráauto.offset.resetserejja 6 de julio de 2017 a las 12:59
  • @serejja sí y eso no me funciona. ¿Podría echar un vistazo a esto ? Este es mi problemaha9u63ar 6/07/17 a las 13:24
93

Solo una actualización: desde Kafka 0.9 en adelante, Kafka está usando una nueva versión Java del consumidor y los nombres de los parámetros auto.offset.reset han cambiado; Del manual:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.

Pasé un tiempo para encontrar esto después de verificar la respuesta aceptada, por lo que pensé que podría ser útil para la comunidad publicarlo.

3
  • La respuesta aceptada está escrita en términos de los nuevos nombres; esta respuesta no proporciona nada único, ¿verdad? (Si no tenía 90 votos a favor al momento de escribir este artículo, sugiero que lo elimine;))Ruben Bartelink 14 de enero a las 13:09
  • Sorprendentemente, mucha gente lo encontró útil. Israel Zinc 14 de enero a las 14:38
  • Estoy de acuerdo en que una respuesta no obtiene tantos votos a favor por accidente. Pero el punto con respecto a la respuesta original ya no es AFAICT, así que no puedo pensar en una razón por la que lo votaría ahora. (También había visto esa parte específica del manual antes de aterrizar aquí también). Aparte: esta respuesta también es bastante útil en este espacioRuben Bartelink 14 de enero a las 19:24
14

Además, hay compensaciones.retención.minutos. Si el tiempo transcurrido desde la última confirmación es> offsets.retention.minutes, auto.offset.resettambién se activa

3
  • 2
    ¿No parece esto redundante con la retención de registros? ¿Debería basarse la retención de ofset en la retención de registros? mike01010 1 de febrero de 2018 a las 2:53
  • @ mike01010 eso es correcto. Debe basarse en la retención de registros, esa es una de las soluciones propuestas en el ticket. Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours. issues.apache.org/jira/browse/KAFKA-3806saheb 26/03/18 a las 14:35
  • 2
    Esa respuesta me asustó por un tiempo, hasta que verifico la documentación de offsets.retention.minutes: <b> Después de que un grupo de consumidores pierde todos sus consumidores (es decir, se vacía), sus compensaciones se mantendrán durante este período de retención antes de descartarse. </b> consumidores (usando la asignación manual), las compensaciones expirarán después de la hora de la última confirmación más este período de retención. (Esto es para Kafka 2.3)jumping_monkey 21/10/19 a las 7:48