Translate

jueves, 2 de septiembre de 2021

Primeros pasos con Apache Kafka parte 10

 


Seguimos con Kafka

Hasta ahora, hemos visto muy pocos parámetros de configuración para los productores, solo el URI y los serializadores de bootstrap.servers obligatorios. El productor tiene una gran cantidad de parámetros de configuración; la mayoría están documentados en la documentación de Apache Kafka y muchos tienen valores predeterminados razonables, por lo que no hay razón para modificar cada parámetro. Sin embargo, algunos de los parámetros tienen un impacto significativo en el uso de la memoria, el rendimiento y la confiabilidad de los productores. Los revisaremos aquí.

acks

El parámetro acks controla cuántas réplicas de partición deben recibir el registro antes de que el productor pueda considerar que la escritura se ha realizado correctamente. Esta opción tiene un impacto significativo en la probabilidad de que se pierdan los mensajes. Hay tres valores permitidos para el parámetro acks:

• Si acks = 0, el productor no esperará una respuesta del broker antes de asumir que el mensaje se envió correctamente. Esto significa que si algo salió mal y el corredor no recibirá mensaje, el productor no lo sabrá y el mensaje se perderá. Sin embargo, debido a que el productor no está esperando respuesta del servidor, puede enviar mensajes tan rápido como lo admita la red, por lo que esta configuración se puede utilizar para lograr un rendimiento muy alto.

• Si acks = 1, el productor recibirá una respuesta de éxito del corredor en el momento en que la réplica del líder reciba el mensaje. Si el mensaje no se puede escribir al líder (por ejemplo, si el líder colapsó y aún no se eligió un nuevo líder), el productor recibirá una respuesta de error y podrá volver a intentar enviar el mensaje, evitando la pérdida potencial de datos. El mensaje aún puede perderse si el líder falla y se elige una réplica sin este mensaje como el nuevo líder (a través de unclean elección de líder). En este caso, el rendimiento depende de si enviamos mensajes de forma sincrónica o asincrónica. Si nuestro código de cliente espera una respuesta del servidor (al llamar al método get () del objeto Future devuelto al enviar un mensaje) obviamente aumentará la latencia significativamente (al menos por una red viaje ida y vuelta). Si el cliente usa devoluciones de llamada, la latencia se ocultará, pero el rendimiento estará limitado por la cantidad de mensajes en tránsito (es decir, cuántos mensajes enviará el productor antes de recibir respuestas del servidor).

• Si acks = all, el productor recibirá una respuesta exitosa del broker una vez que todas las réplicas sincronizadas hayan recibido el mensaje. Este es el modo más seguro, ya que puede asegurarse de que más de un corredor tenga el mensaje y de que el mensaje sobrevivirá incluso en el caso de una falla, sin embargo, la latencia que discutimos en el caso acks = 1 será aún mayor, ya que estaremos esperando que más de un broker reciba el mensaje.

buffer.memory

Esto establece la cantidad de memoria que el productor utilizará para almacenar en búfer los mensajes que esperan ser enviados a los intermediarios. Si los mensajes son enviados por la aplicación más rápido de lo que pueden entregarse al servidor, el productor puede quedarse sin espacio y las llamadas send () adicionales bloquearán o lanzarán una excepción, según el parámetro block.on.buffer.full ( reemplazado con max.block.ms en la versión 0.9.0.0, que permite bloquear durante un cierto tiempo y luego lanzar una excepción).

compression.type

De forma predeterminada, los mensajes se envían sin comprimir. Este parámetro se puede establecer en snappy, gzip o lz4, en cuyo caso se utilizarán los algoritmos de compresión correspondientes para comprimir los datos antes de enviarlos a los corredores. Google inventó la compresión rápida para proporcionar relaciones de compresión decentes con una sobrecarga de CPU baja y buena rendimiento, por lo que se recomienda en los casos en que tanto el rendimiento como el ancho de banda sean una preocupación. La compresión Gzip normalmente utilizará más CPU y tiempo, pero dará como resultado mejores relaciones de compresión, por lo que se recomienda en los casos en que el ancho de banda de la red es más restringido. Al habilitar la compresión, reduce la utilización de la red y el almacenamiento  que suele ser un cuello de botella al enviar mensajes a Kafka.

retries

