Translate

Mostrando las entradas con la etiqueta Apache Kafka. Mostrar todas las entradas
Mostrando las entradas con la etiqueta Apache Kafka. Mostrar todas las entradas

sábado, 29 de octubre de 2022

Apache Kafka 3.3 reemplaza a ZooKeeper con el nuevo protocolo KRaft


Tal vez lo más molesto de instalar y levantar kafka para un desarrollador, es instalar y levantar zookeeper. No porque sea difícil, sino que no se entiende mucho la utilidad en entornos de desarrollo. 

Apache Software Foundation ha lanzado Apache Kafka 3.3.1 con muchas funciones y mejoras nuevas. En particular, esta es la primera versión que marca el protocolo KRaft (Kafka Raft) como listo para la producción. 

KRaft es el protocolo de consenso desarrollado para permitir la gestión de metadatos directamente en Apache Kafka. Esto simplifica enormemente la arquitectura de Kafka al consolidar la responsabilidad de los metadatos en Kafka mismo sin el requisito de una herramienta de terceros como Apache ZooKeeper. Este nuevo modo KRaft mejora la escalabilidad y la resiliencia de la partición al tiempo que simplifica las implementaciones de Apache Kafka que ahora se pueden implementar de forma independiente.

Dejo link: https://www.mail-archive.com/announce@apache.org/msg07620.html

viernes, 26 de noviembre de 2021

Primeros pasos con Apache Kafka parte 21

Seguimos con Kafka

Hasta ahora nos hemos centrado en aprender la API del consumidor, pero solo hemos analizado algunas de las propiedades de configuración, solo los obligatorios bootstrap.servers, group.id, key.deserializer y value.deserializer. Toda la configuración del consumidor está documentada en la documentación de Apache Kafka. La mayoría de los parámetros tienen una razón pueden tener valores predeterminados y no requieren modificación, pero algunos tienen implicaciones en el rendimiento y la disponibilidad de los consumidores. Echemos un vistazo a algunas de las propiedades más importantes.

fetch.min.bytes

Esta propiedad permite que un consumidor especifique la cantidad mínima de datos que desea recibir del corredor al buscar registros. Si un corredor recibe una solicitud de registros de un consumidor, pero los nuevos registros suman menos bytes que min.fetch.bytes, el corredor esperará hasta que haya más mensajes disponibles antes de devolver los registros al consumidor. Esto reduce la carga tanto para el consumidor como para el corredor, ya que tienen que manejar menos mensajes de ida y vuelta en los casos en que los temas no tienen mucha actividad nueva (o para horas de menor actividad del día). Deberá establecer este parámetro más alto que el predeterminado si el consumidor está usando demasiada CPU cuando no hay muchos datos disponibles, o reducir la carga de los intermediarios cuando tiene una gran cantidad de consumidores.

fetch.max.wait.ms

Al establecer fetch.min.bytes, le indica a Kafka que espere hasta que tenga suficientes datos para enviar antes de responder al consumidor. fetch.max.wait.ms te permite controlar cuánto tiempo esperar. De forma predeterminada, Kafka esperará hasta 500 ms. Esto da como resultado hasta 500 ms de latencia adicional en caso de que no haya suficientes datos fluyendo al tema de Kafka para satisfacer la cantidad mínima de datos para devolver. Si desea limitar la latencia potencial (generalmente debido a que los SLA controlan la latencia máxima de la aplicación), puede establecer fetch.max.wait.ms en un valor más bajo. Si establece fetch.max.wait.ms en 100 ms y fetch.min.bytes en 1 MB, Kafka recibirá una solicitud de recuperación del consumidor y responderá con datos cuando tenga 1 MB de datos para devolver o después de 100 ms, lo que ocurra primero.

max.partition.fetch.bytes

Esta propiedad controla el número máximo de bytes que el servidor devolverá por partición. El valor predeterminado es 1 MB, lo que significa que cuando KafkaConsumer.poll () devuelve ConsumerRecords, el objeto de registro utilizará como máximo un máximo de partición.fetch.bytes por partición asignada al consumidor. Entonces, si un tema tiene 20 particiones y tiene 5 consumidores, cada consumidor deberá tener 4 MB de memoria disponible para Consumer Records. En la práctica, querrá asignar más memoria ya que cada consumidor necesitará manejar más particiones si otros consumidores del grupo fallan. máx. partition.fetch.bytes debe ser más grande que el mensaje más grande que aceptará un corredor (determinado por la propiedad max.message.size en la configuración del corredor), o el corredor puede tener mensajes que el consumidor no podrá consumir, en los caso que el consumidor se cuelgue tratando de leerlos. Otra consideración importante al establecer max.partition.fetch.bytes es la cantidad de tiempo que le toma al consumidor procesar los datos. Como recordará, el consumidor debe llamar a poll () con la frecuencia suficiente para evitar el tiempo de espera de la sesión y el reequilibrio posterior. Si la cantidad de datos que devuelve un solo poll () es muy grande, el consumidor puede tardar más en procesar, lo que significa que no llegará a la siguiente iteración del ciclo de sondeo a tiempo para evitar un tiempo de espera de sesión. Si esto ocurre, las dos opciones son reducir el máx. partición.fetch.bytes o para aumentar el tiempo de espera de la sesión.

session.timeout.ms

La cantidad de tiempo que un consumidor puede estar fuera de contacto con los corredores mientras aún se considera vivo es de 3 segundos. Si pasa más de session.timeout.ms sin que el consumidor envíe un latido al coordinador del grupo, se considera muerto y el coordinador del grupo activará un reequilibrio del grupo de consumidores para asignar particiones del consumidor muerto a los otros consumidores del grupo. . Esta propiedad está estrechamente relacionada con heartbeat.interval.ms. heartbeat.interval.ms controla la frecuencia con la que el método poll () de KafkaConsumer enviará un latido al coordinador del grupo, mientras que session.timeout.ms controla cuánto tiempo puede pasar un consumidor sin enviar un latido. Por lo tanto, esas dos propiedades generalmente se modifican juntas: heatbeat.interval.ms debe ser menor que session.timeout.ms, y generalmente se establece en un tercio del valor de tiempo de espera. Entonces, si session.timeout.ms es de 3 segundos, heartbeat.interval.ms debería ser de 1 segundo. Establecer session.timeout.ms por debajo del valor predeterminado permitirá a los grupos de consumidores detectar y recuperarse de fallas antes, pero también puede causar reequilibrios no deseados como resultado de que los consumidores tarden más en completar el ciclo de sondeo o la recolección de basura. Configurar session.timeout.ms más alto reducirá la posibilidad de un reequilibrio accidental, pero también significa que tomará más tiempo detectar una falla real.

auto.offset.reset

Esta propiedad controla el comportamiento del consumidor cuando comienza a leer una partición para la cual no tiene una compensación comprometida o si la compensación comprometida que tiene no es válida (generalmente porque el consumidor estuvo inactivo durante tanto tiempo que el registro con esa compensación ya estaba obsoleto en el corredor). El valor predeterminado es "más reciente", lo que significa que, sin una compensación válida, el consumidor comenzará a leer los registros más recientes (registros que se escribieron después de que el consumidor comenzó a ejecutar). La alternativa es "la más temprana", lo que significa que, sin un desplazamiento válido, el consumidor leerá todos los datos de la partición, comenzando desde el principio.

enable.auto.commit

Este parámetro controla si el consumidor confirmará las compensaciones automáticamente y su valor predeterminado es verdadero. Configúrelo en falso si prefiere controlar cuándo se confirman las compensaciones, lo cual es necesario para minimizar los duplicados y evitar la pérdida de datos. Si establece enable.auto.commit en true, es posible que también desee controlar la frecuencia con la que se confirmarán las compensaciones mediante auto.commit.interval.ms.

division.assignment.strategy

