Translate

viernes, 13 de agosto de 2021

Escribiendo mensajes en Apanche Kafka con Spring Boot


Vamos a hacer un ejemplo de spring boot que escriba un mensaje con Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego  : 

implementation 'org.springframework.kafka:spring-kafka'

Usando Gradle o Maven. 

Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir : 


public class Message {

    private long id;

    private String message;

    private LocalDateTime time;


    public Message(long id, String message, LocalDateTime time) {

        this.id = id;

        this.message = message;

        this.time = time;

    }


    public Message(String message, LocalDateTime time) {

        this(new Random().nextLong(), message, time);

    }


    public Message(String message) {

        this(message,LocalDateTime.now());

    }


    public long getId() {

        return id;

    }


    public void setId(long id) {

        this.id = id;

    }


    public String getMessage() {

        return message;

    }


    public void setMessage(String message) {

        this.message = message;

    }


    public LocalDateTime getTime() {

        return time;

    }


    public void setTime(LocalDateTime time) {

        this.time = time;

    }

    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        Message message1 = (Message) o;

        return Objects.equals(message, message1.message) &&

                Objects.equals(time, message1.time);

    }

    @Override

    public int hashCode() {

        return Objects.hash(message, time);

    }

    @Override

    public String toString() {

        return "Message{" +

                "message='" + message + '\'' +

                ", time=" + time +

                '}';

    }

}


Luego tenemos que hacer 2 objetos de configuración, uno para configurar el acceso a kafka y sus parámetros : 

@Configuration

public class KafkaProducerConfig {


    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public ProducerFactory<String, Message> producerFactory() {

        var configProps = new HashMap<String, Object>();

        configProps.put(

                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapAddress);

        configProps.put(

                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

                StringSerializer.class);

        configProps.put(

                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

                JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);

    }

//Y creamos un template que nos ayuda a escribir mensaje con la configuración realizada. 

    @Bean

    public KafkaTemplate<String, Message> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());

    }

}


Y luego configuramos el acceso al topic : 


@Configuration

public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public KafkaAdmin kafkaAdmin() {

        var configs = new HashMap<String, Object>();

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new KafkaAdmin(configs);

    }

    @Bean

    public NewTopic topicDemo() {

        return new NewTopic("demo", 1, (short) 1);

    }

}

Ahora hacemos un servicio que escriba el mensaje: 


@Service

public class ProducerService {

    @Autowired

    private KafkaTemplate<String, Message> kafkaTemplate;


    @Value(value = "${kafka.topicName}")

    private String topicName;


    public void sendMessage(Message message) {


        ListenableFuture<SendResult<String, Message>> future =

                kafkaTemplate.send(topicName, message);


        future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {


            @Override

            public void onSuccess(SendResult<String, Message> result) {

                System.out.println("Sent message=[" + message.getTime() + message.getMessage() +

                        "] with offset=[" + result.getRecordMetadata().offset() + "]");

            }

            @Override

            public void onFailure(Throwable ex) {

                System.out.println("Unable to send message=["

                        + message + "] due to : " + ex.getMessage());

            }

        });

    }

}


Y ya que estamos hacemos un controller que exponga un servicio Rest que escriba mensajes: 

@RestController

@RequestMapping("demo")

public class DemoController {

    @Autowired

    private ProducerService service;


    @GetMapping("/{msg}")

    public void sendMessage(@PathVariable String msg) {

        var message = new Message(msg);

        service.sendMessage(message);

    }

}

Y listo!! 

Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example