viernes, 13 de agosto de 2021

Consumiendo mensajes desde Apache Kafka con Spring Boot


Vamos a hacer un ejemplo de spring boot que consuma un mensaje desde Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego  : 

implementation 'org.springframework.kafka:spring-kafka'

Usando Gradle o Maven. 

Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir : 


public class Message {

    private long id;

    private String message;

    private LocalDateTime time;


    public Message() {}


    public Message(long id, String message, LocalDateTime time) {

        this.id = id;

        this.message = message;

        this.time = time;

    }


    public Message(String message, LocalDateTime time) {

        this(new Random().nextLong(), message, time);

    }


    public Message(String message) {

        this(message,LocalDateTime.now());

    }


    public long getId() {

        return id;

    }


    public void setId(long id) {

        this.id = id;

    }


    public String getMessage() {

        return message;

    }


    public void setMessage(String message) {

        this.message = message;

    }


    public LocalDateTime getTime() {

        return time;

    }


    public void setTime(LocalDateTime time) {

        this.time = time;

    }


    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        Message message1 = (Message) o;

        return Objects.equals(message, message1.message) &&

                Objects.equals(time, message1.time);

    }


    @Override

    public int hashCode() {

        return Objects.hash(message, time);

    }


    @Override

    public String toString() {

        return "Message{" +

                "message='" + message + '\'' +

                ", time=" + time +

                '}';

    }

}

Tenemos que escribir 2 clases de configuración, una para indicar como nos conectar a Kafka y como esta serializado el objeto. 


@EnableKafka

@Configuration

public class KafkaConsumerConfig {


    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public ConsumerFactory<String, Message> consumerFactory() {

        var props = new HashMap<String, Object>();

        props.put(

                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapAddress);

        props.put(

                ConsumerConfig.GROUP_ID_CONFIG,

                "demo");

        props.put(

                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        props.put(

                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES,

                "com.assembly.demo.model");

        return new DefaultKafkaConsumerFactory<>(props);

    }


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, Message>

    kafkaListenerContainerFactory() {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Message>();

        factory.setConsumerFactory(this.consumerFactory());

        return factory;

    }

}

Y Otra para confugurar el topic : 


@Configuration

public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;

    @Bean

    public KafkaAdmin kafkaAdmin() {

        var configs = new HashMap<String, Object>();

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new KafkaAdmin(configs);

    }

    @Bean

    public NewTopic topicDemo() {

        return new NewTopic("demo", 1, (short) 1);

    }

}


Y ahora vamos a hacer un servicio que consuma el mensaje y lo escriba en pantalla : 


@Service

public class ConsumerService {

    @KafkaListener(topics = "${kafka.topicName}", groupId = "demo")

    public void listenGroupDemo(Message message) {

        System.out.println("Received Message in group demo : " + message);

    }

}

Y Listo!!

Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example

2 comentarios:

  1. Hola ema, muy bueno el tutorial, aporto un fix en la configuración del consumer:

    @Bean
    public ConsumerFactory consumerFactory() {

    JsonDeserializer deserializer = new JsonDeserializer<>(Message.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    var props = new HashMap();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    En mi caso rompía porque no cachaba el TRUSTED_PACKAGES..

    Saludos desde Lima Perú

    ResponderBorrar