Aprendimos que las particiones se asignan a los consumidores en un grupo de consumidores. Un PartitionAssignor es una clase que, dados los consumidores y los temas a los que se suscribieron, decide qué particiones se asignarán a qué consumidor. De forma predeterminada, Kafka tiene dos estrategias de asignación:

  • Range: Asigna a cada consumidor un subconjunto consecutivo de particiones de cada tema al que se suscribe. Entonces, si los consumidores C1 y C2 están suscritos a dos temas, T1 y T2, y cada uno de los temas tiene tres particiones, luego a C1 se le asignarán las particiones 0 y 1 de los temas T1 y T2, mientras que a C2 se le asignará la partición 2 de esos temas. Debido a que cada tema tiene un número impar de particiones y la asignación se realiza para cada tema de forma independiente, el primer consumidor termina con más particiones que el segundo. Esto sucede siempre que se utiliza la asignación de rango y el número de consumidores no divide el número de particiones en cada tema de forma ordenada.
  • RoundRobin: Toma todas las particiones de todos los temas suscritos y las asigna a los consumidores de forma secuencial, una por una. Si C1 y C2 se describieron anteriormente, usaron RoundRobin asignación, C1 tendría las particiones 0 y 2 del tema T1 y la partición 1 del tema T2. C2 tendría la partición 1 del tema T1 y las particiones 0 y 2 del tema T2. En general, si todos los consumidores están suscritos a los mismos temas (un escenario muy común), la asignación de RoundRobin terminará con todos los consumidores con el mismo número de particiones (o como máximo una diferencia de partición). 
division.assignment.strategy permite elegir una estrategia de asignación de partición. El valor predeterminado es org.apache.kafka.clients.consumer.RangeAssignor, que implementa la estrategia de rango descrita anteriormente. Puede reemplazarlo con org.apache.kafka.clients.consumer.RoundRobinAssignor. Una opción más avanzada es implementar su propia estrategia de asignación, en cuyo caso partition.assignment.strategy debe apuntar al nombre de su clase.

client.id

Esta puede ser cualquier cadena y será utilizada por los intermediarios para identificar los mensajes enviados desde el cliente. Se utiliza en registros y métricas, y para cuotas.

registros.max.poll.

Esto controla el número máximo de registros que devolverá una sola llamada a poll (). Esto es útil para ayudar a controlar la cantidad de datos que su aplicación necesitará procesar en el ciclo de sondeo. 

receive.buffer.bytes y send.buffer.bytes

Estos son los tamaños de los búferes de envío y recepción de TCP que utilizan los sockets al escribir y leer datos. Si se establecen en -1, se utilizarán los valores predeterminados del sistema operativo. Puede ser una buena idea aumentarlos cuando los productores o consumidores se comunican con intermediarios en un centro de datos diferente, porque esos enlaces de red suelen tener mayor latencia y menor ancho de banda.

viernes, 19 de noviembre de 2021

Haciendo microservicios con Spring Cloud

 

El equipo de Spring ha integrado una gran cantidad de proyectos open source usados en el mercado en un subproyecto de Spring conocido colectivamente como Spring Cloud. 

Spring Cloud envuelve el trabajo de empresas de código abierto como Pivotal, HashiCorp y Netflix en la implementaciones de patrones para trabajar con microservicios orientados a la nube. Spring Cloud simplifica la instalación y configuración de estos proyectos en aplicaciones Spring para que el desarrollador pueda concentrarse en escribir código, sin quedar enterrado en los detalles de la configuración de toda la infraestructura necesaria para construir e implementar una aplicación de microservicio.

Veamos un gráfico que muestra las implementaciones a patrones de microservicios con spring cloud : 



Empecemos por el principio, todo estos framework se apoyan en spring boot. Spring Boot es la tecnología central utilizada en la implementación de microservicios. Spring Boot simplifica enormemente el desarrollo de microservicios al simplificar las tareas principales de crear microservicios basados en REST. Spring Boot también simplifica enormemente la asignación de verbos de estilo HTTP (GET, PUT, POST y DELETE) a URL y la serialización del protocolo JSON hacia y desde objetos Java, así como la asignación de excepciones de Java a códigos de error HTTP estándar.

