Translate

lunes, 18 de febrero de 2019

Spark y HBase

Tuve que conectar una aplicación Spark con HBase usando Scala, sbt, Scalatra, etc.

Primero vamos a ver el build.sbt:

// Build definition for the Sparklatra web application (Scalatra + Spark + HBase).

// Versions shared by several artifacts are declared once so they cannot drift apart.
val ScalatraVersion = "2.6.3"
val SparkVersion = "2.3.1"
val JerseyVersion = "1.19.4"
val HBaseVersion = "2.1.0"
val HBaseCdhVersion = "2.1.0-cdh6.1.1"

organization := "com.hexacta"

name := "Sparklatra"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.8"

// The CDH builds of the HBase artifacts are resolved from the Cloudera repositories.
resolvers += Classpaths.typesafeReleases
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/content/repositories/releases/"
resolvers += "Cloudera Repository2" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  // Web layer
  "org.scalatra" %% "scalatra" % ScalatraVersion,
  "org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test",
  "ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime",
  "org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container",
  "javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided",
  "com.sun.jersey" % "jersey-core" % JerseyVersion,
  "com.sun.jersey" % "jersey-server" % JerseyVersion,
  // Spark: `%%` appends the Scala binary suffix derived from scalaVersion (here _2.11),
  // so every Spark module follows the same Scala version instead of hard-coding it.
  "org.apache.spark" %% "spark-core" % SparkVersion,
  "org.apache.spark" %% "spark-sql" % SparkVersion,
  "org.apache.spark" %% "spark-streaming" % SparkVersion,
  // HBase (CDH builds): intransitive() fetches only the artifact itself, none of its
  // transitive dependencies.
  "org.apache.hbase" % "hbase-spark" % HBaseCdhVersion intransitive(),
  "org.apache.hbase" % "hbase-server" % HBaseCdhVersion intransitive(),
  "org.apache.hbase" % "hbase-mapreduce" % HBaseCdhVersion intransitive(),
  // HBase (vanilla builds) without jackson-databind.
  // NOTE(review): presumably excluded to avoid clashing with the Jackson version Spark
  // brings in - confirm.
  "org.apache.hbase" % "hbase-common" % HBaseVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
  "org.apache.hbase" % "hbase-client" % HBaseVersion exclude("com.fasterxml.jackson.core", "jackson-databind")
)

enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)

Ahora vamos a hacer un ejemplito: un método GET que inserta un valor y otro que lo consulta:

   get(s"/testSparkHbase/:key/:value") {
    val conf : Configuration = HBaseContext.getConf()
    // cree la tabla
    // create 'TableTest', 'info'
    // put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
    val connection = ConnectionFactory.createConnection(conf)

    val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)

    val tableName = "TableTest"
    val columnFamily = "info"

    val rdd = SparkContext.getSc.parallelize(Array(
      (Bytes.toBytes(params("key")),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("test"), Bytes.toBytes(params("value")))))))

    hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      TableName.valueOf(tableName),
      (putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) =>
          put.addColumn(putValue._1, putValue._2, putValue._3))
        put
      });

    Ok("ok")
  }

  get(s"/testSparkHbase/:key") {

    val conf : Configuration = HBaseContext.getConf()
    // cree la tabla
    // create 'TableTest', 'info'
    // put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
    val connection = ConnectionFactory.createConnection(conf)

    val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)

    val tableName = "TableTest"
    val columnFamily = "info"

    val rdd = SparkContext.getSc.parallelize(Array(Bytes.toBytes(params("key"))))

    val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
      record => {
        System.out.println("making Get"+ record.toString)
        new Get(record)
      },
      (result: Result) => {

        val it = result.listCells().iterator()
        val b = new StringBuilder

        b.append(Bytes.toString(result.getRow) + ":")

        while (it.hasNext) {
          val cell = it.next()
          val q = Bytes.toString(CellUtil.cloneQualifier(cell))
          if (q.equals("counter")) {
            b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
          } else {
            b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
          }
        }
        b.toString()
      })

    getRdd.collect().foreach(v => println(v))

    var result = ListBuffer[String]()

    getRdd.collect().foreach(v => result += v)

    Ok(compact(render(result)))
  }

  }