Comportamiento extraño de Kafka Stream y Consumer Group

0

Tengo dos preguntas de alto nivel divididas en preguntas más individuales, ambas preguntas de alto nivel se refieren a un grupo de consumidores que una API de Apache Kafka Streams está creando y usando.

En primer lugar, está la salida del kafka-consumer-group.shscript. Obtengo un resultado extraño que realmente no me dice dónde se encuentra un consumidor en particular, aunque parece estar conectado a un grupo / tema / partición en particular:

TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG
STANDARD_DATA                  9          11              11              0          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer
STANDARD_DATA                  0          4               11              7          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer
STANDARD_DATA                  4          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer
STANDARD_DATA                  5          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer
STANDARD_DATA                  3          -               12              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer
STANDARD_DATA                  8          12              12              0          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer
STANDARD_DATA                  7          -               11              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer
STANDARD_DATA                  2          10              10              0          myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1                  myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer
STANDARD_DATA                  1          2               10              8          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer
STANDARD_DATA                  6          -               12              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer
  1. ¿Por qué algunos CURRENT-OFFSETS (tercera columna) y LAG (cuarta columna) aparecen como '-' cuando puedo consultar la API de Kafka directamente para distinguir que de hecho están al día?

(consultado a través de la API de golang)

4                      myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3    OFFSET: 10        LOG-END: 10                LAG: 0
  1. Además, ¿por qué ese desplazamiento no se muestra como se representa en los registros (también conocido como, debería ponerse al día) en general?

Mi segunda pregunta de alto nivel es la de las corrientes. Tenemos un proceso de flujo en funcionamiento que, en momentos aleatorios (principalmente durante un reinicio), se restablece al primer desplazamiento disponible en temas particulares. A lo largo del código no hay 'reinicio' y el OFFSET_RESET no se toca. También puedo confirmar que no estamos usando el 'exactamente una vez', por lo que no estoy seguro de dónde entran en juego exactamente estos restablecimientos de compensación.

Una vez más, es básicamente:

El proceso de flujo se agita a través de los datos, algo ~ sucede ~ y luego nuestras compensaciones vuelven a la base 0, procesándose de nuevo. Esto puede estar sucediendo durante días o semanas antes de que decida reiniciarse también, por lo que la confirmación de compensaciones ESTÁ sucediendo.

2

Acerca de la salida de kafka-consumer-groups.sh: A -en CURRENT-OFFSET indica que no hay ningún desplazamiento comprometido para esta partición. Esto implica que el retraso tampoco se puede calcular (por lo tanto, también se obtiene un valor -allí).

Si leo su declaración correctamente, si consulta las compensaciones con golang, muestra que la partición 4 está en la compensación 10, en contraste con lo que kafka-consumer-groups.shmuestra, no estoy seguro de por qué este es el caso ...

Acerca de las compensaciones reiniciadas: es posible que deba aumentar la configuración del corredor offsets.retention.minutes; el valor predeterminado es 24 horas (consulte https://docs.confluent.io/current/streams/faq.html#why-is-my-application-re-processing- datos desde el principio ).

También tenga en cuenta que la API de Streams usa la política de restablecimiento predeterminada "más temprano" (en contraste con la API de consumidor que usa "más reciente" como predeterminada). Puede cambiar la política de restablecimiento en la API de Streams a través de StreamsConfig: https://docs.confluent.io/current/streams/developer-guide.html#non-streams-configuration-parameters

4
  • esto tiene un sentido absolutamente perfecto. Voy a ajustar estas configuraciones y ver si resuelve el problema. ¿Existe normalmente una buena práctica en torno a los minutos de compensación, retención? Nuestras aplicaciones tienen un período de inactividad, por lo que con la corrección de la política de reinicio, no estoy seguro de si realmente necesitaríamos aumentarla. ¿pensamientos?
    jbkc85
    2 oct 2017 a las 21:37
  • perdón - pregunta: si un consumidor está sentado escuchando un tema de un grupo de consumidores y no llegan mensajes hasta después del período de offsets.retention.minutes, ¿eso seguirá restableciendo al consumidor una vez que llegue un nuevo mensaje? Por ejemplo, si las compensaciones se atascan en 55 durante 30 horas, y luego aparece el número 56 ... si el consumidor es parte del grupo de consumidores, ¿se vuelve a conectar y consume automáticamente en la compensación 57 (porque se restablece a la compensación más reciente?)
    jbkc85
    2 oct 2017 a las 21:42
  • 1
    Si pierde el desplazamiento con auto.offset.reset = latest, teóricamente podría suceder que omita algunos registros y no los procese. Por lo tanto, se recomienda aumentar el tiempo de retención de compensación incluso si usa "más reciente". 3 oct 2017 a las 3:23
  • eso es lo que estaba pensando Matthias J. Sax. Solo quería aclarar para asegurarme de que no estoy tratando de hackear las cosas =).
    jbkc85
    3 oct 2017 a las 20:28