domingo, 19 de septiembre de 2021

Primeros pasos con Apache Kafka parte 13



Seguimos con Kafka. 

Los archivos Avro deben almacenar el esquema completo en el archivo de datos que están asociados, almacenar el esquema completo en cada registro generalmente será más del doble del tamaño del registro. Para resolver este problema podemos utilizar un registro de esquemas. Los Schema Registry no forma parte de Apache Kafka, pero hay varias opciones de código abierto para elegir. Usaremos Confluent Schema Registry para este ejemplo. 

La idea es almacenar todos los esquemas utilizados luego, simplemente almacenamos el identificador del esquema en el registro que producimos en Kafka. Los consumidores pueden usar el identificador para extraer el registro del registro de esquema y deserializar los datos. 

A continuación, se muestra un ejemplo de cómo producir objetos Avro generados en Kafka:

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";

int wait = 500;

Producer<String, Customer> producer = new KafkaProducer<String,Customer>(props);

while (true) {

    Customer customer = CustomerGenerator.getNext();

    System.out.println("Generated customer " + customer.toString());

    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);

    producer.send(record);

}

Usamos KafkaAvroSerializer para serializar nuestros objetos con Avro. AvroSerializer también puede manejar primitivas, por lo que luego podemos usar String como clave de registro y nuestro objeto Customer como valor.

schema.registry.url es un nuevo parámetro. Esto simplemente apunta a dónde almacenamos los esquemas.

El cliente es nuestro objeto generado. Le decimos al productor que nuestros registros contendrán Cliente como valor.

También creamos una instancia de ProducerRecord con Customer como el tipo de valor y pasamos un objeto Customer al crear el nuevo registro.

Eso es todo. Enviamos el registro con nuestro objeto Cliente y KafkaAvroSerializer se encargará del resto.

¿Qué sucede si prefiere utilizar objetos Avro genéricos en lugar de los objetos Avro generados?
No hay problema. En este caso, solo necesita proporcionar el esquema:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com"
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,
    GenericRecord>("customerContacts", name, customer);
    producer.send(data);
}

Seguimos usando el mismo KafkaAvroSerializer.

Y proporcionamos el URI del mismo registro de esquema.

Pero ahora también necesitamos proporcionar el esquema Avro, ya que no lo proporciona el objeto generado por Avro.

Nuestro tipo de objeto es un Avro GenericRecord, que inicializamos con nuestro esquema y los datos que queremos escribir.

Entonces, el valor de ProducerRecord es simplemente un GenericRecord que cuenta nuestro esquema y datos. El serializador sabrá cómo obtener el esquema de este registro, almacenarlo en el registro de esquema y serializar los datos del objeto.


No hay comentarios.:

Publicar un comentario