Spring Cloud Config maneja la administración de los datos de configuración de la aplicación a través de un servicio centralizado para que los datos de configuración de la aplicación (particularmente los datos de configuración específicos de su entorno) estén claramente separados de su microservicio implementado. Esto asegura que no importa cuántas instancias de microservicio aparezcan, siempre tendrán la misma configuración. Spring Cloud Config tiene su propio repositorio de administración de propiedades, pero también se integra con proyectos de código abierto como los siguientes:

  • Git: Git (https://git-scm.com/) es un sistema de control de versiones de código abierto que le permite administrar y realizar un seguimiento de los cambios en cualquier tipo de archivo de texto. Spring Cloud Config puede integrarse con un repositorio respaldado por Git y leer los datos de configuración de la aplicación fuera del repositorio.
  • Consul — Consul (https://www.consul.io/) es una herramienta de descubrimiento de servicios de código abierto que permite que las instancias de servicio se registren en el servicio. Los clientes del servicio pueden preguntarle a Consul dónde se encuentran las instancias del servicio. Consul también incluye una base de datos basada en el almacén de valores clave que Spring Cloud Config puede utilizar para almacenar datos de configuración de la aplicación.
  • Eureka — Eureka (https://github.com/Netflix/eureka) es un proyecto de Netflix de código abierto que, como Consul, ofrece capacidades de descubrimiento de servicios similares. Eureka también tiene una base de datos de valores clave que se puede usar con Spring Cloud Config.

Con el descubrimiento del servicio Spring Cloud, puede abstraer la ubicación física (IP y / o nombre del servidor) de donde se implementan sus servidores de los clientes que consumen el servicio. Los consumidores de servicios invocan la lógica empresarial de los servidores a través de un nombre lógico en lugar de una ubicación física. El descubrimiento de servicios de Spring Cloud también maneja el registro y la cancelación del registro de las instancias de servicios a medida que se inician y apagan. El descubrimiento de servicios de Spring Cloud se puede implementar utilizando Consul (https://www.consul.io/) y Eureka (https://github.com/Netflix/eureka) como su motor de descubrimiento de servicios.

Spring Cloud se integra en gran medida con los proyectos de código abierto de Netflix. Para los patrones de resiliencia del cliente de microservicio, Spring Cloud envuelve las bibliotecas de Netflix Hystrix (https://github.com/Netflix/Hystrix) y el proyecto Ribbon (https://github.com/Netflix/Ribbon) y hace que sean fácil de usar. 

Con las bibliotecas de Netflix Hystrix, puede implementar rápidamente patrones de resistencia del cliente de servicio, como los patrones Circuit breakers y Bulkhead.

Si bien el proyecto Netflix Ribbon simplifica la integración con agentes de descubrimiento de servicios como Eureka, también proporciona equilibrio de carga del lado del cliente de las llamadas de servicio de un consumidor de servicios. Esto hace posible que un cliente continúe realizando llamadas de servicio incluso si el agente de descubrimiento de servicios no está disponible temporalmente.

Spring Cloud utiliza el proyecto Netflix Zuul (https://github.com/Netflix/zuul) para proporcionar capacidades de enrutamiento de servicios para su aplicación de microservicio. Zuul es una puerta de enlace de servicios que procesa las solicitudes de servicio y se asegura de que todas las llamadas a sus microservicios pasen por una única "puerta de entrada" antes de que se invoque el servicio de destino. Con esta centralización de llamadas de servicio, puede aplicar políticas de servicio estándar, como autenticación de autorización de seguridad, filtrado de contenido y reglas de enrutamiento.

Spring Cloud Stream (https://cloud.spring.io/spring-cloud-stream/) es una tecnología habilitadora que le permite integrar fácilmente el procesamiento de mensajes livianos en su microservicio. Con Spring Cloud Stream, puede crear microservicios inteligentes que pueden usar eventos asincrónicos a medida que ocurren en su aplicación. Con Spring Cloud Stream, puede integrar rápidamente sus microservicios con agentes de mensajes como RabbitMQ (https://www.rabbitmq.com/) y Kafka (http://kafka.apache.org/).

Spring Cloud Sleuth (https://cloud.spring.io/spring-cloud-sleuth/) le permite integrar identificadores de seguimiento únicos en las llamadas HTTP y los canales de mensajes (RabbitMQ, Apache Kafka) que se utilizan dentro de su aplicación. Estos números de seguimiento, a veces denominados identificadores de correlación o seguimiento, le permiten realizar un seguimiento de una transacción a medida que fluye a través de los diferentes servicios de su aplicación. Con Spring Cloud Sleuth, estos ID de seguimiento se agregan automáticamente a cualquier declaración de registro que realice en su microservicio.

La verdadera belleza de Spring Cloud Sleuth se ve cuando se combina con herramientas de tecnología de agregación de registros como Papertrail (http://papertrailapp.com) y herramientas de rastreo como Zipkin (http://zipkin.io). Papertail es una plataforma de registro basada en la nube que se utiliza para agregar registros en tiempo real de diferentes microservicios en una base de datos consultable. Open Zipkin toma datos producidos por Spring Cloud Sleuth y le permite visualizar el flujo de sus llamadas de servicio involucradas para una sola transacción.

Spring Cloud Security (https://cloud.spring.io/spring-cloud-security/) es un marco de autenticación y autorización que puede controlar quién puede acceder a sus servicios y qué pueden hacer con sus servicios. Spring Cloud Security se basa en tokens y permite que los servicios se comuniquen entre sí a través de un token emitido por un servidor de autenticación. Cada servicio que recibe una llamada puede verificar el token proporcionado en la llamada HTTP para validar la identidad del usuario y sus derechos de acceso al servicio. Además, Spring Cloud Security es compatible con JavaScript Web Token (https://jwt.io). JavaScript Web Token (JWT) estandariza el formato de cómo se crea un token OAuth2 y proporciona estándares para firmar digitalmente un token creado.

Dejo link : http://projects.spring.io/spring-cloud/

miércoles, 10 de noviembre de 2021

Primeros pasos con Apache Kafka parte 20



Seguimos con Kafka

En el corazón de la API del consumidor hay un bucle simple para sondear el servidor en busca de datos.

Una vez que el consumidor se suscribe a los temas, el ciclo de sondeo maneja todos los detalles de coordinación, reequilibrio de particiones, pings y obtención de datos, dejando al desarrollador con una API limpia que simplemente que devuelve los datos disponibles de las particiones asignadas. Veamos un ejemplo :

try {

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {

            log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",

                record.topic(), record.partition(), record.offset(), record.key(), record.value());

            int updatedCount = 1;

            if (custCountryMap.countainsValue(record.value())) {

                updatedCount = custCountryMap.get(record.value()) + 1;

            }

            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);

            System.out.println(json.toString(4))

        }

    }

} finally {

    consumer.close();

}

Este es un bucle infinito. Los consumidores suelen ser aplicaciones de larga duración que sondean continuamente a Kafka en busca de más datos. 

ConsumerRecords<String, String> records = consumer.poll(100);  en esta linea se puede ver que los consumidores deben seguir sondeando a Kafka o serán considerados muertos y las particiones que están consumiendo serán entregadas a otro consumidor del grupo para que continúe consumiendo. El parámetro que pasamos, poll (), es un intervalo de tiempo de espera y controla cuánto tiempo se bloqueará poll () si los datos no están disponibles en el búfer del consumidor. Si se establece en 0, poll () regresará inmediatamente; de lo contrario, esperará el número especificado de milisegundos para que lleguen los datos del intermediario.

poll () devuelve una lista de registros. Cada registro contiene el tema y la partición de la que proviene el registro, el desplazamiento del registro dentro de la partición y, por supuesto, la clave y el valor del registro. Por lo general, queremos iterar sobre la lista y procesar los registros individualmente. El método poll () toma un parámetro de tiempo de espera. Esto especifica cuánto tiempo tardará la encuesta en regresar, con o sin datos. El valor suele depender de las necesidades de la aplicación para obtener respuestas rápidas: ¿qué tan rápido desea devolver el control al subproceso que realiza el sondeo?

El procesamiento generalmente termina escribiendo un resultado en un almacén de datos o actualizando un registro almacenado. Aquí, el objetivo es mantener un recuento continuo de clientes de cada condado, por lo que actualizamos una tabla hash e imprimimos el resultado como JSON. Un ejemplo más realista almacenaría el resultado de las actualizaciones en un almacén de datos.

Siempre se debe cerrar la conexión del consumidor antes de salir. Esto cerrará las conexiones de red y los slots. También activará un reequilibrio de inmediato en lugar de esperar a que el coordinador del grupo descubra que el consumidor dejó de enviar pings y es probable que esté muerto, lo que llevará más tiempo y, por lo tanto, dará como resultado un período de tiempo más largo en el que los consumidores no pueden consumir mensajes de un subconjunto de las particiones.

El ciclo de la encuesta hace mucho más que simplemente obtener datos. La primera vez que llama a poll () con un nuevo consumidor, es responsable de encontrar el GroupCoordinator, unirse al grupo de consumidores y recibir una asignación de partición. Si se activa un reequilibrio, también se manejará dentro del ciclo de sondeo. Y, por supuesto, los latidos que mantienen vivos a los consumidores se envían desde dentro del ciclo de la encuesta. Por esta razón, intentamos asegurarnos de que cualquier procesamiento que hagamos entre iteraciones sea rápido y eficiente.

Primeros pasos con Apache Kafka parte 19



Seguimos con Kafka

Una vez que creamos un consumidor, el siguiente paso es suscribirse a uno o más temas. El método subcribe () toma una lista de temas como parámetro, por lo que es bastante simple de usar:

consumer.subscribe(Collections.singletonList("customerCountries"));

También es posible llamar a subscribe con una expresión regular. La expresión puede coincidir con varios nombres de temas, y si alguien crea un nuevo tema con un nombre que coincida, se producirá un reequilibrio casi de inmediato y los consumidores comenzarán a consumir desde el nuevo tema. Esto es útil para aplicaciones que necesitan consumir de múltiples temas y pueden manejar los diferentes tipos de datos que contendrán los temas.

La suscripción a varios temas mediante una expresión regular se usa con mayor frecuencia en aplicaciones que replican datos entre Kafka y otro sistema.

Para suscribirse a todos los temas de prueba, podemos llamar a:

consumer.subscribe("test.*");


sábado, 30 de octubre de 2021

Primeros pasos con Apache Kafka parte 18


Seguimos con Kafka

El primer paso para comenzar a consumir registros es crear una instancia de KafkaConsumer. Crear un KafkaConsumer es muy similar a crear un KafkaProducer: crea una instancia de propiedades de Java con las propiedades que desea pasar al consumidor. Para comenzar, solo necesitamos usar las tres propiedades obligatorias: bootstrap.servers, key.deserializer y value.deserializer.

La primera propiedad, bootstrap.servers, es la cadena de conexión a un clúster de Kafka. Se usa exactamente de la misma manera que en KafkaProducer. Las otras dos propiedades, key.deserializer y value.deserializer, son similares a los serializadores definidos para el productor, pero en lugar de especificar clases que conviertan los objetos Java en matrices de bytes, debe especificar clases que puedan tomar una matriz de bytes y convertirla en un objeto Java.

Hay una cuarta propiedad, que no es estrictamente obligatoria, pero por ahora pretendemos que lo es. La propiedad es group.id y especifica el grupo de consumidores al que pertenece la instancia de KafkaConsumer. Si bien es posible crear consumidores que no pertenecen a ningún grupo de consumidores, esto es poco común, por lo que durante la mayor parte del capítulo asumiremos que el consumidor es parte de un grupo.

El siguiente fragmento de código muestra cómo crear un KafkaConsumer:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

La mayor parte de lo que ve aquí debería resultarle familiar sobre la creación de productores. Suponemos que los registros que consumimos tendrán objetos String como clave y valor del registro. La única propiedad nueva aquí es group.id, que es el nombre del grupo de consumidores al que pertenece este consumidor.

domingo, 24 de octubre de 2021

Primeros pasos con Apache Kafka parte 17


Seguimos con Kafka

Los consumidores de un grupo de consumidores comparten las propiedades de las particiones en los temas a los que se suscriben. Cuando agregamos un nuevo consumidor al grupo, comienza a consumir mensajes de particiones previamente consumidas por otro consumidor. Lo mismo sucede cuando un consumidor se apaga o se bloquea; abandona el grupo, y las particiones que solía consumir serán consumidas por uno de los consumidores restantes. La reasignación de particiones a los consumidores también ocurre cuando los temas que consume el grupo de consumidores se modifican (por ejemplo, si un administrador agrega nuevas particiones). Mover la propiedad de una partición de un consumidor a otro se denomina reequilibrio.

Los reequilibrios son importantes porque brindan al grupo de consumidores una alta disponibilidad y escalabilidad (lo que nos permite agregar y eliminar consumidores de manera fácil y segura), pero en el curso normal de los eventos son bastante indeseables. Durante un reequilibrio, los consumidores no pueden consumir mensajes, por lo que un reequilibrio es básicamente una breve ventana de indisponibilidad de todo el grupo de consumidores. Además, cuando las particiones se mueven de un consumidor a otro, el consumidor pierde su estado actual; si estaba almacenando en caché algún dato, deberá actualizar sus cachés, lo que ralentizará la aplicación hasta que el consumidor configure su estado nuevamente. 

La forma en que los consumidores mantienen la membresía en un grupo de consumidores y la propiedad de las particiones que se les asignan es enviando latidos a un corredor de Kafka designado como coordinador del grupo (este corredor puede ser diferente para diferentes grupos de consumidores). Siempre que el consumidor envíe latidos a intervalos regulares, se asume que está vivo, bien y procesando mensajes de sus particiones. Los latidos se envían cuando el consumidor realiza una encuesta (es decir, recupera registros) y cuando confirma los registros que ha consumido.

Si el consumidor deja de enviar latidos durante el tiempo suficiente, su sesión expirará y el coordinador del grupo la considerará muerta y provocará un reequilibrio. Si un consumidor falla y deja de procesar mensajes, el coordinador del grupo tardará unos segundos sin latidos en decidir que está muerto y activar el reequilibrio. Durante esos segundos, no se procesarán mensajes de las particiones propiedad del consumidor muerto. Al cerrar un consumidor limpiamente, el consumidor notificará al coordinador del grupo que se va, y el coordinador del grupo activará un reequilibrio de inmediato, reduciendo la brecha en el procesamiento. 

Con las versiones más recientes de Kafka, puede configurar cuánto tiempo puede pasar la aplicación sin sondear antes de que abandone el grupo y active un reequilibrio. Esta configuración se utiliza para evitar un bloqueo en vivo, en el que la aplicación no se bloqueó pero no logró avanzar por alguna razón. Esta configuración es independiente de session.time out.ms, que controla el tiempo que se tarda en detectar un bloqueo del consumidor y dejar de enviar latidos.


sábado, 16 de octubre de 2021

Primeros pasos con Apache Kafka parte 16

Seguimos con Kafka. 

Existe la necesidad de escalar el consumo por temas. Al igual que varios productores pueden escribir sobre el mismo tema, debemos permitir que varios consumidores lean el mismo tema, dividiendo los datos entre ellos.

Los consumidores de Kafka suelen formar parte de un grupo de consumidores. Cuando varios consumidores están suscritos a un tema y pertenecen al mismo grupo de consumidores, cada consumidor del grupo recibirá mensajes de un subconjunto diferente de las particiones del tema.

Tomemos el tema T1 con cuatro particiones. Ahora suponga que creamos un nuevo consumidor, C1, que es el único consumidor del grupo G1, y lo usamos para suscribirse al tema T1. El consumidor C1 recibirá todos los mensajes de las cuatro particiones t1.

Si agregamos otro consumidor, C2, al grupo G1, cada consumidor solo recibirá mensajes de dos particiones. Quizás los mensajes de la partición 0 y 2 van a C1 y los mensajes de las particiones 1 y 3 van al consumidor C2.

Si G1 tiene cuatro consumidores, cada uno leerá los mensajes de una sola partición.

Si agregamos más consumidores a un solo grupo con un solo tema que las particiones que tenemos, algunos de los consumidores estarán inactivos y no recibirán ningún mensaje.

La principal forma en que escalamos el consumo de datos de un tema de Kafka es agregando más consumidores a un grupo de consumidores. Es común que los consumidores de Kafka realicen operaciones de alta latencia, como escribir en una base de datos o un cálculo lento de los datos. En estos casos, es posible que un solo consumidor no pueda mantenerse al día con los flujos de datos de velocidad en un tema, y agregar más consumidores que compartan la carga al hacer que cada consumidor posea solo un subconjunto de las particiones y los mensajes es nuestro método principal de escalado. Esta es una buena razón para crear temas con una gran cantidad de particiones: permite agregar más consumidores cuando aumenta la carga. Tenga en cuenta que no tiene sentido agregar más consumidores de los que tiene particiones en un tema; algunos de los consumidores simplemente estarán inactivos. 

Además de agregar consumidores para escalar una sola aplicación, es muy común tener múltiples aplicaciones que necesitan leer datos del mismo tema. De hecho, uno de los principales objetivos de diseño en Kafka era hacer que los datos producidos para los temas de Kafka estuvieran disponibles para muchos casos de uso en toda la organización. En esos casos, queremos que cada aplicación obtenga todos los mensajes, en lugar de solo un subconjunto. Para asegurarse de que una aplicación reciba todos los mensajes de un tema, asegúrese de que la aplicación tenga su propio grupo de consumidores. A diferencia de muchos sistemas de mensajería tradicionales, Kafka se adapta a una gran cantidad de consumidores y grupos de consumidores sin reducir el rendimiento. En el ejemplo anterior, si agregamos un nuevo grupo de consumidores G2 con un solo consumidor, este consumidor obtendrá todos los mensajes del tema T1 independientemente de lo que esté haciendo G1. G2 puede tener más de un consumidor, en cuyo caso cada uno obtendrá un subconjunto de particiones, tal como mostramos para G1, pero G2 en su conjunto seguirá recibiendo todos los mensajes independientemente de otros grupos de consumidores.

Para resumir, crea un nuevo grupo de consumidores para cada aplicación que necesita todos los mensajes de uno o más temas. Agrega consumidores a un grupo de consumidores existente para escalar la lectura y el procesamiento de mensajes de los temas, por lo que cada consumidor adicional en un grupo solo obtendrá un subconjunto de los mensajes.


miércoles, 13 de octubre de 2021

Primeros pasos con Apache Kafka parte 15


Hasta ahora, hemos discutido las características del particionador predeterminado, que es el que se usa con más frecuencia. Sin embargo, Kafka no lo limita a particiones hash y, a veces, existen buenas razones para particionar los datos de manera diferente. Por ejemplo, suponga que es un proveedor B2B y su mayor cliente es una empresa que fabrica dispositivos portátiles llamados Bananas. Suponga que hace tantos negocios con el cliente "Banana" que más del 10% de sus transacciones diarias son con este cliente. Si usa la partición de hash predeterminada, los registros de Banana se asignarán a la misma partición que otras cuentas, lo que dará como resultado que una partición sea aproximadamente el doble de grande que el resto. Esto puede hacer que los servidores se queden sin espacio, que el procesamiento se ralentice, etc. Lo que realmente queremos es darle a Banana su propia partición y luego usar particiones hash para asignar el resto de las cuentas a las particiones.

A continuación, se muestra un ejemplo de un particionador personalizado:

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.record.InvalidRecordException;

import org.apache.kafka.common.utils.Utils;


public class BananaPartitioner implements Partitioner {


public void configure(Map<String, ?> configs) {}

public int partition(String topic, Object key, byte[] keyBytes,

        Object value, byte[] valueBytes,

        Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String))) 

        throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))

        return numPartitions; // Banana siempre va estar en la ultima partición

    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))

}

public void close() {}

}