Cuando el productor recibe un mensaje de error del servidor, el error podría ser transitorio (por ejemplo, falta de líder para una partición). En este caso, el valor del parámetro de reintentos controlará cuántas veces el productor reintentará enviar el mensaje antes de darse por vencido y notificar al cliente de un problema. De forma predeterminada, el productor esperará 100 ms entre reintentos, pero puede controlar esto mediante el parámetro retry.backoff.ms. Recomendamos probar cuánto tiempo se tarda en recuperarse de un corredor bloqueado (es decir, cuánto tiempo hasta que todas las particiones obtengan nuevos líderes) y establecer el número de reintentos y el retraso entre ellos de modo que la cantidad total de tiempo dedicado a reintentar sea mayor que el tiempo el clúster de Kafka necesita recuperarse del colapso; de lo contrario, el productor se rendirá demasiado pronto. El productor no volverá a intentar todos los errores. Algunos errores no son transitorios y no provocarán reintentos (p. Ej., Error "mensaje demasiado grande"). En general, debido a que el productor maneja los reintentos por usted, no tiene sentido manejar los reintentos dentro de su propia lógica de aplicación. ustedquerrá centrar sus esfuerzos en el manejo de errores no confiables o casos en los que se agotaron los reintentos.

batch.size

Cuando se envían varios registros a la misma partición, el productor los agrupará. Este parámetro controla la cantidad de memoria en bytes (¡no en mensajes!) que se utilizará para cada lote. Cuando el lote esté lleno, se enviarán todos los mensajes del lote. Sin embargo, esto no significa que el productor esperará a que el lote se llene. El productor enviará lotes medio llenos e incluso lotes con un solo mensaje en ellos. Por lo tanto, establecer un tamaño de lote demasiado grande no causará demoras en el envío de mensajes; solo usará más memoria para los lotes. Establecer el tamaño del lote demasiado pequeño agregará algunos gastos generales porque el productor necesitará enviar mensajes con más frecuencia.

linger.ms

linger.ms controla la cantidad de tiempo para esperar mensajes adicionales antes de enviar el lote actual. KafkaProducer envía un lote de mensajes cuando la corriente el lote de alquiler está lleno o cuando se alcanza el límite de linger.ms. De forma predeterminada, el productor enviará mensajes tan pronto como haya un hilo de remitente disponible para enviarlos, incluso si solo hay un mensaje en el lote. Al establecer linger.ms por encima de 0, le indicamos al productor que espere unos milisegundos para agregar mensajes adicionales al lote antes de enviarlo a los brokers. Esto aumenta la latencia pero también aumenta el rendimiento (debido a que enviamos más mensajes a la vez, hay menos gastos generales por mensaje).

client.id

Esta puede ser cualquier cadena y será utilizada por los intermediarios para identificar los mensajes enviados desde el cliente. Se utiliza en registros y métricas, y para cuotas.

max.in.flight.requests.per.connection

Esto controla cuántos mensajes enviará el productor al servidor sin recibir respuestas. Establecer este valor alto puede aumentar el uso de la memoria mientras mejora rendimiento, pero establecerlo demasiado alto puede reducir el rendimiento a medida que el procesamiento por lotes se vuelve menos eficiente. Establecer esto en 1 garantizará que los mensajes se escribirán al corredor en el orden en que se enviaron, incluso cuando se produzcan reintentos.

timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms

Estos parámetros controlan cuánto tiempo el productor esperará una respuesta del servidor cuando envíe datos (request.timeout.ms) y cuando solicite metadatos como el líderes actuales para las particiones en las que estamos escribiendo (metadata.fetch.timeout.ms). Si se alcanza el tiempo de espera sin respuesta, el productor volverá a intentar enviar o responder con un error (ya sea a través de una excepción o mediante el envío de devolución de llamada). timeout.ms controla el tiempo que el intermediario esperará para que las réplicas sincronizadas reconozcan el mensaje para cumplir con la configuración de acks; el intermediario devolverá un error si transcurre el tiempo sin los reconocimientos necesarios.

max.block.ms

Este parámetro controla cuánto tiempo bloqueará el productor cuando llame a send () y cuando solicite explícitamente metadatos a través de particionesFor (). Esos métodos bloquean cuando el búfer de envío del productor está lleno o cuando los metadatos no están disponibles. Cuando se alcanza max.block.ms, se lanza una excepción de tiempo de espera.

max.request.size

Esta configuración controla el tamaño de una solicitud de producto enviada por el productor. Limita tanto el tamaño del mensaje más grande que se puede enviar como la cantidad de mensajes que el productor puede enviar en una solicitud. Por ejemplo, con un tamaño de solicitud máximo predeterminado de 1 MB, el mensaje más grande que puede enviar es 1 MB o el productor puede agrupar 1,000 mensajes de tamaño 1 K cada uno en una solicitud. Además, el corredor tiene su propio límite en el tamaño del mensaje más grande que aceptará (message.max.bytes). Por lo general, es una buena idea hacer coincidir estas configuraciones, para que el productor no intente enviar mensajes de un tamaño que el broker rechace.

