miércoles, 13 de octubre de 2021

Primeros pasos con Apache Kafka parte 15


Hasta ahora, hemos discutido las características del particionador predeterminado, que es el que se usa con más frecuencia. Sin embargo, Kafka no lo limita a particiones hash y, a veces, existen buenas razones para particionar los datos de manera diferente. Por ejemplo, suponga que es un proveedor B2B y su mayor cliente es una empresa que fabrica dispositivos portátiles llamados Bananas. Suponga que hace tantos negocios con el cliente "Banana" que más del 10% de sus transacciones diarias son con este cliente. Si usa la partición de hash predeterminada, los registros de Banana se asignarán a la misma partición que otras cuentas, lo que dará como resultado que una partición sea aproximadamente el doble de grande que el resto. Esto puede hacer que los servidores se queden sin espacio, que el procesamiento se ralentice, etc. Lo que realmente queremos es darle a Banana su propia partición y luego usar particiones hash para asignar el resto de las cuentas a las particiones.

A continuación, se muestra un ejemplo de un particionador personalizado:

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.record.InvalidRecordException;

import org.apache.kafka.common.utils.Utils;


public class BananaPartitioner implements Partitioner {


public void configure(Map<String, ?> configs) {}

public int partition(String topic, Object key, byte[] keyBytes,

        Object value, byte[] valueBytes,

        Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String))) 

        throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))

        return numPartitions; // Banana siempre va estar en la ultima partición

    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))

}

public void close() {}

}

No hay comentarios.:

Publicar un comentario