Translate

miércoles, 10 de noviembre de 2021

Primeros pasos con Apache Kafka parte 20



Seguimos con Kafka

En el corazón de la API del consumidor hay un bucle simple para sondear el servidor en busca de datos.

Una vez que el consumidor se suscribe a los temas, el ciclo de sondeo maneja todos los detalles de coordinación, reequilibrio de particiones, pings y obtención de datos, dejando al desarrollador con una API limpia que simplemente que devuelve los datos disponibles de las particiones asignadas. Veamos un ejemplo :

try {

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {

            log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",

                record.topic(), record.partition(), record.offset(), record.key(), record.value());

            int updatedCount = 1;

            if (custCountryMap.countainsValue(record.value())) {

                updatedCount = custCountryMap.get(record.value()) + 1;

            }

            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);

            System.out.println(json.toString(4))

        }

    }

} finally {

    consumer.close();

}

Este es un bucle infinito. Los consumidores suelen ser aplicaciones de larga duración que sondean continuamente a Kafka en busca de más datos. 

ConsumerRecords<String, String> records = consumer.poll(100);  en esta linea se puede ver que los consumidores deben seguir sondeando a Kafka o serán considerados muertos y las particiones que están consumiendo serán entregadas a otro consumidor del grupo para que continúe consumiendo. El parámetro que pasamos, poll (), es un intervalo de tiempo de espera y controla cuánto tiempo se bloqueará poll () si los datos no están disponibles en el búfer del consumidor. Si se establece en 0, poll () regresará inmediatamente; de lo contrario, esperará el número especificado de milisegundos para que lleguen los datos del intermediario.

poll () devuelve una lista de registros. Cada registro contiene el tema y la partición de la que proviene el registro, el desplazamiento del registro dentro de la partición y, por supuesto, la clave y el valor del registro. Por lo general, queremos iterar sobre la lista y procesar los registros individualmente. El método poll () toma un parámetro de tiempo de espera. Esto especifica cuánto tiempo tardará la encuesta en regresar, con o sin datos. El valor suele depender de las necesidades de la aplicación para obtener respuestas rápidas: ¿qué tan rápido desea devolver el control al subproceso que realiza el sondeo?

El procesamiento generalmente termina escribiendo un resultado en un almacén de datos o actualizando un registro almacenado. Aquí, el objetivo es mantener un recuento continuo de clientes de cada condado, por lo que actualizamos una tabla hash e imprimimos el resultado como JSON. Un ejemplo más realista almacenaría el resultado de las actualizaciones en un almacén de datos.

Siempre se debe cerrar la conexión del consumidor antes de salir. Esto cerrará las conexiones de red y los slots. También activará un reequilibrio de inmediato en lugar de esperar a que el coordinador del grupo descubra que el consumidor dejó de enviar pings y es probable que esté muerto, lo que llevará más tiempo y, por lo tanto, dará como resultado un período de tiempo más largo en el que los consumidores no pueden consumir mensajes de un subconjunto de las particiones.

El ciclo de la encuesta hace mucho más que simplemente obtener datos. La primera vez que llama a poll () con un nuevo consumidor, es responsable de encontrar el GroupCoordinator, unirse al grupo de consumidores y recibir una asignación de partición. Si se activa un reequilibrio, también se manejará dentro del ciclo de sondeo. Y, por supuesto, los latidos que mantienen vivos a los consumidores se envían desde dentro del ciclo de la encuesta. Por esta razón, intentamos asegurarnos de que cualquier procesamiento que hagamos entre iteraciones sea rápido y eficiente.