receive.buffer.bytes y send.buffer.bytes

Estos son los tamaños de los búferes de envío y recepción de TCP que utilizan los sockets al escribir y leer datos. Si se establecen en -1, se utilizarán los valores predeterminados del sistema operativo. Es una buena idea aumentarlos cuando los productores o consumidores se comunican con intermediarios en un centro de datos diferente porque esos enlaces de red suelen tener mayor latencia y menor ancho de banda.


Apache Kafka conserva el orden de los mensajes dentro de una partición. Esto significa que si los mensajes se enviaron desde el productor en un orden específico, el broker los escribirá en una partición en ese orden y todos los consumidores los leerán en ese orden. Para algunos casos de uso, el orden es muy importante. Hay una gran diferencia entre depositar $ 100 en una cuenta y luego retirarlos, ¡y al revés! Sin embargo, algunos casos de uso son menos sensibles.

Establecer el parámetro de reintentos en un valor distinto de cero y el max.in.flights.requests.per.session en más de uno significa que es posible que el bróker no pueda escribir el primer lote de mensajes, logre escribir el segundo (que ya estaba en vuelo), y luego vuelva a intentar el primer lote y tenga éxito, invirtiendo así el orden.

Por lo general, establecer el número de reintentos a cero no es una opción en un sistema confiable, por lo que si garantizar el orden es crítico, recomendamos configurar in.flight.requests.per.session = 1 para asegurarse de que mientras se reintenta un lote de mensajes, no se enviarán mensajes adicionales (porque tiene el potencial de invertir el orden correcto).

Esto limitará severamente el rendimiento del productor, por lo tanto, utilícelo solo cuando el pedido sea importante.

miércoles, 1 de septiembre de 2021

30 años de Linux



El 25 de agosto de 1991, Linus Torvalds presenta el nucleo linux con estas palabras : 

"Hola a todos los que estáis ahí fuera usando minix –

Estoy desarrollando un sistema operativo (libre) (sólo por hobby, no será grande y profesional como gnu) para clónicos de los AT 386(486). Esta idea está madurando desde abril, y ahora está comenzando a estar lista. Me gustaría recibir cualquier comentario en cosas que a la gente le gustan y no le gustan de minix, ya que mi sistema operativo se parece a él un poco (misma disposición física del sistema de ficheros (por razones prácticas) entre otras cosas).

Ya he trasladado bash (1.08) y gcc (1.40) y parece que las cosas funcionan. Esto implica que podría tener algo práctico en pocos meses, y me gustaría saber qué características le gustarían más a la gente tener. Será bienvenida cualquier sugerencia, pero no prometo que las implementaré todas ellas :-) "

Este nucleo nos acompaña desde entonces y hace mucho mejores nuestras vidas....

Feliz Cumpleaños!!! 


viernes, 27 de agosto de 2021

Imperativo vs reactivo


Para comprender mejor el contraste entre los 2 mundos, necesitamos explicar la diferencia entre los modelos de ejecución reactiva e imperativa. 

En el enfoque tradicional e imperativo, los frameworks asignan un hilo para manejar una solicitud. Entonces, todo el procesamiento de la solicitud se ejecuta en este hilo de trabajo. Este modelo no escala muy bien. De hecho, para manejar múltiples solicitudes simultáneas, necesita múltiples subprocesos; por lo que la simultaneidad de su aplicación está limitada por el número de subprocesos. Además, estos subprocesos se bloquean tan pronto como su código interactúe con servicios remotos. Por lo tanto, conduce a un uso ineficiente de los recursos, ya que es posible que necesite más subprocesos, y cada subproceso, ya que están asignados a subprocesos del sistema operativo, tiene un costo en términos de memoria y CPU.

Por otro lado, el modelo reactivo se basa en E/S sin bloqueo y en un modelo de ejecución diferente. La E/S sin bloqueo proporciona una forma eficaz de tratar las E/S simultáneas. Una cantidad mínima de subprocesos denominados subprocesos de E/S puede manejar muchas E/S simultáneas. Con un modelo de este tipo, el procesamiento de solicitudes no se delega a un subproceso de trabajo, sino que utiliza estos subprocesos de E/S directamente. Ahorra memoria y CPU, ya que no es necesario crear subprocesos de trabajo para manejar las solicitudes. También mejora la simultaneidad ya que elimina la restricción sobre el número de subprocesos. Finalmente, también mejora el tiempo de respuesta ya que reduce el número de cambios de hilo.