lunes, 4 de octubre de 2021

Primeros pasos con Apache Kafka parte 14


 Seguimos con Kafka. 

Los objetos ProducerRecord incluyen un nombre de tema, una clave y un valor. Los mensajes de Kafka son pares clave-valor y, si bien es posible crear un ProducerRecord con solo un tema y un valor, con la clave establecida en nula de forma predeterminada, la mayoría de las aplicaciones producen registros con claves. Las claves sirven para dos objetivos: son información adicional que se almacena con el mensaje y también se utilizan para decidir en cuál de las particiones de tema se escribirá el mensaje. Todos los mensajes con la misma clave irán a la misma partición. Esto significa que si un proceso está leyendo solo un subconjunto de las particiones en un tema, todos los registros para una sola clave serán leídos por el mismo proceso. Para crear un registro de valor-clave, simplemente cree un ProducerRecord de la siguiente manera:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

Al crear mensajes con una clave nula, simplemente puede omitir la clave:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");


Aquí, la clave simplemente se establecerá en nula, lo que puede indicar que faltaba el nombre de un cliente en un formulario.

Cuando la clave es nula y se usa el particionador predeterminado, el registro se enviará a una de las particiones disponibles del tema al azar. Se utilizará un algoritmo de operación por turnos para equilibrar los mensajes entre las particiones.

