Translate

jueves, 2 de septiembre de 2021

Primeros pasos con Apache Kafka parte 10

 


Seguimos con Kafka

Hasta ahora, hemos visto muy pocos parámetros de configuración para los productores, solo el URI y los serializadores de bootstrap.servers obligatorios. El productor tiene una gran cantidad de parámetros de configuración; la mayoría están documentados en la documentación de Apache Kafka y muchos tienen valores predeterminados razonables, por lo que no hay razón para modificar cada parámetro. Sin embargo, algunos de los parámetros tienen un impacto significativo en el uso de la memoria, el rendimiento y la confiabilidad de los productores. Los revisaremos aquí.

acks

El parámetro acks controla cuántas réplicas de partición deben recibir el registro antes de que el productor pueda considerar que la escritura se ha realizado correctamente. Esta opción tiene un impacto significativo en la probabilidad de que se pierdan los mensajes. Hay tres valores permitidos para el parámetro acks:

• Si acks = 0, el productor no esperará una respuesta del broker antes de asumir que el mensaje se envió correctamente. Esto significa que si algo salió mal y el corredor no recibirá mensaje, el productor no lo sabrá y el mensaje se perderá. Sin embargo, debido a que el productor no está esperando respuesta del servidor, puede enviar mensajes tan rápido como lo admita la red, por lo que esta configuración se puede utilizar para lograr un rendimiento muy alto.

• Si acks = 1, el productor recibirá una respuesta de éxito del corredor en el momento en que la réplica del líder reciba el mensaje. Si el mensaje no se puede escribir al líder (por ejemplo, si el líder colapsó y aún no se eligió un nuevo líder), el productor recibirá una respuesta de error y podrá volver a intentar enviar el mensaje, evitando la pérdida potencial de datos. El mensaje aún puede perderse si el líder falla y se elige una réplica sin este mensaje como el nuevo líder (a través de unclean elección de líder). En este caso, el rendimiento depende de si enviamos mensajes de forma sincrónica o asincrónica. Si nuestro código de cliente espera una respuesta del servidor (al llamar al método get () del objeto Future devuelto al enviar un mensaje) obviamente aumentará la latencia significativamente (al menos por una red viaje ida y vuelta). Si el cliente usa devoluciones de llamada, la latencia se ocultará, pero el rendimiento estará limitado por la cantidad de mensajes en tránsito (es decir, cuántos mensajes enviará el productor antes de recibir respuestas del servidor).

• Si acks = all, el productor recibirá una respuesta exitosa del broker una vez que todas las réplicas sincronizadas hayan recibido el mensaje. Este es el modo más seguro, ya que puede asegurarse de que más de un corredor tenga el mensaje y de que el mensaje sobrevivirá incluso en el caso de una falla, sin embargo, la latencia que discutimos en el caso acks = 1 será aún mayor, ya que estaremos esperando que más de un broker reciba el mensaje.

buffer.memory

Esto establece la cantidad de memoria que el productor utilizará para almacenar en búfer los mensajes que esperan ser enviados a los intermediarios. Si los mensajes son enviados por la aplicación más rápido de lo que pueden entregarse al servidor, el productor puede quedarse sin espacio y las llamadas send () adicionales bloquearán o lanzarán una excepción, según el parámetro block.on.buffer.full ( reemplazado con max.block.ms en la versión 0.9.0.0, que permite bloquear durante un cierto tiempo y luego lanzar una excepción).

compression.type

De forma predeterminada, los mensajes se envían sin comprimir. Este parámetro se puede establecer en snappy, gzip o lz4, en cuyo caso se utilizarán los algoritmos de compresión correspondientes para comprimir los datos antes de enviarlos a los corredores. Google inventó la compresión rápida para proporcionar relaciones de compresión decentes con una sobrecarga de CPU baja y buena rendimiento, por lo que se recomienda en los casos en que tanto el rendimiento como el ancho de banda sean una preocupación. La compresión Gzip normalmente utilizará más CPU y tiempo, pero dará como resultado mejores relaciones de compresión, por lo que se recomienda en los casos en que el ancho de banda de la red es más restringido. Al habilitar la compresión, reduce la utilización de la red y el almacenamiento  que suele ser un cuello de botella al enviar mensajes a Kafka.

retries

Cuando el productor recibe un mensaje de error del servidor, el error podría ser transitorio (por ejemplo, falta de líder para una partición). En este caso, el valor del parámetro de reintentos controlará cuántas veces el productor reintentará enviar el mensaje antes de darse por vencido y notificar al cliente de un problema. De forma predeterminada, el productor esperará 100 ms entre reintentos, pero puede controlar esto mediante el parámetro retry.backoff.ms. Recomendamos probar cuánto tiempo se tarda en recuperarse de un corredor bloqueado (es decir, cuánto tiempo hasta que todas las particiones obtengan nuevos líderes) y establecer el número de reintentos y el retraso entre ellos de modo que la cantidad total de tiempo dedicado a reintentar sea mayor que el tiempo el clúster de Kafka necesita recuperarse del colapso; de lo contrario, el productor se rendirá demasiado pronto. El productor no volverá a intentar todos los errores. Algunos errores no son transitorios y no provocarán reintentos (p. Ej., Error "mensaje demasiado grande"). En general, debido a que el productor maneja los reintentos por usted, no tiene sentido manejar los reintentos dentro de su propia lógica de aplicación. ustedquerrá centrar sus esfuerzos en el manejo de errores no confiables o casos en los que se agotaron los reintentos.