Entonces, con el modelo de ejecución reactiva, las solicitudes se procesan mediante subprocesos de E/S. Pero eso no es todo. Un subproceso de E/S puede manejar varias solicitudes simultáneas. ¿Cómo? Aquí está el truco y una de las diferencias más significativas entre reactivo e imperativo.

Cuando el procesamiento de una solicitud requiere interactuar con un servicio remoto, como una API HTTP o una base de datos, no bloquea la ejecución mientras espera la respuesta. En cambio, programa la operación de E/S y adjunta una continuación, es decir, el código restante de procesamiento de la solicitud. Esta continuación se puede pasar como una devolución de llamada (una función invocada con el resultado de E/S) o utilizar construcciones más avanzadas como programación reactiva o co-rutinas. 

Independientemente de cómo se exprese la programación no bloqueante, lo esencial es la liberación del subproceso de E/S y, en consecuencia, el hecho de que este subproceso pueda utilizarse para procesar otra solicitud. Cuando se completa la E/S, el subproceso de E/S continúa el procesamiento de la solicitud pendiente.

Entonces, a diferencia del modelo imperativo, donde la E/S bloquea la ejecución, los conmutadores reactivos a un diseño basado en la continuación, donde se liberan los subprocesos de E/S y se invoca la continuación cuando se completan las E/S. Como resultado, el subproceso de E/S puede manejar múltiples solicitudes concurrentes, mejorando la concurrencia general de la aplicación.

Pero hay una trampa. Necesitamos una forma de escribir código continuation-passing o no bloqueante. Hay muchas maneras de hacer esto. Por ejemplo en Quarkus propone:

  • Mutiny: una biblioteca de programación reactiva intuitiva e impulsada por eventos
  • Co-rutinas de Kotlin: una forma de escribir código asincrónico de manera secuencial

Dejo link: https://quarkus.io/guides/getting-started-reactive

Libros gratuitos

 

Download IT Guides!

 

Android UI Design

Android’s user interface is based on direct manipulation, using touch inputs that loosely correspond to real-world actions, like swiping, tapping, pinching and reverse pinching to...

 
 

JPA Minibook

The basic Java framework to access the database is JDBC. Unfortunately, with JDBC, a lot of hand work is needed to convert a database query result into Java classes. Other disadvantages...

 
 

Microservices for Java Developers

Microservices are a software development technique – a variant of the service-oriented architecture (SOA) structural style – that arranges an application as a collection of loosely...

 
 

JMeter Tutorial

JMeter is an application that offers several possibilities to configure and execute load, performance and stress tests using different technologies and protocols. It allows simulating...

 

 

The Best Web Programming Languages to Learn

A more comprehensive list of tasks to which web development commonly refers, may include web engineering, web design, web content development, client liaison, client-side/server-side...

 
 

Web Developer Interview Questions

A more comprehensive list of tasks to which web development commonly refers, may include web engineering, web design, web content development, client liaison, client-side/server-side...

 
 

Amazon AWS Lambda Tutorial

AWS Lambda is an event-driven, serverless computing platform provided by Amazon as a part of the Amazon Web Services. It is a computing service that runs code in response to events and...

 
 

Jetty Server Cookbook

The web server is used in products such as Apache ActiveMQ, Alfresco, Apache Geronimo, Apache Maven, Apache Spark, Google App Engine, Eclipse, FUSE, iDempiere, Twitter’s Streaming API and...

 

miércoles, 25 de agosto de 2021

Primeros pasos con Apache Kafka parte 9


Seguimos con Kafka

Supongamos que el tiempo de ida y vuelta de la red entre nuestra aplicación y el clúster de Kafka es de 10 ms. Si esperamos una respuesta después de enviar cada mensaje, enviar 100 mensajes tardará alrededor de 1 segundo. Por otro lado, si solo enviamos todos nuestros mensajes y no esperamos ninguna respuesta, enviar 100 mensajes apenas nos llevará tiempo. En la mayoría de los casos, realmente no necesitamos una respuesta: Kafka devuelve el tema, la partición y el desplazamiento del registro después de que se escribió, lo que generalmente no es requerido por la aplicación de envío. Por otro lado, necesitamos saber cuándo fallamos en enviar un mensaje por completo para poder lanzar una excepción, registrar un error o quizás escribir el mensaje en un archivo de “errores” para un análisis posterior.