Si existe una clave y se usa el particionador predeterminado, Kafka aplicará un hash a la clave (usando su propio algoritmo hash, por lo que los valores hash no cambiarán cuando se actualice Java) y usará el resultado para asignar el mensaje a una partición específica. Dado que es importante que una clave siempre se asigne a la misma partición, usamos todas las particiones del tema para calcular la asignación, no solo las particiones disponibles. Esto significa que si una partición específica no está disponible cuando escribe datos en ella, es posible que obtenga un error. Esto es bastante raro, como verá en el Capítulo 6 cuando analicemos la replicación y disponibilidad de Kafka.

El mapeo de claves a particiones es consistente solo mientras no cambie el número de particiones en un tema. Entonces, siempre que el número de particiones sea constante, puede estar seguro de que, por ejemplo, los registros relacionados con el usuario 045189 siempre se escribirán en la partición 34. Esto permite todo tipo de optimización al leer datos de particiones. Sin embargo, en el momento en que agrega nuevas particiones al tema, esto ya no está garantizado; los registros antiguos permanecerán en la partición 34, mientras que los registros nuevos se escribirán en una partición diferente. Cuando la partición de claves es importante, la solución más sencilla es crear temas con suficientes particiones y nunca agregar particiones.

domingo, 19 de septiembre de 2021

Primeros pasos con Apache Kafka parte 13



Seguimos con Kafka. 

Los archivos Avro deben almacenar el esquema completo en el archivo de datos que están asociados, almacenar el esquema completo en cada registro generalmente será más del doble del tamaño del registro. Para resolver este problema podemos utilizar un registro de esquemas. Los Schema Registry no forma parte de Apache Kafka, pero hay varias opciones de código abierto para elegir. Usaremos Confluent Schema Registry para este ejemplo. 

La idea es almacenar todos los esquemas utilizados luego, simplemente almacenamos el identificador del esquema en el registro que producimos en Kafka. Los consumidores pueden usar el identificador para extraer el registro del registro de esquema y deserializar los datos. 

A continuación, se muestra un ejemplo de cómo producir objetos Avro generados en Kafka:

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";

int wait = 500;

Producer<String, Customer> producer = new KafkaProducer<String,Customer>(props);

while (true) {

    Customer customer = CustomerGenerator.getNext();

    System.out.println("Generated customer " + customer.toString());

    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);

    producer.send(record);

}

Usamos KafkaAvroSerializer para serializar nuestros objetos con Avro. AvroSerializer también puede manejar primitivas, por lo que luego podemos usar String como clave de registro y nuestro objeto Customer como valor.

schema.registry.url es un nuevo parámetro. Esto simplemente apunta a dónde almacenamos los esquemas.

El cliente es nuestro objeto generado. Le decimos al productor que nuestros registros contendrán Cliente como valor.

También creamos una instancia de ProducerRecord con Customer como el tipo de valor y pasamos un objeto Customer al crear el nuevo registro.

Eso es todo. Enviamos el registro con nuestro objeto Cliente y KafkaAvroSerializer se encargará del resto.

¿Qué sucede si prefiere utilizar objetos Avro genéricos en lugar de los objetos Avro generados?
No hay problema. En este caso, solo necesita proporcionar el esquema:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com"
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,
    GenericRecord>("customerContacts", name, customer);
    producer.send(data);
}

Seguimos usando el mismo KafkaAvroSerializer.

Y proporcionamos el URI del mismo registro de esquema.

Pero ahora también necesitamos proporcionar el esquema Avro, ya que no lo proporciona el objeto generado por Avro.

Nuestro tipo de objeto es un Avro GenericRecord, que inicializamos con nuestro esquema y los datos que queremos escribir.

Entonces, el valor de ProducerRecord es simplemente un GenericRecord que cuenta nuestro esquema y datos. El serializador sabrá cómo obtener el esquema de este registro, almacenarlo en el registro de esquema y serializar los datos del objeto.


viernes, 17 de septiembre de 2021

Primeros pasos con Apache Kafka parte 12


Seguimos con Kafka. 

Apache Avro es un formato de serialización de datos independiente del lenguaje. El proyecto fue creado por Doug Cutting para proporcionar una forma de compartir archivos de datos con una gran audiencia.

Los datos de Avro se describen en un esquema independiente del lenguaje. El esquema generalmente se describe en JSON y la serialización suele ser en archivos binarios, aunque también se admite la serialización en JSON. Avro asume que el esquema está presente al leer y escribir archivos, generalmente incrustando el esquema en los propios archivos.




Una de las características más interesantes de Avro, y lo que lo hace adecuado para su uso en sistemas de mensajería como Kafka, es que cuando la aplicación que está escribiendo mensajes cambia a un nuevo esquema, las aplicaciones que leen los datos pueden continuar procesando mensajes sin necesidad de cambiar o actualizar.

Supongamos que el esquema original fuera:

{

     "namespace": "customerManagement.avro",

     "type": "record",

     "name": "Customer",

     "fields": [

          {"name": "id", "type": "int"},

          {"name": "name", "type": "string""},

          {"name": "faxNumber", "type": ["null", "string"], "default": "null"}

     ]

}

Usamos este esquema durante unos meses y generamos algunos terabytes de datos en este formato. Ahora suponga que decidimos que en la nueva versión, actualizaremos al siglo XXI y ya no incluiremos un campo de número de fax y en su lugar usaremos un campo de correo electrónico.

El nuevo esquema sería:

{"namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": "null"}
    ]
}

Ahora, después de actualizar a la nueva versión, los registros antiguos contendrán "faxNumber" y los registros nuevos contendrán "email". En muchas organizaciones, las actualizaciones se realizan lentamente y durante muchos meses. Por lo tanto, debemos considerar cómo las aplicaciones anteriores a la actualización que aún usan los números de fax y las aplicaciones posteriores a la actualización que usan el correo electrónico podrán manejar todos los eventos en Kafka.

La aplicación de lectura contendrá llamadas a métodos similares a getName (), getId () y getFaxNumber. Si encuentra un mensaje escrito con el nuevo esquema, getName() y getId () continuará funcionando sin modificaciones, pero getFaxNumber () devolverá nulo porque el mensaje no contendrá un número de fax.

Ahora suponga que actualizamos nuestra aplicación de lectura y ya no tiene el método getFaxNumber() sino getEmail(). Si encuentra un mensaje escrito con el esquema anterior, getEmail() devolverá un valor nulo porque los mensajes anteriores no contienen una dirección de correo electrónico.