batch.size

Cuando se envían varios registros a la misma partición, el productor los agrupará. Este parámetro controla la cantidad de memoria en bytes (¡no en mensajes!) que se utilizará para cada lote. Cuando el lote esté lleno, se enviarán todos los mensajes del lote. Sin embargo, esto no significa que el productor esperará a que el lote se llene. El productor enviará lotes medio llenos e incluso lotes con un solo mensaje en ellos. Por lo tanto, establecer un tamaño de lote demasiado grande no causará demoras en el envío de mensajes; solo usará más memoria para los lotes. Establecer el tamaño del lote demasiado pequeño agregará algunos gastos generales porque el productor necesitará enviar mensajes con más frecuencia.

linger.ms

linger.ms controla la cantidad de tiempo para esperar mensajes adicionales antes de enviar el lote actual. KafkaProducer envía un lote de mensajes cuando la corriente el lote de alquiler está lleno o cuando se alcanza el límite de linger.ms. De forma predeterminada, el productor enviará mensajes tan pronto como haya un hilo de remitente disponible para enviarlos, incluso si solo hay un mensaje en el lote. Al establecer linger.ms por encima de 0, le indicamos al productor que espere unos milisegundos para agregar mensajes adicionales al lote antes de enviarlo a los brokers. Esto aumenta la latencia pero también aumenta el rendimiento (debido a que enviamos más mensajes a la vez, hay menos gastos generales por mensaje).

client.id

Esta puede ser cualquier cadena y será utilizada por los intermediarios para identificar los mensajes enviados desde el cliente. Se utiliza en registros y métricas, y para cuotas.

max.in.flight.requests.per.connection

Esto controla cuántos mensajes enviará el productor al servidor sin recibir respuestas. Establecer este valor alto puede aumentar el uso de la memoria mientras mejora rendimiento, pero establecerlo demasiado alto puede reducir el rendimiento a medida que el procesamiento por lotes se vuelve menos eficiente. Establecer esto en 1 garantizará que los mensajes se escribirán al corredor en el orden en que se enviaron, incluso cuando se produzcan reintentos.

timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms

Estos parámetros controlan cuánto tiempo el productor esperará una respuesta del servidor cuando envíe datos (request.timeout.ms) y cuando solicite metadatos como el líderes actuales para las particiones en las que estamos escribiendo (metadata.fetch.timeout.ms). Si se alcanza el tiempo de espera sin respuesta, el productor volverá a intentar enviar o responder con un error (ya sea a través de una excepción o mediante el envío de devolución de llamada). timeout.ms controla el tiempo que el intermediario esperará para que las réplicas sincronizadas reconozcan el mensaje para cumplir con la configuración de acks; el intermediario devolverá un error si transcurre el tiempo sin los reconocimientos necesarios.

max.block.ms

Este parámetro controla cuánto tiempo bloqueará el productor cuando llame a send () y cuando solicite explícitamente metadatos a través de particionesFor (). Esos métodos bloquean cuando el búfer de envío del productor está lleno o cuando los metadatos no están disponibles. Cuando se alcanza max.block.ms, se lanza una excepción de tiempo de espera.

max.request.size

Esta configuración controla el tamaño de una solicitud de producto enviada por el productor. Limita tanto el tamaño del mensaje más grande que se puede enviar como la cantidad de mensajes que el productor puede enviar en una solicitud. Por ejemplo, con un tamaño de solicitud máximo predeterminado de 1 MB, el mensaje más grande que puede enviar es 1 MB o el productor puede agrupar 1,000 mensajes de tamaño 1 K cada uno en una solicitud. Además, el corredor tiene su propio límite en el tamaño del mensaje más grande que aceptará (message.max.bytes). Por lo general, es una buena idea hacer coincidir estas configuraciones, para que el productor no intente enviar mensajes de un tamaño que el broker rechace.

receive.buffer.bytes y send.buffer.bytes

Estos son los tamaños de los búferes de envío y recepción de TCP que utilizan los sockets al escribir y leer datos. Si se establecen en -1, se utilizarán los valores predeterminados del sistema operativo. Es una buena idea aumentarlos cuando los productores o consumidores se comunican con intermediarios en un centro de datos diferente porque esos enlaces de red suelen tener mayor latencia y menor ancho de banda.


Apache Kafka conserva el orden de los mensajes dentro de una partición. Esto significa que si los mensajes se enviaron desde el productor en un orden específico, el broker los escribirá en una partición en ese orden y todos los consumidores los leerán en ese orden. Para algunos casos de uso, el orden es muy importante. Hay una gran diferencia entre depositar $ 100 en una cuenta y luego retirarlos, ¡y al revés! Sin embargo, algunos casos de uso son menos sensibles.

Establecer el parámetro de reintentos en un valor distinto de cero y el max.in.flights.requests.per.session en más de uno significa que es posible que el bróker no pueda escribir el primer lote de mensajes, logre escribir el segundo (que ya estaba en vuelo), y luego vuelva a intentar el primer lote y tenga éxito, invirtiendo así el orden.

Por lo general, establecer el número de reintentos a cero no es una opción en un sistema confiable, por lo que si garantizar el orden es crítico, recomendamos configurar in.flight.requests.per.session = 1 para asegurarse de que mientras se reintenta un lote de mensajes, no se enviarán mensajes adicionales (porque tiene el potencial de invertir el orden correcto).

Esto limitará severamente el rendimiento del productor, por lo tanto, utilícelo solo cuando el pedido sea importante.