domingo, 31 de marzo de 2019

Apache Spark llega a Apache HBase con HBase-Spark


HBase-Spark  fue commiteado por primera vez a Github en julio de 2014. HBase-Spark  salió de una simple solicitud de clientes para tener un nivel de interacción entre HBase y Spark similar al ya disponible entre HBase y MapReduce. Aquí hay un breve resumen de la funcionalidad que brinda dicha librería:

Acceso completo a HBase:
  • Posibilidad de hacer una carga masiva.
  • Posibilidad de realizar operaciones masivas como put, get o delete
  • Posibilidad de ser una fuente de datos para motores SQL.

El enfoque que se le dio a la librería tiene los siguientes objetivos:
  • Hacer las conexiones HBase sin problemas.
  • Hacer la integración de Kerberos sin problemas.
  • Crear RDDs a través de acciones de escaneo o desde un RDD existente que se usando comandos Get.
  • Tomar cualquier RDD y permita que se realice cualquier combinación de operaciones HBase.
  • Proporcione métodos simples para operaciones comunes mientras permite operaciones avanzadas desconocidas sin restricciones a través de la API.
  • Soporte Scala y Java.
  • Soporta Spark y Spark Streaming con una API similar.
Estos objetivos llevaron a un diseño que tomó un par de notas de la API de GraphX en Spark. Por ejemplo, en HBase-Spark hay un objeto llamado HBaseContext. Esta clase tiene un constructor que toma la información de configuración de HBase y luego, una vez construida, le permite hacer un montón de operaciones en ella. Por ejemplo:
  • Crear RDD / DStream desde una Scaneo.
  • Poner / Eliminar el contenido de un RDD / DStream en HBase
  • Crear un RDD / DStream a partir de los contenidos de un RDD / DStream
  • Tomar el contenido de un RDD / DStream y realizar cualquier operación si se le entregó una HConnection en el proceso de trabajo.
La arquitectura básica aún se mantiene, ya que la parte central del código está diseñada para obtener un objeto de conexión HBase en cada Ejecutor Spark.

Veamos unos ejemplos por ejemplo como podemos hacer un bulk delete :

val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseBulkDelete(hbaseContext,
                  TableName.valueOf(tableName),
                  putRecord => new Delete(putRecord),
                  4)

Aquí hay un ejemplo de una función de partición de mapa donde podemos obtener un objeto de conexión a medida que iteramos sobre nuestros valores:

val getRdd = rdd.hbaseMapPartitions(hbaseContext, (it, conn) => {
        val table = conn.getTable(TableName.valueOf("t1"))
        var res = mutable.MutableList[String]()
        ...
      })

MapPartitions funciona como map es decir toma un rdd y lo transforma en otro rdd.

Veamos un ejemplo completo donde guardamos los objetos con ids 1, 2, 3, 4, 5 :

// Nothing to see here just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

//This is making a RDD of
//(RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
     )
    )

//Create the HBase config like you normally would  then
//Pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
val hbaseContext = new HBaseContext(sc, conf);

//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally
// A flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    tableName,
    //This function is really important because it allows our source RDD to have data of any type
    // Also because puts are not serializable
    (putRecord) > {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
       put
    },
    true);

Ahora, cada partición de ese RDD se ejecutará en paralelo (en diferentes subprocesos en un número de trabajadores de Spark en el clúster), algo así como lo que habría ocurrido si hiciéramos Puts en una tarea MapReduce.

Una cosa a tener en cuenta es que se aplican las mismas reglas cuando se trabaja con HBase de MapReduce o Spark en términos de rendimiento de Get y Put. Si tiene Puts que no están particionados, lo más probable es que se envíe un lote de Put a cada RegionServer, lo que dará como resultado menos registros por RegionServers por lote. La imagen a continuación ilustra cómo se vería esto con seis RegionServers:


Ahora veamos el mismo diagrama si utilizamos Spark para particionar primero antes de hablar con HBase.



No hay comentarios.:

Publicar un comentario