Para enviar mensajes de forma asincrónica y manejar escenarios de error, el productor admite agregar un callback al enviar un registro. Veamos un ejemplo:

private class DemoProducerCallback implements Callback {

    @Override

    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

        if (e != null) {

            e.printStackTrace();

        }

    }

}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

Para usar callbaks, necesitamos una clase que implemente la interfaz org.apache.kafka.clients.producer.Callback, que tiene una única función onCompletion().

Si Kafka devolvió un error, onCompletion() tendrá una excepción no nula. Aquí lo “manejamos” imprimiendo, pero el código de producción probablemente tendrá funciones de manejo de errores más robustas. Los registros son los mismos que antes. Y pasamos un objeto de devolución de llamada cuando enviamos el registro.

domingo, 22 de agosto de 2021

Primeros pasos con Apache Kafka parte 8


Seguimos con Kafka

La forma más sencilla de enviar un mensaje es la siguiente:

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

try {

producer.send(record);

} catch (Exception e) {

e.printStackTrace();

}

El productor acepta objetos ProducerRecord, por lo que comenzamos creando uno. ProducerRecord tiene varios constructores. Aquí usamos uno que requiere el nombre del tema al que estamos enviando datos, que siempre es una cadena, y la clave y el valor que estamos enviando a Kafka, que en este caso también son cadenas. Los tipos de clave y valor deben coincidir con nuestros objetos de serializador y productor.

Usamos el método send() del objeto productor para enviar el ProducerRecord. El mensaje se colocará en un búfer y se enviará al broker en un hilo separado. El método send () devuelve un objeto Java Future con RecordMetadata, pero como simplemente ignoramos el valor devuelto, no tenemos forma de saber si el mensaje se envió correctamente o no. Este método de envío de mensajes se puede utilizar cuando es aceptable que algun mensaje no llegue

Si bien ignoramos los errores que pueden ocurrir al enviar mensajes a los brokers de Kafka, es posible que obtengamos una excepción si el productor encontró errores antes de enviar el mensaje a Kafka. Pueden ser una SerializationException cuando no se serializa el mensaje, una BufferExhaustedException o TimeoutException si el búfer está lleno, o una InterruptException si se interrumpió el hilo de envío.

sábado, 21 de agosto de 2021

Primeros pasos con Apache Kafka parte 7



Seguimos con Kafka

El primer paso para escribir mensajes a Kafka es crear un objeto productor. Un productor de Kafka tiene tres propiedades obligatorias:

bootstrap.servers

Lista de hosts: Como nos tenemos que conectar al cluster necesitamos, la ip de almenos una maquina pero se recomienda incluir al menos dos, por lo que en caso de que una se caiga, el productor aún podrá conectarse al clúster. Luego de conectarse, se le enviaran todas las ips del cluster. 

key.serializer

Nombre de una clase que se utilizará para serializar las claves de los registros que produciremos en Kafka. Los brokers de Kafka esperan matrices de bytes como claves y valores de mensajes.

Sin embargo, la interfaz de productor permite, utilizando tipos parametrizados, enviar cualquier objeto Java como clave y valor. Esto hace que el código sea muy legible, pero también significa que el productor debe saber cómo convertir estos objetos en matrices de bytes.

key.serializer debe configurarse con el nombre de una clase que implemente la interfaz org.apache.kafka.common.serialization.Serializer. El productor utilizará esta clase para serializar el objeto clave en una matriz de bytes. El paquete de cliente de Kafka incluye ByteArraySerializer. Es necesario configurar key.serializer incluso si tiene la intención de enviar solo valores.

value.serializer Nombre de una clase que se utilizará para serializar los valores de los registros que produciremos en Kafka. De la misma manera que establece key.serializer en un nombre de una clase que serializará el objeto de clave de mensaje en una matriz de bytes, establece value.serializer en una clase que serializará el objeto de valor de mensaje.

El siguiente fragmento de código muestra cómo crear un nuevo productor estableciendo solo los parámetros obligatorios y usando valores predeterminados para todo lo demás:

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

Con una interfaz tan simple, está claro que la mayor parte del control sobre el comportamiento del productor se realiza estableciendo las propiedades de configuración correctas. La documentación de Apache Kafka cubre todas las opciones de configuración, y repasaremos las importantes más adelante.

Una vez que creamos una instancia de un productor, es hora de comenzar a enviar mensajes. Hay tres métodos principales para enviar mensajes:

