Seguimos con Kafka
Hasta ahora nos hemos centrado en aprender la API del consumidor, pero solo hemos analizado algunas de las propiedades de configuración, solo los obligatorios bootstrap.servers, group.id, key.deserializer y value.deserializer. Toda la configuración del consumidor está documentada en la documentación de Apache Kafka. La mayoría de los parámetros tienen una razón pueden tener valores predeterminados y no requieren modificación, pero algunos tienen implicaciones en el rendimiento y la disponibilidad de los consumidores. Echemos un vistazo a algunas de las propiedades más importantes.
fetch.min.bytes
Esta propiedad permite que un consumidor especifique la cantidad mínima de datos que desea recibir del corredor al buscar registros. Si un corredor recibe una solicitud de registros de un consumidor, pero los nuevos registros suman menos bytes que min.fetch.bytes, el corredor esperará hasta que haya más mensajes disponibles antes de devolver los registros al consumidor. Esto reduce la carga tanto para el consumidor como para el corredor, ya que tienen que manejar menos mensajes de ida y vuelta en los casos en que los temas no tienen mucha actividad nueva (o para horas de menor actividad del día). Deberá establecer este parámetro más alto que el predeterminado si el consumidor está usando demasiada CPU cuando no hay muchos datos disponibles, o reducir la carga de los intermediarios cuando tiene una gran cantidad de consumidores.
fetch.max.wait.ms
Al establecer fetch.min.bytes, le indica a Kafka que espere hasta que tenga suficientes datos para enviar antes de responder al consumidor. fetch.max.wait.ms te permite controlar cuánto tiempo esperar. De forma predeterminada, Kafka esperará hasta 500 ms. Esto da como resultado hasta 500 ms de latencia adicional en caso de que no haya suficientes datos fluyendo al tema de Kafka para satisfacer la cantidad mínima de datos para devolver. Si desea limitar la latencia potencial (generalmente debido a que los SLA controlan la latencia máxima de la aplicación), puede establecer fetch.max.wait.ms en un valor más bajo. Si establece fetch.max.wait.ms en 100 ms y fetch.min.bytes en 1 MB, Kafka recibirá una solicitud de recuperación del consumidor y responderá con datos cuando tenga 1 MB de datos para devolver o después de 100 ms, lo que ocurra primero.
max.partition.fetch.bytes
Esta propiedad controla el número máximo de bytes que el servidor devolverá por partición. El valor predeterminado es 1 MB, lo que significa que cuando KafkaConsumer.poll () devuelve ConsumerRecords, el objeto de registro utilizará como máximo un máximo de partición.fetch.bytes por partición asignada al consumidor. Entonces, si un tema tiene 20 particiones y tiene 5 consumidores, cada consumidor deberá tener 4 MB de memoria disponible para Consumer Records. En la práctica, querrá asignar más memoria ya que cada consumidor necesitará manejar más particiones si otros consumidores del grupo fallan. máx. partition.fetch.bytes debe ser más grande que el mensaje más grande que aceptará un corredor (determinado por la propiedad max.message.size en la configuración del corredor), o el corredor puede tener mensajes que el consumidor no podrá consumir, en los caso que el consumidor se cuelgue tratando de leerlos. Otra consideración importante al establecer max.partition.fetch.bytes es la cantidad de tiempo que le toma al consumidor procesar los datos. Como recordará, el consumidor debe llamar a poll () con la frecuencia suficiente para evitar el tiempo de espera de la sesión y el reequilibrio posterior. Si la cantidad de datos que devuelve un solo poll () es muy grande, el consumidor puede tardar más en procesar, lo que significa que no llegará a la siguiente iteración del ciclo de sondeo a tiempo para evitar un tiempo de espera de sesión. Si esto ocurre, las dos opciones son reducir el máx. partición.fetch.bytes o para aumentar el tiempo de espera de la sesión.
session.timeout.ms
La cantidad de tiempo que un consumidor puede estar fuera de contacto con los corredores mientras aún se considera vivo es de 3 segundos. Si pasa más de session.timeout.ms sin que el consumidor envíe un latido al coordinador del grupo, se considera muerto y el coordinador del grupo activará un reequilibrio del grupo de consumidores para asignar particiones del consumidor muerto a los otros consumidores del grupo. . Esta propiedad está estrechamente relacionada con heartbeat.interval.ms. heartbeat.interval.ms controla la frecuencia con la que el método poll () de KafkaConsumer enviará un latido al coordinador del grupo, mientras que session.timeout.ms controla cuánto tiempo puede pasar un consumidor sin enviar un latido. Por lo tanto, esas dos propiedades generalmente se modifican juntas: heatbeat.interval.ms debe ser menor que session.timeout.ms, y generalmente se establece en un tercio del valor de tiempo de espera. Entonces, si session.timeout.ms es de 3 segundos, heartbeat.interval.ms debería ser de 1 segundo. Establecer session.timeout.ms por debajo del valor predeterminado permitirá a los grupos de consumidores detectar y recuperarse de fallas antes, pero también puede causar reequilibrios no deseados como resultado de que los consumidores tarden más en completar el ciclo de sondeo o la recolección de basura. Configurar session.timeout.ms más alto reducirá la posibilidad de un reequilibrio accidental, pero también significa que tomará más tiempo detectar una falla real.
auto.offset.reset
Esta propiedad controla el comportamiento del consumidor cuando comienza a leer una partición para la cual no tiene una compensación comprometida o si la compensación comprometida que tiene no es válida (generalmente porque el consumidor estuvo inactivo durante tanto tiempo que el registro con esa compensación ya estaba obsoleto en el corredor). El valor predeterminado es "más reciente", lo que significa que, sin una compensación válida, el consumidor comenzará a leer los registros más recientes (registros que se escribieron después de que el consumidor comenzó a ejecutar). La alternativa es "la más temprana", lo que significa que, sin un desplazamiento válido, el consumidor leerá todos los datos de la partición, comenzando desde el principio.
enable.auto.commit
Este parámetro controla si el consumidor confirmará las compensaciones automáticamente y su valor predeterminado es verdadero. Configúrelo en falso si prefiere controlar cuándo se confirman las compensaciones, lo cual es necesario para minimizar los duplicados y evitar la pérdida de datos. Si establece enable.auto.commit en true, es posible que también desee controlar la frecuencia con la que se confirmarán las compensaciones mediante auto.commit.interval.ms.
division.assignment.strategy
Aprendimos que las particiones se asignan a los consumidores en un grupo de consumidores. Un PartitionAssignor es una clase que, dados los consumidores y los temas a los que se suscribieron, decide qué particiones se asignarán a qué consumidor. De forma predeterminada, Kafka tiene dos estrategias de asignación:
- Range: Asigna a cada consumidor un subconjunto consecutivo de particiones de cada tema al que se suscribe. Entonces, si los consumidores C1 y C2 están suscritos a dos temas, T1 y T2, y cada uno de los temas tiene tres particiones, luego a C1 se le asignarán las particiones 0 y 1 de los temas T1 y T2, mientras que a C2 se le asignará la partición 2 de esos temas. Debido a que cada tema tiene un número impar de particiones y la asignación se realiza para cada tema de forma independiente, el primer consumidor termina con más particiones que el segundo. Esto sucede siempre que se utiliza la asignación de rango y el número de consumidores no divide el número de particiones en cada tema de forma ordenada.
- RoundRobin: Toma todas las particiones de todos los temas suscritos y las asigna a los consumidores de forma secuencial, una por una. Si C1 y C2 se describieron anteriormente, usaron RoundRobin asignación, C1 tendría las particiones 0 y 2 del tema T1 y la partición 1 del tema T2. C2 tendría la partición 1 del tema T1 y las particiones 0 y 2 del tema T2. En general, si todos los consumidores están suscritos a los mismos temas (un escenario muy común), la asignación de RoundRobin terminará con todos los consumidores con el mismo número de particiones (o como máximo una diferencia de partición).
division.assignment.strategy permite elegir una estrategia de asignación de partición. El valor predeterminado es org.apache.kafka.clients.consumer.RangeAssignor, que implementa la estrategia de rango descrita anteriormente. Puede reemplazarlo con org.apache.kafka.clients.consumer.RoundRobinAssignor. Una opción más avanzada es implementar su propia estrategia de asignación, en cuyo caso partition.assignment.strategy debe apuntar al nombre de su clase.
client.id
Esta puede ser cualquier cadena y será utilizada por los intermediarios para identificar los mensajes enviados desde el cliente. Se utiliza en registros y métricas, y para cuotas.
registros.max.poll.
Esto controla el número máximo de registros que devolverá una sola llamada a poll (). Esto es útil para ayudar a controlar la cantidad de datos que su aplicación necesitará procesar en el ciclo de sondeo.
receive.buffer.bytes y send.buffer.bytes
Estos son los tamaños de los búferes de envío y recepción de TCP que utilizan los sockets al escribir y leer datos. Si se establecen en -1, se utilizarán los valores predeterminados del sistema operativo. Puede ser una buena idea aumentarlos cuando los productores o consumidores se comunican con intermediarios en un centro de datos diferente, porque esos enlaces de red suelen tener mayor latencia y menor ancho de banda.