Este ejemplo ilustra el beneficio de usar Avro: aunque cambiemos el esquema en los mensajes sin cambiar todas las aplicaciones que leen los datos, no habrá excepciones ni errores de ruptura y no será necesario realizar costosas actualizaciones de los datos existentes.

Sin embargo, hay dos advertencias para este escenario:
  • El esquema utilizado para escribir los datos y el esquema esperado por la aplicación de lectura deben ser compatibles. La documentación de Avro incluye reglas de compatibilidad.
  • El deserializador necesitará acceder al esquema que se utilizó al escribir los datos, incluso cuando sea diferente del esquema esperado por la aplicación que accede a los datos. En los archivos Avro, el esquema de escritura se incluye en el propio archivo, pero hay una mejor manera de manejar esto para los mensajes de Kafka. Que veremos en próximos post...

sábado, 11 de septiembre de 2021

Primeros pasos con Apache Kafka parte 11

Seguimos con Kafka. 

Como se vio en ejemplos anteriores, la configuración del productor incluye serializadores y hemos visto cómo utilizar el serializador de cadenas predeterminado. Kafka también incluye serializadores para enteros y ByteArrays, pero algunas veces necesitamos serializar de una forma especial.

Cuando el objeto que necesita enviar a Kafka no es una simple cadena o un entero, tiene la opción de usar una biblioteca de serialización genérica como Avro, Thrift o Protobuf para crear registros, o crear una serialización personalizada para los objetos que ya está usando . 

Suponga que en lugar de registrar solo el nombre del cliente, crea una clase simple para representar a los clientes:

public class Customer {

      private int customerID;

      private String customerName;

      public Customer(int ID, String name) {

            this.customerID = ID;

            this.customerName = name;

      }

      public int getID() {

            return customerID;

      }

      public String getName() {

            return customerName;

      }

}

Ahora suponga que queremos crear un serializador personalizado para esta clase.:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {

      @Override
      public void configure(Map configs, boolean isKey) {
            // nothing to configure
      }

      @Override
      /**
      We are serializing Customer as:
      4 byte int representing customerId
      4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
      N bytes representing customerName in UTF-8
      */
      public byte[] serialize(String topic, Customer data) {
      try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                  return null;
            else {
                  if (data.getName() != null) {
                        serializeName = data.getName().getBytes("UTF-8");
                        stringSize = serializedName.length;
                  } else {
                        serializedName = new byte[0];
                        stringSize = 0;
                  }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
      } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
      }
   }

  @Override
  public void close() {
         // nothing to close
  }
}

La configuración de un productor con este CustomerSerializer le permitirá definir ProducerRecord <String, Customer> y enviar datos del cliente y pasar los objetos del cliente directamente al productor. 

Este ejemplo es bastante simple, pero puede ver lo frágil que es el código. Si alguna vez tenemos demasiados clientes, por ejemplo, y necesitamos cambiar customerID a Long, o si alguna vez decidimos agregar un campo startDate a Customer, tendremos un problema serio para mantener la compatibilidad entre los mensajes antiguos y nuevos. La depuración de problemas de compatibilidad entre diferentes versiones de serializadores y deserializadores es bastante desafiante; es necesario comparar matrices de bytes sin procesar. Para empeorar las cosas, si varios equipos de la misma empresa terminan escribiendo datos del Cliente en Kafka, todos deberán usar los mismos serializadores y modificar el código al mismo tiempo.

Por estos motivos, es buena idea utilizar serializadores y deserializadores existentes, como JSON, Apache Avro, Thrift o Protobuf. 

jueves, 2 de septiembre de 2021

Primeros pasos con Apache Kafka parte 10

 


Seguimos con Kafka

Hasta ahora, hemos visto muy pocos parámetros de configuración para los productores, solo el URI y los serializadores de bootstrap.servers obligatorios. El productor tiene una gran cantidad de parámetros de configuración; la mayoría están documentados en la documentación de Apache Kafka y muchos tienen valores predeterminados razonables, por lo que no hay razón para modificar cada parámetro. Sin embargo, algunos de los parámetros tienen un impacto significativo en el uso de la memoria, el rendimiento y la confiabilidad de los productores. Los revisaremos aquí.

acks

El parámetro acks controla cuántas réplicas de partición deben recibir el registro antes de que el productor pueda considerar que la escritura se ha realizado correctamente. Esta opción tiene un impacto significativo en la probabilidad de que se pierdan los mensajes. Hay tres valores permitidos para el parámetro acks:

• Si acks = 0, el productor no esperará una respuesta del broker antes de asumir que el mensaje se envió correctamente. Esto significa que si algo salió mal y el corredor no recibirá mensaje, el productor no lo sabrá y el mensaje se perderá. Sin embargo, debido a que el productor no está esperando respuesta del servidor, puede enviar mensajes tan rápido como lo admita la red, por lo que esta configuración se puede utilizar para lograr un rendimiento muy alto.

• Si acks = 1, el productor recibirá una respuesta de éxito del corredor en el momento en que la réplica del líder reciba el mensaje. Si el mensaje no se puede escribir al líder (por ejemplo, si el líder colapsó y aún no se eligió un nuevo líder), el productor recibirá una respuesta de error y podrá volver a intentar enviar el mensaje, evitando la pérdida potencial de datos. El mensaje aún puede perderse si el líder falla y se elige una réplica sin este mensaje como el nuevo líder (a través de unclean elección de líder). En este caso, el rendimiento depende de si enviamos mensajes de forma sincrónica o asincrónica. Si nuestro código de cliente espera una respuesta del servidor (al llamar al método get () del objeto Future devuelto al enviar un mensaje) obviamente aumentará la latencia significativamente (al menos por una red viaje ida y vuelta). Si el cliente usa devoluciones de llamada, la latencia se ocultará, pero el rendimiento estará limitado por la cantidad de mensajes en tránsito (es decir, cuántos mensajes enviará el productor antes de recibir respuestas del servidor).

• Si acks = all, el productor recibirá una respuesta exitosa del broker una vez que todas las réplicas sincronizadas hayan recibido el mensaje. Este es el modo más seguro, ya que puede asegurarse de que más de un corredor tenga el mensaje y de que el mensaje sobrevivirá incluso en el caso de una falla, sin embargo, la latencia que discutimos en el caso acks = 1 será aún mayor, ya que estaremos esperando que más de un broker reciba el mensaje.

buffer.memory

Esto establece la cantidad de memoria que el productor utilizará para almacenar en búfer los mensajes que esperan ser enviados a los intermediarios. Si los mensajes son enviados por la aplicación más rápido de lo que pueden entregarse al servidor, el productor puede quedarse sin espacio y las llamadas send () adicionales bloquearán o lanzarán una excepción, según el parámetro block.on.buffer.full ( reemplazado con max.block.ms en la versión 0.9.0.0, que permite bloquear durante un cierto tiempo y luego lanzar una excepción).

compression.type

De forma predeterminada, los mensajes se envían sin comprimir. Este parámetro se puede establecer en snappy, gzip o lz4, en cuyo caso se utilizarán los algoritmos de compresión correspondientes para comprimir los datos antes de enviarlos a los corredores. Google inventó la compresión rápida para proporcionar relaciones de compresión decentes con una sobrecarga de CPU baja y buena rendimiento, por lo que se recomienda en los casos en que tanto el rendimiento como el ancho de banda sean una preocupación. La compresión Gzip normalmente utilizará más CPU y tiempo, pero dará como resultado mejores relaciones de compresión, por lo que se recomienda en los casos en que el ancho de banda de la red es más restringido. Al habilitar la compresión, reduce la utilización de la red y el almacenamiento  que suele ser un cuello de botella al enviar mensajes a Kafka.

retries