Dispara y olvida
Enviamos un mensaje al servidor y realmente no nos importa si llega con éxito o no. La mayoría de las veces, llegará correctamente, ya que Kafka tiene una alta disponibilidad y el productor volverá a intentar enviar mensajes automáticamente. Sin embargo, algunos mensajes se pueden perder con este método, hay casos de uso que no nos importa estar seguros si llegan los mensajes para esos casos es este método. 

Envío sincrónico
Enviamos un mensaje, el método send () devuelve un objeto Future y usamos get () para esperar y ver si el envío fue exitoso o no.

Envío asincrónico
Llamamos al método send() con una función de devolución de llamada, que se activa cuando recibe una respuesta del corredor de Kafka. 

En los ejemplos que siguen, veremos cómo enviar mensajes usando estos métodos y cómo manejar los diferentes tipos de errores que pueden ocurrir pero eso es una historia para otro post...

viernes, 13 de agosto de 2021

Consumiendo mensajes desde Apache Kafka con Spring Boot


Vamos a hacer un ejemplo de spring boot que consuma un mensaje desde Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego  : 

implementation 'org.springframework.kafka:spring-kafka'

Usando Gradle o Maven. 

Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir : 


public class Message {

    private long id;

    private String message;

    private LocalDateTime time;


    public Message() {}


    public Message(long id, String message, LocalDateTime time) {

        this.id = id;

        this.message = message;

        this.time = time;

    }


    public Message(String message, LocalDateTime time) {

        this(new Random().nextLong(), message, time);

    }


    public Message(String message) {

        this(message,LocalDateTime.now());

    }


    public long getId() {

        return id;

    }


    public void setId(long id) {

        this.id = id;

    }


    public String getMessage() {

        return message;

    }


    public void setMessage(String message) {

        this.message = message;

    }


    public LocalDateTime getTime() {

        return time;

    }


    public void setTime(LocalDateTime time) {

        this.time = time;

    }


    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        Message message1 = (Message) o;

        return Objects.equals(message, message1.message) &&

                Objects.equals(time, message1.time);

    }


    @Override

    public int hashCode() {

        return Objects.hash(message, time);

    }


    @Override

    public String toString() {

        return "Message{" +

                "message='" + message + '\'' +

                ", time=" + time +

                '}';

    }

}

Tenemos que escribir 2 clases de configuración, una para indicar como nos conectar a Kafka y como esta serializado el objeto. 


@EnableKafka

@Configuration

public class KafkaConsumerConfig {


    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public ConsumerFactory<String, Message> consumerFactory() {

        var props = new HashMap<String, Object>();

        props.put(

                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapAddress);

        props.put(

                ConsumerConfig.GROUP_ID_CONFIG,

                "demo");

        props.put(

                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        props.put(

                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES,

                "com.assembly.demo.model");

        return new DefaultKafkaConsumerFactory<>(props);

    }


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, Message>

    kafkaListenerContainerFactory() {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Message>();

        factory.setConsumerFactory(this.consumerFactory());

        return factory;

    }

}

Y Otra para confugurar el topic : 


@Configuration

public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;

    @Bean

    public KafkaAdmin kafkaAdmin() {

        var configs = new HashMap<String, Object>();

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new KafkaAdmin(configs);

    }

    @Bean

    public NewTopic topicDemo() {

        return new NewTopic("demo", 1, (short) 1);

    }

}


Y ahora vamos a hacer un servicio que consuma el mensaje y lo escriba en pantalla : 


@Service

public class ConsumerService {

    @KafkaListener(topics = "${kafka.topicName}", groupId = "demo")

    public void listenGroupDemo(Message message) {

        System.out.println("Received Message in group demo : " + message);

    }

}

Y Listo!!

Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example

Escribiendo mensajes en Apanche Kafka con Spring Boot


Vamos a hacer un ejemplo de spring boot que escriba un mensaje con Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego  : 

implementation 'org.springframework.kafka:spring-kafka'

Usando Gradle o Maven. 

Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir : 


public class Message {

    private long id;

    private String message;

    private LocalDateTime time;


    public Message(long id, String message, LocalDateTime time) {

        this.id = id;

        this.message = message;

        this.time = time;

    }


    public Message(String message, LocalDateTime time) {

        this(new Random().nextLong(), message, time);

    }


    public Message(String message) {

        this(message,LocalDateTime.now());

    }


    public long getId() {

        return id;

    }


    public void setId(long id) {

        this.id = id;

    }


    public String getMessage() {

        return message;

    }


    public void setMessage(String message) {

        this.message = message;

    }


    public LocalDateTime getTime() {

        return time;

    }


    public void setTime(LocalDateTime time) {

        this.time = time;

    }

    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        Message message1 = (Message) o;

