Vamos a hacer un ejemplo de spring boot que consuma un mensaje desde Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego :
implementation 'org.springframework.kafka:spring-kafka'
Usando Gradle o Maven.
Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir :
public class Message {
private long id;
private String message;
private LocalDateTime time;
public Message() {}
public Message(long id, String message, LocalDateTime time) {
this.id = id;
this.message = message;
this.time = time;
}
public Message(String message, LocalDateTime time) {
this(new Random().nextLong(), message, time);
}
public Message(String message) {
this(message,LocalDateTime.now());
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message1 = (Message) o;
return Objects.equals(message, message1.message) &&
Objects.equals(time, message1.time);
}
@Override
public int hashCode() {
return Objects.hash(message, time);
}
@Override
public String toString() {
return "Message{" +
"message='" + message + '\'' +
", time=" + time +
'}';
}
}
Tenemos que escribir 2 clases de configuración, una para indicar como nos conectar a Kafka y como esta serializado el objeto.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
var props = new HashMap<String, Object>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"demo");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES,
"com.assembly.demo.model");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message>
kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Message>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}
}
Y Otra para confugurar el topic :
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
var configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicDemo() {
return new NewTopic("demo", 1, (short) 1);
}
}
Y ahora vamos a hacer un servicio que consuma el mensaje y lo escriba en pantalla :
@Service
public class ConsumerService {
@KafkaListener(topics = "${kafka.topicName}", groupId = "demo")
public void listenGroupDemo(Message message) {
System.out.println("Received Message in group demo : " + message);
}
}
Y Listo!!
Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example