Cuando el productor recibe un mensaje de error del servidor, el error podría ser transitorio (por ejemplo, falta de líder para una partición). En este caso, el valor del parámetro de reintentos controlará cuántas veces el productor reintentará enviar el mensaje antes de darse por vencido y notificar al cliente de un problema. De forma predeterminada, el productor esperará 100 ms entre reintentos, pero puede controlar esto mediante el parámetro retry.backoff.ms. Recomendamos probar cuánto tiempo se tarda en recuperarse de un corredor bloqueado (es decir, cuánto tiempo hasta que todas las particiones obtengan nuevos líderes) y establecer el número de reintentos y el retraso entre ellos de modo que la cantidad total de tiempo dedicado a reintentar sea mayor que el tiempo el clúster de Kafka necesita recuperarse del colapso; de lo contrario, el productor se rendirá demasiado pronto. El productor no volverá a intentar todos los errores. Algunos errores no son transitorios y no provocarán reintentos (p. Ej., Error "mensaje demasiado grande"). En general, debido a que el productor maneja los reintentos por usted, no tiene sentido manejar los reintentos dentro de su propia lógica de aplicación. ustedquerrá centrar sus esfuerzos en el manejo de errores no confiables o casos en los que se agotaron los reintentos.

batch.size

Cuando se envían varios registros a la misma partición, el productor los agrupará. Este parámetro controla la cantidad de memoria en bytes (¡no en mensajes!) que se utilizará para cada lote. Cuando el lote esté lleno, se enviarán todos los mensajes del lote. Sin embargo, esto no significa que el productor esperará a que el lote se llene. El productor enviará lotes medio llenos e incluso lotes con un solo mensaje en ellos. Por lo tanto, establecer un tamaño de lote demasiado grande no causará demoras en el envío de mensajes; solo usará más memoria para los lotes. Establecer el tamaño del lote demasiado pequeño agregará algunos gastos generales porque el productor necesitará enviar mensajes con más frecuencia.

linger.ms

linger.ms controla la cantidad de tiempo para esperar mensajes adicionales antes de enviar el lote actual. KafkaProducer envía un lote de mensajes cuando la corriente el lote de alquiler está lleno o cuando se alcanza el límite de linger.ms. De forma predeterminada, el productor enviará mensajes tan pronto como haya un hilo de remitente disponible para enviarlos, incluso si solo hay un mensaje en el lote. Al establecer linger.ms por encima de 0, le indicamos al productor que espere unos milisegundos para agregar mensajes adicionales al lote antes de enviarlo a los brokers. Esto aumenta la latencia pero también aumenta el rendimiento (debido a que enviamos más mensajes a la vez, hay menos gastos generales por mensaje).

client.id

Esta puede ser cualquier cadena y será utilizada por los intermediarios para identificar los mensajes enviados desde el cliente. Se utiliza en registros y métricas, y para cuotas.

max.in.flight.requests.per.connection

Esto controla cuántos mensajes enviará el productor al servidor sin recibir respuestas. Establecer este valor alto puede aumentar el uso de la memoria mientras mejora rendimiento, pero establecerlo demasiado alto puede reducir el rendimiento a medida que el procesamiento por lotes se vuelve menos eficiente. Establecer esto en 1 garantizará que los mensajes se escribirán al corredor en el orden en que se enviaron, incluso cuando se produzcan reintentos.

timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms

Estos parámetros controlan cuánto tiempo el productor esperará una respuesta del servidor cuando envíe datos (request.timeout.ms) y cuando solicite metadatos como el líderes actuales para las particiones en las que estamos escribiendo (metadata.fetch.timeout.ms). Si se alcanza el tiempo de espera sin respuesta, el productor volverá a intentar enviar o responder con un error (ya sea a través de una excepción o mediante el envío de devolución de llamada). timeout.ms controla el tiempo que el intermediario esperará para que las réplicas sincronizadas reconozcan el mensaje para cumplir con la configuración de acks; el intermediario devolverá un error si transcurre el tiempo sin los reconocimientos necesarios.

max.block.ms

Este parámetro controla cuánto tiempo bloqueará el productor cuando llame a send () y cuando solicite explícitamente metadatos a través de particionesFor (). Esos métodos bloquean cuando el búfer de envío del productor está lleno o cuando los metadatos no están disponibles. Cuando se alcanza max.block.ms, se lanza una excepción de tiempo de espera.

max.request.size

Esta configuración controla el tamaño de una solicitud de producto enviada por el productor. Limita tanto el tamaño del mensaje más grande que se puede enviar como la cantidad de mensajes que el productor puede enviar en una solicitud. Por ejemplo, con un tamaño de solicitud máximo predeterminado de 1 MB, el mensaje más grande que puede enviar es 1 MB o el productor puede agrupar 1,000 mensajes de tamaño 1 K cada uno en una solicitud. Además, el corredor tiene su propio límite en el tamaño del mensaje más grande que aceptará (message.max.bytes). Por lo general, es una buena idea hacer coincidir estas configuraciones, para que el productor no intente enviar mensajes de un tamaño que el broker rechace.

receive.buffer.bytes y send.buffer.bytes

Estos son los tamaños de los búferes de envío y recepción de TCP que utilizan los sockets al escribir y leer datos. Si se establecen en -1, se utilizarán los valores predeterminados del sistema operativo. Es una buena idea aumentarlos cuando los productores o consumidores se comunican con intermediarios en un centro de datos diferente porque esos enlaces de red suelen tener mayor latencia y menor ancho de banda.


Apache Kafka conserva el orden de los mensajes dentro de una partición. Esto significa que si los mensajes se enviaron desde el productor en un orden específico, el broker los escribirá en una partición en ese orden y todos los consumidores los leerán en ese orden. Para algunos casos de uso, el orden es muy importante. Hay una gran diferencia entre depositar $ 100 en una cuenta y luego retirarlos, ¡y al revés! Sin embargo, algunos casos de uso son menos sensibles.

Establecer el parámetro de reintentos en un valor distinto de cero y el max.in.flights.requests.per.session en más de uno significa que es posible que el bróker no pueda escribir el primer lote de mensajes, logre escribir el segundo (que ya estaba en vuelo), y luego vuelva a intentar el primer lote y tenga éxito, invirtiendo así el orden.

Por lo general, establecer el número de reintentos a cero no es una opción en un sistema confiable, por lo que si garantizar el orden es crítico, recomendamos configurar in.flight.requests.per.session = 1 para asegurarse de que mientras se reintenta un lote de mensajes, no se enviarán mensajes adicionales (porque tiene el potencial de invertir el orden correcto).

Esto limitará severamente el rendimiento del productor, por lo tanto, utilícelo solo cuando el pedido sea importante.

miércoles, 25 de agosto de 2021

Primeros pasos con Apache Kafka parte 9


Seguimos con Kafka

Supongamos que el tiempo de ida y vuelta de la red entre nuestra aplicación y el clúster de Kafka es de 10 ms. Si esperamos una respuesta después de enviar cada mensaje, enviar 100 mensajes tardará alrededor de 1 segundo. Por otro lado, si solo enviamos todos nuestros mensajes y no esperamos ninguna respuesta, enviar 100 mensajes apenas nos llevará tiempo. En la mayoría de los casos, realmente no necesitamos una respuesta: Kafka devuelve el tema, la partición y el desplazamiento del registro después de que se escribió, lo que generalmente no es requerido por la aplicación de envío. Por otro lado, necesitamos saber cuándo fallamos en enviar un mensaje por completo para poder lanzar una excepción, registrar un error o quizás escribir el mensaje en un archivo de “errores” para un análisis posterior.

Para enviar mensajes de forma asincrónica y manejar escenarios de error, el productor admite agregar un callback al enviar un registro. Veamos un ejemplo:

private class DemoProducerCallback implements Callback {

    @Override

    public void onCompletion(RecordMetadata recordMetadata, Exception e) {

        if (e != null) {

            e.printStackTrace();

        }

    }

}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

Para usar callbaks, necesitamos una clase que implemente la interfaz org.apache.kafka.clients.producer.Callback, que tiene una única función onCompletion().

Si Kafka devolvió un error, onCompletion() tendrá una excepción no nula. Aquí lo “manejamos” imprimiendo, pero el código de producción probablemente tendrá funciones de manejo de errores más robustas. Los registros son los mismos que antes. Y pasamos un objeto de devolución de llamada cuando enviamos el registro.