        return Objects.equals(message, message1.message) &&

                Objects.equals(time, message1.time);

    }

    @Override

    public int hashCode() {

        return Objects.hash(message, time);

    }

    @Override

    public String toString() {

        return "Message{" +

                "message='" + message + '\'' +

                ", time=" + time +

                '}';

    }

}


Luego tenemos que hacer 2 objetos de configuración, uno para configurar el acceso a kafka y sus parámetros : 

@Configuration

public class KafkaProducerConfig {


    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public ProducerFactory<String, Message> producerFactory() {

        var configProps = new HashMap<String, Object>();

        configProps.put(

                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapAddress);

        configProps.put(

                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

                StringSerializer.class);

        configProps.put(

                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

                JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);

    }

//Y creamos un template que nos ayuda a escribir mensaje con la configuración realizada. 

    @Bean

    public KafkaTemplate<String, Message> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());

    }

}


Y luego configuramos el acceso al topic : 


@Configuration

public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public KafkaAdmin kafkaAdmin() {

        var configs = new HashMap<String, Object>();

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new KafkaAdmin(configs);

    }

    @Bean

    public NewTopic topicDemo() {

        return new NewTopic("demo", 1, (short) 1);

    }

}

Ahora hacemos un servicio que escriba el mensaje: 


@Service

public class ProducerService {

    @Autowired

    private KafkaTemplate<String, Message> kafkaTemplate;


    @Value(value = "${kafka.topicName}")

    private String topicName;


    public void sendMessage(Message message) {


        ListenableFuture<SendResult<String, Message>> future =

                kafkaTemplate.send(topicName, message);


        future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {


            @Override

            public void onSuccess(SendResult<String, Message> result) {

                System.out.println("Sent message=[" + message.getTime() + message.getMessage() +

                        "] with offset=[" + result.getRecordMetadata().offset() + "]");

            }

            @Override

            public void onFailure(Throwable ex) {

                System.out.println("Unable to send message=["

                        + message + "] due to : " + ex.getMessage());

            }

        });

    }

}


Y ya que estamos hacemos un controller que exponga un servicio Rest que escriba mensajes: 

@RestController

@RequestMapping("demo")

public class DemoController {

    @Autowired

    private ProducerService service;


    @GetMapping("/{msg}")

    public void sendMessage(@PathVariable String msg) {

        var message = new Message(msg);

        service.sendMessage(message);

    }

}

Y listo!! 

Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example

Red Hat Developer eBooks


Medio tarde encontré este link el cual tiene unos buenos libros para desarrolladores y que quiero compartir con ustedes : 

https://developers.redhat.com/ebooks

lunes, 9 de agosto de 2021

Aprende Vim jugando con vim-adventures


Vim es un excelente editor pero pero no es tan fácil de aprender, por lo menos para mi. En fin si existiera un juegito en el cual podamos aprender jugando a utilizar Vim. Bueno este juego existe y se llama:  

https://vim-adventures.com/

:x

sábado, 7 de agosto de 2021

Primeros pasos con Apache Kafka parte 6




Ya sea que usemos Kafka como cola, bus de mensajes o plataforma de almacenamiento de datos, siempre un productor escribe datos en Kafka, un consumidor lee datos de Kafka o una aplicación cumple ambos roles.

Por ejemplo, en un sistema de procesamiento de transacciones de tarjetas de crédito, habrá una aplicación de cliente, tal vez una tienda en línea, responsable de enviar cada transacción a Kafka inmediatamente cuando se realiza un pago. Otra aplicación es responsable de verificar inmediatamente esta transacción con un motor de reglas y determinar si la transacción se aprueba o se niega. La respuesta de aprobación / denegación se puede volver a escribir a Kafka y la respuesta se puede propagar a la tienda en línea donde se inició la transacción. Una tercera aplicación puede leer tanto las transacciones como el estado de aprobación de Kafka y almacenarlos en una base de datos donde los analistas pueden luego revisar las decisiones y quizás mejorar el motor de reglas.

Apache Kafka se envía con API de cliente integradas que los desarrolladores pueden usar cuando desarrollan aplicaciones que interactúan con Kafka.

Hay muchas razones por las que una aplicación puede necesitar escribir mensajes en Kafka: registrar las actividades del usuario para auditoría o análisis, registrar métricas, almacenar mensajes de registro, registrar información de dispositivos inteligentes, comunicarse de forma asíncrona con otras aplicaciones, almacenar información en búfer antes de escribir en una base de datos y mucho más.

