|
|
|
|
Supongamos que el tiempo de ida y vuelta de la red entre nuestra aplicación y el clúster de Kafka es de 10 ms. Si esperamos una respuesta después de enviar cada mensaje, enviar 100 mensajes tardará alrededor de 1 segundo. Por otro lado, si solo enviamos todos nuestros mensajes y no esperamos ninguna respuesta, enviar 100 mensajes apenas nos llevará tiempo. En la mayoría de los casos, realmente no necesitamos una respuesta: Kafka devuelve el tema, la partición y el desplazamiento del registro después de que se escribió, lo que generalmente no es requerido por la aplicación de envío. Por otro lado, necesitamos saber cuándo fallamos en enviar un mensaje por completo para poder lanzar una excepción, registrar un error o quizás escribir el mensaje en un archivo de “errores” para un análisis posterior.
Para enviar mensajes de forma asincrónica y manejar escenarios de error, el productor admite agregar un callback al enviar un registro. Veamos un ejemplo:
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
Seguimos con Kafka.
El primer paso para escribir mensajes a Kafka es crear un objeto productor. Un productor de Kafka tiene tres propiedades obligatorias:
bootstrap.servers
Lista de hosts: Como nos tenemos que conectar al cluster necesitamos, la ip de almenos una maquina pero se recomienda incluir al menos dos, por lo que en caso de que una se caiga, el productor aún podrá conectarse al clúster. Luego de conectarse, se le enviaran todas las ips del cluster.
key.serializer
Nombre de una clase que se utilizará para serializar las claves de los registros que produciremos en Kafka. Los brokers de Kafka esperan matrices de bytes como claves y valores de mensajes.
Sin embargo, la interfaz de productor permite, utilizando tipos parametrizados, enviar cualquier objeto Java como clave y valor. Esto hace que el código sea muy legible, pero también significa que el productor debe saber cómo convertir estos objetos en matrices de bytes.
key.serializer debe configurarse con el nombre de una clase que implemente la interfaz org.apache.kafka.common.serialization.Serializer. El productor utilizará esta clase para serializar el objeto clave en una matriz de bytes. El paquete de cliente de Kafka incluye ByteArraySerializer. Es necesario configurar key.serializer incluso si tiene la intención de enviar solo valores.
value.serializer Nombre de una clase que se utilizará para serializar los valores de los registros que produciremos en Kafka. De la misma manera que establece key.serializer en un nombre de una clase que serializará el objeto de clave de mensaje en una matriz de bytes, establece value.serializer en una clase que serializará el objeto de valor de mensaje.
El siguiente fragmento de código muestra cómo crear un nuevo productor estableciendo solo los parámetros obligatorios y usando valores predeterminados para todo lo demás:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Vamos a hacer un ejemplo de spring boot que consuma un mensaje desde Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego :
implementation 'org.springframework.kafka:spring-kafka'
Usando Gradle o Maven.
Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir :
public class Message {
private long id;
private String message;
private LocalDateTime time;
public Message() {}
public Message(long id, String message, LocalDateTime time) {
this.id = id;
this.message = message;
this.time = time;
}
public Message(String message, LocalDateTime time) {
this(new Random().nextLong(), message, time);
}
public Message(String message) {
this(message,LocalDateTime.now());
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message1 = (Message) o;
return Objects.equals(message, message1.message) &&
Objects.equals(time, message1.time);
}
@Override
public int hashCode() {
return Objects.hash(message, time);
}
@Override
public String toString() {
return "Message{" +
"message='" + message + '\'' +
", time=" + time +
'}';
}
}
Tenemos que escribir 2 clases de configuración, una para indicar como nos conectar a Kafka y como esta serializado el objeto.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
var props = new HashMap<String, Object>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
"demo");
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES,
"com.assembly.demo.model");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message>
kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<String, Message>();
factory.setConsumerFactory(this.consumerFactory());
return factory;
}
}
Y Otra para confugurar el topic :
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
var configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicDemo() {
return new NewTopic("demo", 1, (short) 1);
}
}
Y ahora vamos a hacer un servicio que consuma el mensaje y lo escriba en pantalla :
@Service
public class ConsumerService {
@KafkaListener(topics = "${kafka.topicName}", groupId = "demo")
public void listenGroupDemo(Message message) {
System.out.println("Received Message in group demo : " + message);
}
}
Y Listo!!
Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example
implementation 'org.springframework.kafka:spring-kafka'
Usando Gradle o Maven.
Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir :
public class Message {
private long id;
private String message;
private LocalDateTime time;
public Message(long id, String message, LocalDateTime time) {
this.id = id;
this.message = message;
this.time = time;
}
public Message(String message, LocalDateTime time) {
this(new Random().nextLong(), message, time);
}
public Message(String message) {
this(message,LocalDateTime.now());
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message1 = (Message) o;
return Objects.equals(message, message1.message) &&
Objects.equals(time, message1.time);
}
@Override
public int hashCode() {
return Objects.hash(message, time);
}
@Override
public String toString() {
return "Message{" +
"message='" + message + '\'' +
", time=" + time +
'}';
}
}
Luego tenemos que hacer 2 objetos de configuración, uno para configurar el acceso a kafka y sus parámetros :
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Message> producerFactory() {
var configProps = new HashMap<String, Object>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
//Y creamos un template que nos ayuda a escribir mensaje con la configuración realizada.
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Y luego configuramos el acceso al topic :
@Configuration
public class KafkaTopicConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
var configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topicDemo() {
return new NewTopic("demo", 1, (short) 1);
}
}
Ahora hacemos un servicio que escriba el mensaje:
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;
@Value(value = "${kafka.topicName}")
private String topicName;
public void sendMessage(Message message) {
ListenableFuture<SendResult<String, Message>> future =
kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println("Sent message=[" + message.getTime() + message.getMessage() +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
}
Y ya que estamos hacemos un controller que exponga un servicio Rest que escriba mensajes:
@RestController
@RequestMapping("demo")
public class DemoController {
@Autowired
private ProducerService service;
@GetMapping("/{msg}")
public void sendMessage(@PathVariable String msg) {
var message = new Message(msg);
service.sendMessage(message);
}
}
Y listo!!
Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example
:x
Ya sea que usemos Kafka como cola, bus de mensajes o plataforma de almacenamiento de datos, siempre un productor escribe datos en Kafka, un consumidor lee datos de Kafka o una aplicación cumple ambos roles.
Por ejemplo, en un sistema de procesamiento de transacciones de tarjetas de crédito, habrá una aplicación de cliente, tal vez una tienda en línea, responsable de enviar cada transacción a Kafka inmediatamente cuando se realiza un pago. Otra aplicación es responsable de verificar inmediatamente esta transacción con un motor de reglas y determinar si la transacción se aprueba o se niega. La respuesta de aprobación / denegación se puede volver a escribir a Kafka y la respuesta se puede propagar a la tienda en línea donde se inició la transacción. Una tercera aplicación puede leer tanto las transacciones como el estado de aprobación de Kafka y almacenarlos en una base de datos donde los analistas pueden luego revisar las decisiones y quizás mejorar el motor de reglas.
Apache Kafka se envía con API de cliente integradas que los desarrolladores pueden usar cuando desarrollan aplicaciones que interactúan con Kafka.
Hay muchas razones por las que una aplicación puede necesitar escribir mensajes en Kafka: registrar las actividades del usuario para auditoría o análisis, registrar métricas, almacenar mensajes de registro, registrar información de dispositivos inteligentes, comunicarse de forma asíncrona con otras aplicaciones, almacenar información en búfer antes de escribir en una base de datos y mucho más.
Esos diversos casos de uso también implican diversos requisitos: ¿cada mensaje es crítico o podemos tolerar la pérdida de mensajes? ¿Estamos de acuerdo con la duplicación accidental de mensajes? ¿Existe algún requisito estricto de latencia o rendimiento que debamos cumplir?
En el ejemplo de procesamiento de transacciones con tarjeta de crédito que presentamos anteriormente, podemos ver que es fundamental no perder un solo mensaje ni duplicar ningún mensaje. La latencia debe ser baja, pero se pueden tolerar latencias de hasta 500 ms y el rendimiento debe ser muy alto: esperamos procesar hasta un millón de mensajes por segundo.
Un caso de uso diferente podría ser almacenar información de clics de un sitio web. En ese caso, se puede tolerar la pérdida de algunos mensajes o algunos duplicados; La latencia puede ser alta siempre que no afecte a la experiencia del usuario. En otras palabras, no nos importa si el mensaje tarda unos segundos en llegar a Kafka, siempre que la página siguiente se cargue inmediatamente después de que el usuario haga clic en un enlace. El rendimiento dependerá del nivel de actividad que preveamos en nuestro sitio web.
Los diferentes requisitos influirán en la forma en que utilice la API del productor para escribir mensajes en Kafka y la configuración que utilice.
Si bien las API del productor son muy simples, hay un poco más que sucede bajo el capó del productor cuando enviamos datos.
Comenzamos a producir mensajes a Kafka creando un ProducerRecord, que debe incluir el tema al que queremos enviar el registro y un valor. Opcionalmente, también podemos especificar una clave y / o una partición. Una vez que enviamos el ProducerRecord, lo primero que hará el productor es serializar la clave y los objetos de valor en ByteArrays para que puedan enviarse a través de la red.
A continuación, los datos se envían a un particionador. Si especificamos una partición en ProducerRecord, el particionador no hace nada y simplemente devuelve la partición que especificamos. Si no lo hicimos, el particionador elegirá una partición por nosotros, generalmente basada en la clave ProducerRecord. Una vez que se selecciona una partición, el productor sabe a qué tema y partición irá el registro. Luego agrega el registro a un lote de registros que también se enviarán al mismo tema y partición. Un hilo independiente es responsable de enviar esos lotes de registros a los brokers de Kafka correspondientes.
Cuando el broker recibe los mensajes, envía una respuesta. Si los mensajes se escribieron correctamente en Kafka, devolverá un objeto RecordMetadata con el tema, la partición y el desplazamiento del registro dentro de la partición. Si el corredor no pudo escribir los mensajes, devolverá un error. Cuando el productor recibe un error, puede volver a intentar enviar el mensaje unas cuantas veces más antes de darse por vencido y devolver un error.
Como dato que podria observar Rust es el lenguaje más amado y Javascript el más utilizado.
Otra cosa que me llamo mucho la atención es que Svelte, este como framework front más amado, ya me tengo que poner a estudiar Svelte.
Dejo link : https://insights.stackoverflow.com/survey/2021
Cuando se trabaja con varios centros de datos en particular, a menudo se requiere que los mensajes se copien entre ellos. De esta manera, las aplicaciones en línea pueden tener acceso a la actividad del usuario en ambos sitios. Por ejemplo, si un usuario cambia información pública en su perfil, ese cambio deberá ser visible independientemente del centro de datos en el que se muestren los resultados de la búsqueda. O bien, los datos de monitoreo se pueden recopilar de muchos sitios en una sola ubicación central donde se alojan los sistemas de análisis y alerta. Los mecanismos de replicación dentro de los clústeres de Kafka están diseñados solo para funcionar dentro de un único clúster, no entre varios clústeres.
El proyecto Kafka incluye una herramienta llamada MirrorMaker, utilizada para este propósito. En esencia, MirrorMaker es simplemente un consumidor y un productor de Kafka, vinculado con una cola. Los mensajes se consumen de un clúster de Kafka y se generan para otro.
Me llego este mail y quiero compartirlo con ustedes :
| |||||||
| |||||||
| |||||||
Los corredores de Kafka están diseñados para operar como parte de un clúster. Dentro de un grupo de agentes, un agente también funcionará como controlador del grupo (elegido automáticamente entre los miembros activos del grupo). El controlador es responsable de las operaciones administrativas, incluida la asignación de particiones a los intermediarios y la supervisión de las fallas de los intermediarios. La partición es propiedad de un único intermediario en el clúster, y ese intermediario se denomina líder de la partición. Se puede asignar una partición a varios intermediarios, lo que dará como resultado la replicación de la partición. Esto proporciona redundancia de mensajes en la partición, de modo que otro corredor puede asumir el liderazgo si falla el corredor. Sin embargo, todos los consumidores y productores que operan en esa partición deben conectarse con el líder.
Una característica clave de Apache Kafka es la retención, que es el almacenamiento duradero de mensajes durante un período de tiempo. Los corredores de Kafka están configurados con una configuración de retención predeterminada para los temas, ya sea reteniendo mensajes durante un período de tiempo (por ejemplo, 7 días) o hasta que el tema alcance un cierto tamaño en bytes (por ejemplo, 1 GB). Una vez que se alcanzan estos límites, los mensajes caducan y se eliminan para que la configuración de retención sea una cantidad mínima de datos disponibles en cualquier momento. Los temas individuales también se pueden configurar con su propia configuración de retención para que los mensajes se almacenen solo mientras sean útiles. Los temas también se pueden configurar como registro compactado, lo que significa que Kafka retendrá solo el último mensaje producido con una clave específica. Esto puede ser útil para datos de tipo registro de cambios, donde solo la última actualización es interesante.
Los productores crean nuevos mensajes. En otros sistemas de publicación / suscripción, estos pueden denominarse editores o escritores. En general, se producirá un mensaje sobre un tema específico.
De forma predeterminada, al productor no le importa en qué partición se escribe un mensaje específico y equilibrará los mensajes en todas las particiones de un tema de manera uniforme. En algunos casos, el productor enviará mensajes a particiones específicas. Esto generalmente se hace usando la clave del mensaje y un particionador que generará un hash de la clave y lo asignará a una partición específica. Esto asegura que todos los mensajes producidos con una clave determinada se escribirán en la misma partición. El productor también podría usar un particionador personalizado que siga otras reglas comerciales para mapear mensajes a particiones.
Los consumidores leen mensajes. En otros sistemas de publicación / suscripción, estos clientes pueden denominarse suscriptores o lectores. El consumidor se suscribe a uno o más temas y lee los mensajes en el orden en que fueron producidos. El consumidor realiza un seguimiento de los mensajes que ya ha consumido al realizar un seguimiento del desplazamiento de los mensajes. El desplazamiento es otro bit de metadatos, un valor entero que aumenta continuamente, que Kafka agrega a cada mensaje a medida que se produce. Cada mensaje de una partición determinada tiene un desplazamiento único. Al almacenar el desplazamiento del último mensaje consumido para cada partición, ya sea en Zookeeper o en el propio Kafka, un consumidor puede detenerse y reiniciarse sin perder su lugar.
Los consumidores trabajan como parte de un grupo de consumidores, que es uno o más consumidores que trabajan juntos para consumir un tema. El grupo asegura que cada partición solo sea consumida por un miembro. El mapeo de un consumidor a una partición a menudo se denomina propiedad de la partición por parte del consumidor.
De esta manera, los consumidores pueden escalar horizontalmente para consumir temas con una gran cantidad de mensajes. Además, si un solo consumidor falla, los miembros restantes del grupo reequilibrarán las particiones que se consumen para reemplazar al miembro faltante.