¿Cuál es la diferencia entre implementar Deserializer y Serde en Kafka Consumer API?

2

Intento simular el proyecto kafka-clickstream-enrich kafka-stream de Gwen (Chen) Shapira en GitHub ( https://github.com/onurtokat/kafka-clickstream-enrich ). Cuando consumo un tema usando la clase de consumidor usando deserializadores, encuentro un error. La clase Serde personalizada tiene serializador y deserializador. Pero, trato de entender por qué se usa el serde personalizado para el deserializador, luego la API del consumidor da un error ya que no es una instancia de org.apache.kafka.common.serialization.Deserializer

El tema se puede consumir usando KTable con Serdes.Integer () Serializer y el nuevo ProfileSerde () Deserializer como se muestra a continuación.

KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
                Consumed.with(Serdes.Integer(), new ProfileSerde()),
                Materialized.as("profile-store"));

Serde personalizado se define como;

static public final class ProfileSerde extends WrapperSerde<UserProfile> {
        public ProfileSerde() {
            super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
        }
    }

Y Serde genérico se personaliza como me gusta a continuación;

package com.onurtokat.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;


public class WrapperSerde<T> implements Serde<T> {

    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

Mi consumidor es tan simple y se puede ver a continuación;

package com.onurtokat.consumers;

import com.onurtokat.ClickstreamEnrichment;
import com.onurtokat.Constants;
import com.onurtokat.model.UserProfile;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumeProfileData {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClickstreamEnrichment.ProfileSerde.class);

        KafkaConsumer<Integer, UserProfile> consumerProfileTopic = new KafkaConsumer<>(config);
        consumerProfileTopic.subscribe(Arrays.asList(Constants.USER_PROFILE_TOPIC));
        while (true) {
            ConsumerRecords<Integer, UserProfile> records = consumerProfileTopic.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, UserProfile> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

El error cuando intento consumir el tema con mi consumidor es;

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at com.onurtokat.consumers.ConsumeProfileData.main(ConsumeProfileData.java:25)
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more
10

La diferencia es:

  • Los Serdes son utilizados por la API Streams de Kafka (también conocida como Kafka Streams). Un Serde es un contenedor para un par de (1) serializador y (2) deserializador para el mismo tipo de datos; consulte los dos puntos siguientes. Es decir, a Serde<T>tiene a Serializer<T>y a Deserializer<T>. El primer fragmento de código que publicó (con, por ejemplo, a KTable) es un fragmento de código de Kafka Streams, por eso necesita un Serde. Kafka Streams necesita un Serdeporque produce mensajes (para los que necesita un Serializer) y lee mensajes (para los que necesita un Deserializer).
  • Los deserializadores son utilizados por la API de consumo de Kafka (también conocida como cliente de consumo) para leer mensajes. Su último fragmento de código (con, por ejemplo, a KafkaConsumer) está utilizando el cliente consumidor y, por lo tanto, necesita un Deserializer, no un Serde.
  • La API de productor de Kafka (también conocida como cliente de productor) utiliza serializadores para escribir mensajes.

Con respecto a:

Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more

A su código de cliente consumidor de Kafka se le asignó un lugar Serdedonde esperaba un Deserializer.

2
  • 2
    Gracias Micheal G. Noll. Buena explicación. 4 de junio de 2019 a las 12:37
  • 1
    Gran explicación de hecho. 7 de julio a las 20:37
4

Parece que no entendiste:

The topic can be consumed using KTable with Serdes.Integer() Serializer and new ProfileSerde() Deserializer like below.

debe proporcionar Consumed.with()KeySerde y ValueSerde.

Respecto a la Excepción:

está bastante claro: debe configurar una implementación de Deserializer (no Serde )

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, //here);
2
  • Supongo que debería obtener el deserializador de esta manera ClickstreamEnrichment.ProfileSerde.deserializer ()
    dmkvl
    24 de mayo de 2019 a las 15:34
  • tienes razón. Debo implementar deserializador en lugar de usar serde. Gracias por tu respuesta. 4 de junio de 2019 a las 12:38