domingo, 22 de agosto de 2021

Primeros pasos con Apache Kafka parte 8


Seguimos con Kafka

La forma más sencilla de enviar un mensaje es la siguiente:

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");

try {

producer.send(record);

} catch (Exception e) {

e.printStackTrace();

}

El productor acepta objetos ProducerRecord, por lo que comenzamos creando uno. ProducerRecord tiene varios constructores. Aquí usamos uno que requiere el nombre del tema al que estamos enviando datos, que siempre es una cadena, y la clave y el valor que estamos enviando a Kafka, que en este caso también son cadenas. Los tipos de clave y valor deben coincidir con nuestros objetos de serializador y productor.

Usamos el método send() del objeto productor para enviar el ProducerRecord. El mensaje se colocará en un búfer y se enviará al broker en un hilo separado. El método send () devuelve un objeto Java Future con RecordMetadata, pero como simplemente ignoramos el valor devuelto, no tenemos forma de saber si el mensaje se envió correctamente o no. Este método de envío de mensajes se puede utilizar cuando es aceptable que algun mensaje no llegue

Si bien ignoramos los errores que pueden ocurrir al enviar mensajes a los brokers de Kafka, es posible que obtengamos una excepción si el productor encontró errores antes de enviar el mensaje a Kafka. Pueden ser una SerializationException cuando no se serializa el mensaje, una BufferExhaustedException o TimeoutException si el búfer está lleno, o una InterruptException si se interrumpió el hilo de envío.

sábado, 21 de agosto de 2021

Primeros pasos con Apache Kafka parte 7



Seguimos con Kafka

El primer paso para escribir mensajes a Kafka es crear un objeto productor. Un productor de Kafka tiene tres propiedades obligatorias:

bootstrap.servers

Lista de hosts: Como nos tenemos que conectar al cluster necesitamos, la ip de almenos una maquina pero se recomienda incluir al menos dos, por lo que en caso de que una se caiga, el productor aún podrá conectarse al clúster. Luego de conectarse, se le enviaran todas las ips del cluster. 

key.serializer

Nombre de una clase que se utilizará para serializar las claves de los registros que produciremos en Kafka. Los brokers de Kafka esperan matrices de bytes como claves y valores de mensajes.

Sin embargo, la interfaz de productor permite, utilizando tipos parametrizados, enviar cualquier objeto Java como clave y valor. Esto hace que el código sea muy legible, pero también significa que el productor debe saber cómo convertir estos objetos en matrices de bytes.

key.serializer debe configurarse con el nombre de una clase que implemente la interfaz org.apache.kafka.common.serialization.Serializer. El productor utilizará esta clase para serializar el objeto clave en una matriz de bytes. El paquete de cliente de Kafka incluye ByteArraySerializer. Es necesario configurar key.serializer incluso si tiene la intención de enviar solo valores.

value.serializer Nombre de una clase que se utilizará para serializar los valores de los registros que produciremos en Kafka. De la misma manera que establece key.serializer en un nombre de una clase que serializará el objeto de clave de mensaje en una matriz de bytes, establece value.serializer en una clase que serializará el objeto de valor de mensaje.

El siguiente fragmento de código muestra cómo crear un nuevo productor estableciendo solo los parámetros obligatorios y usando valores predeterminados para todo lo demás:

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

Con una interfaz tan simple, está claro que la mayor parte del control sobre el comportamiento del productor se realiza estableciendo las propiedades de configuración correctas. La documentación de Apache Kafka cubre todas las opciones de configuración, y repasaremos las importantes más adelante.

Una vez que creamos una instancia de un productor, es hora de comenzar a enviar mensajes. Hay tres métodos principales para enviar mensajes:

Dispara y olvida
Enviamos un mensaje al servidor y realmente no nos importa si llega con éxito o no. La mayoría de las veces, llegará correctamente, ya que Kafka tiene una alta disponibilidad y el productor volverá a intentar enviar mensajes automáticamente. Sin embargo, algunos mensajes se pueden perder con este método, hay casos de uso que no nos importa estar seguros si llegan los mensajes para esos casos es este método. 

Envío sincrónico
Enviamos un mensaje, el método send () devuelve un objeto Future y usamos get () para esperar y ver si el envío fue exitoso o no.

Envío asincrónico
Llamamos al método send() con una función de devolución de llamada, que se activa cuando recibe una respuesta del corredor de Kafka. 

En los ejemplos que siguen, veremos cómo enviar mensajes usando estos métodos y cómo manejar los diferentes tipos de errores que pueden ocurrir pero eso es una historia para otro post...

viernes, 13 de agosto de 2021

Consumiendo mensajes desde Apache Kafka con Spring Boot


Vamos a hacer un ejemplo de spring boot que consuma un mensaje desde Kafka. Primero tenemos que bajar nuestro proyecto de spring initializr en esta pagina podemos agregar spring-kafka o agregarlo luego  : 

implementation 'org.springframework.kafka:spring-kafka'

Usando Gradle o Maven. 

Antes vamos a hacer un mensaje de ejemplo, que va ser el mensaje a escribir : 


public class Message {

    private long id;

    private String message;

    private LocalDateTime time;


    public Message() {}


    public Message(long id, String message, LocalDateTime time) {

        this.id = id;

        this.message = message;

        this.time = time;

    }


    public Message(String message, LocalDateTime time) {

        this(new Random().nextLong(), message, time);

    }


    public Message(String message) {

        this(message,LocalDateTime.now());

    }


    public long getId() {

        return id;

    }


    public void setId(long id) {

        this.id = id;

    }


    public String getMessage() {

        return message;

    }


    public void setMessage(String message) {

        this.message = message;

    }


    public LocalDateTime getTime() {

        return time;

    }


    public void setTime(LocalDateTime time) {

        this.time = time;

    }


    @Override

    public boolean equals(Object o) {

        if (this == o) return true;

        if (o == null || getClass() != o.getClass()) return false;

        Message message1 = (Message) o;

        return Objects.equals(message, message1.message) &&

                Objects.equals(time, message1.time);

    }


    @Override

    public int hashCode() {

        return Objects.hash(message, time);

    }


    @Override

    public String toString() {

        return "Message{" +

                "message='" + message + '\'' +

                ", time=" + time +

                '}';

    }

}

Tenemos que escribir 2 clases de configuración, una para indicar como nos conectar a Kafka y como esta serializado el objeto. 


@EnableKafka

@Configuration

public class KafkaConsumerConfig {


    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;


    @Bean

    public ConsumerFactory<String, Message> consumerFactory() {

        var props = new HashMap<String, Object>();

        props.put(

                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                bootstrapAddress);

        props.put(

                ConsumerConfig.GROUP_ID_CONFIG,

                "demo");

        props.put(

                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        props.put(

                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

        props.put(JsonDeserializer.TRUSTED_PACKAGES,

                "com.assembly.demo.model");

        return new DefaultKafkaConsumerFactory<>(props);

    }


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, Message>

    kafkaListenerContainerFactory() {

        var factory = new ConcurrentKafkaListenerContainerFactory<String, Message>();

        factory.setConsumerFactory(this.consumerFactory());

        return factory;

    }

}

Y Otra para confugurar el topic : 


@Configuration

public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")

    private String bootstrapAddress;

    @Bean

    public KafkaAdmin kafkaAdmin() {

        var configs = new HashMap<String, Object>();

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new KafkaAdmin(configs);

    }

    @Bean

    public NewTopic topicDemo() {

        return new NewTopic("demo", 1, (short) 1);

    }

}


Y ahora vamos a hacer un servicio que consuma el mensaje y lo escriba en pantalla : 


@Service

public class ConsumerService {

    @KafkaListener(topics = "${kafka.topicName}", groupId = "demo")

    public void listenGroupDemo(Message message) {

        System.out.println("Received Message in group demo : " + message);

    }

}

Y Listo!!

Dejo link del repo : https://github.com/emanuelpeg/springBoot-Kafka-example