Esos diversos casos de uso también implican diversos requisitos: ¿cada mensaje es crítico o podemos tolerar la pérdida de mensajes? ¿Estamos de acuerdo con la duplicación accidental de mensajes? ¿Existe algún requisito estricto de latencia o rendimiento que debamos cumplir?

En el ejemplo de procesamiento de transacciones con tarjeta de crédito que presentamos anteriormente, podemos ver que es fundamental no perder un solo mensaje ni duplicar ningún mensaje. La latencia debe ser baja, pero se pueden tolerar latencias de hasta 500 ms y el rendimiento debe ser muy alto: esperamos procesar hasta un millón de mensajes por segundo.

Un caso de uso diferente podría ser almacenar información de clics de un sitio web. En ese caso, se puede tolerar la pérdida de algunos mensajes o algunos duplicados; La latencia puede ser alta siempre que no afecte a la experiencia del usuario. En otras palabras, no nos importa si el mensaje tarda unos segundos en llegar a Kafka, siempre que la página siguiente se cargue inmediatamente después de que el usuario haga clic en un enlace. El rendimiento dependerá del nivel de actividad que preveamos en nuestro sitio web.

Los diferentes requisitos influirán en la forma en que utilice la API del productor para escribir mensajes en Kafka y la configuración que utilice.


Si bien las API del productor son muy simples, hay un poco más que sucede bajo el capó del productor cuando enviamos datos. 

Comenzamos a producir mensajes a Kafka creando un ProducerRecord, que debe incluir el tema al que queremos enviar el registro y un valor. Opcionalmente, también podemos especificar una clave y / o una partición. Una vez que enviamos el ProducerRecord, lo primero que hará el productor es serializar la clave y los objetos de valor en ByteArrays para que puedan enviarse a través de la red.

A continuación, los datos se envían a un particionador. Si especificamos una partición en ProducerRecord, el particionador no hace nada y simplemente devuelve la partición que especificamos. Si no lo hicimos, el particionador elegirá una partición por nosotros, generalmente basada en la clave ProducerRecord. Una vez que se selecciona una partición, el productor sabe a qué tema y partición irá el registro. Luego agrega el registro a un lote de registros que también se enviarán al mismo tema y partición. Un hilo independiente es responsable de enviar esos lotes de registros a los brokers de Kafka correspondientes.

Cuando el broker recibe los mensajes, envía una respuesta. Si los mensajes se escribieron correctamente en Kafka, devolverá un objeto RecordMetadata con el tema, la partición y el desplazamiento del registro dentro de la partición. Si el corredor no pudo escribir los mensajes, devolverá un error. Cuando el productor recibe un error, puede volver a intentar enviar el mensaje unas cuantas veces más antes de darse por vencido y devolver un error.

viernes, 6 de agosto de 2021

Resultados de la encuesta de stack overflow para 2021


Como todos los años tenemos los resultados de la encuesta que hace stack overflow a miles de desarrolladores. Es te año sin muchas sorpresas. 

Como dato que podria observar Rust es el lenguaje más amado y Javascript el más utilizado.  

Otra cosa que me llamo mucho la atención es que Svelte, este como framework front más amado, ya me tengo que poner a estudiar Svelte.


Dejo link : https://insights.stackoverflow.com/survey/2021

jueves, 5 de agosto de 2021

Primeros pasos con Apache Kafka parte 5

 

Seguimos con Kafka.

A medida que crecen las implementaciones de Kafka, a menudo resulta ventajoso tener varios clústeres. Hay varias razones por las que esto puede resultar útil:

  • Segregación de tipos de datos
  • Aislamiento por requisitos de seguridad
  • Varios centros de datos (recuperación ante desastres)

Cuando se trabaja con varios centros de datos en particular, a menudo se requiere que los mensajes se copien entre ellos. De esta manera, las aplicaciones en línea pueden tener acceso a la actividad del usuario en ambos sitios. Por ejemplo, si un usuario cambia información pública en su perfil, ese cambio deberá ser visible independientemente del centro de datos en el que se muestren los resultados de la búsqueda. O bien, los datos de monitoreo se pueden recopilar de muchos sitios en una sola ubicación central donde se alojan los sistemas de análisis y alerta. Los mecanismos de replicación dentro de los clústeres de Kafka están diseñados solo para funcionar dentro de un único clúster, no entre varios clústeres.

El proyecto Kafka incluye una herramienta llamada MirrorMaker, utilizada para este propósito. En esencia, MirrorMaker es simplemente un consumidor y un productor de Kafka, vinculado con una cola. Los mensajes se consumen de un clúster de Kafka y se generan para otro.