Vamos a hacer un ejemplo de spring boot que escriba un mensaje con Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego :
implementation 'org.springframework.kafka:spring-kafka'
Usando Gradle o Maven.
Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir :
public class Message {
private long id;
private String message;
private LocalDateTime time;
public Message(long id, String message, LocalDateTime time) {
this.id = id;
this.message = message;
this.time = time;
}
public Message(String message, LocalDateTime time) {
this(new Random().nextLong(), message, time);
}
public Message(String message) {
this(message,LocalDateTime.now());
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message1 = (Message) o;
return Objects.equals(message, message1.message) &&
Objects.equals(time, message1.time);
}
@Override
public int hashCode() {
return Objects.hash(message, time);
}
@Override
public String toString() {
return "Message{" +
"message='" + message + '\'' +
", time=" + time +
'}';
}
}
Luego tenemos que hacer 2 objetos de configuración, uno para configurar el acceso a kafka y sus parámetros :
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Message> producerFactory() {
var configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
//Y creamos un template que nos ayuda a escribir mensaje con la configuración realizada.
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Y luego configuramos el acceso al topic :
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
var configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicDemo() {
return new NewTopic("demo", 1, (short) 1);
}
}
Ahora hacemos un servicio que escriba el mensaje:
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;
@Value(value = "${kafka.topicName}")
private String topicName;
public void sendMessage(Message message) {
ListenableFuture<SendResult<String, Message>> future =
kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println("Sent message=[" + message.getTime() + message.getMessage() +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
}
Y ya que estamos hacemos un controller que exponga un servicio Rest que escriba mensajes:
@RestController
@RequestMapping("demo")
public class DemoController {
@Autowired
private ProducerService service;
@GetMapping("/{msg}")
public void sendMessage(@PathVariable String msg) {
var message = new Message(msg);
service.sendMessage(message);
}
}
Y listo!!
Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example