Como se vio en ejemplos anteriores, la configuración del productor incluye serializadores y hemos visto cómo utilizar el serializador de cadenas predeterminado. Kafka también incluye serializadores para enteros y ByteArrays, pero algunas veces necesitamos serializar de una forma especial.
Cuando el objeto que necesita enviar a Kafka no es una simple cadena o un entero, tiene la opción de usar una biblioteca de serialización genérica como Avro, Thrift o Protobuf para crear registros, o crear una serialización personalizada para los objetos que ya está usando .
Suponga que en lugar de registrar solo el nombre del cliente, crea una clase simple para representar a los clientes:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
Ahora suponga que queremos crear un serializador personalizado para esta clase.:
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
N bytes representing customerName in UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
La configuración de un productor con este CustomerSerializer le permitirá definir ProducerRecord <String, Customer> y enviar datos del cliente y pasar los objetos del cliente directamente al productor.
Este ejemplo es bastante simple, pero puede ver lo frágil que es el código. Si alguna vez tenemos demasiados clientes, por ejemplo, y necesitamos cambiar customerID a Long, o si alguna vez decidimos agregar un campo startDate a Customer, tendremos un problema serio para mantener la compatibilidad entre los mensajes antiguos y nuevos. La depuración de problemas de compatibilidad entre diferentes versiones de serializadores y deserializadores es bastante desafiante; es necesario comparar matrices de bytes sin procesar. Para empeorar las cosas, si varios equipos de la misma empresa terminan escribiendo datos del Cliente en Kafka, todos deberán usar los mismos serializadores y modificar el código al mismo tiempo.
Por estos motivos, es buena idea utilizar serializadores y deserializadores existentes, como JSON, Apache Avro, Thrift o Protobuf.