Tuve que conectar una aplicación Spark con HBase usando Scala, sbt y Scalatra.
Primero vamos a ver el build.sbt:
// Build definition: Scalatra web app embedding Spark with HBase (CDH) connectivity.
val ScalatraVersion = "2.6.3"

organization := "com.hexacta"
name := "Sparklatra"
version := "0.1.0-SNAPSHOT"
scalaVersion := "2.11.8"

resolvers += Classpaths.typesafeReleases
// The Cloudera repositories host the *-cdh HBase artifacts used below.
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/content/repositories/releases/"
resolvers += "Cloudera Repository2" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.scalatra" %% "scalatra" % ScalatraVersion,
  "org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test",
  "ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime",
  "org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container",
  "javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided",
  "com.sun.jersey" % "jersey-core" % "1.19.4",
  "com.sun.jersey" % "jersey-server" % "1.19.4",
  // Use %% so sbt appends the Scala binary version (2.11) automatically,
  // consistent with spark-sql / spark-streaming below (was a hard-coded
  // "spark-core_2.11" with %).
  "org.apache.spark" %% "spark-core" % "2.3.1",
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-streaming" % "2.3.1",
  // intransitive() keeps the CDH artifacts from dragging in a conflicting
  // Hadoop/Jackson dependency tree.
  "org.apache.hbase" % "hbase-spark" % "2.1.0-cdh6.1.1" intransitive(),
  "org.apache.hbase" % "hbase-server" % "2.1.0-cdh6.1.1" intransitive(),
  "org.apache.hbase" % "hbase-mapreduce" % "2.1.0-cdh6.1.1" intransitive(),
  // NOTE(review): these two use vanilla 2.1.0 while the three above use the
  // CDH 2.1.0-cdh6.1.1 build — confirm this mix is intentional.
  "org.apache.hbase" % "hbase-common" % "2.1.0" exclude("com.fasterxml.jackson.core", "jackson-databind"),
  "org.apache.hbase" % "hbase-client" % "2.1.0" exclude("com.fasterxml.jackson.core", "jackson-databind")
)

enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)
Ahora vamos a hacer un ejemplito: un método GET que inserta y otro que consulta:
get(s"/testSparkHbase/:key/:value") {
  // Inserts one row into the 'TableTest' HBase table via a Spark bulk put:
  // row key = :key, column info:test = :value.
  // The table was pre-created from the HBase shell:
  //   create 'TableTest', 'info'
  //   put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
  val conf: Configuration = HBaseContext.getConf()
  // Removed the unused ConnectionFactory.createConnection(conf): it was never
  // used and never closed (leaked a connection per request); HBaseContext
  // manages its own connections.
  val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)
  val tableName = "TableTest"
  val columnFamily = "info"
  // A single record: (rowKey, [(family, qualifier, value)]).
  val rdd = SparkContext.getSc.parallelize(Array(
    (Bytes.toBytes(params("key")),
      Array((Bytes.toBytes(columnFamily), Bytes.toBytes("test"), Bytes.toBytes(params("value")))))))
  hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    TableName.valueOf(tableName),
    (putRecord) => {
      // Build one Put per record, adding every (family, qualifier, value) cell.
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) =>
        put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    })
  Ok("ok")
}
get(s"/testSparkHbase/:key") {
  // Looks up the :key row in 'TableTest' via a Spark bulk get and returns its
  // cells rendered as "rowKey:(qualifier,value)..." strings, JSON-encoded.
  // The table was pre-created from the HBase shell:
  //   create 'TableTest', 'info'
  //   put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
  val conf: Configuration = HBaseContext.getConf()
  // Removed the unused ConnectionFactory.createConnection(conf): it was never
  // used and never closed (leaked a connection per request); HBaseContext
  // manages its own connections.
  val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)
  val tableName = "TableTest"
  val rdd = SparkContext.getSc.parallelize(Array(Bytes.toBytes(params("key"))))
  // batchSize = 2: gets are sent to HBase in batches of two.
  val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
    record => {
      println("making Get" + record.toString)
      new Get(record)
    },
    (result: Result) => {
      val b = new StringBuilder
      b.append(Bytes.toString(result.getRow) + ":")
      val it = result.listCells().iterator()
      while (it.hasNext) {
        val cell = it.next()
        val q = Bytes.toString(CellUtil.cloneQualifier(cell))
        // 'counter' cells store a long; every other cell is a UTF-8 string.
        if (q.equals("counter")) {
          b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
        } else {
          b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
        }
      }
      b.toString()
    })
  // Collect ONCE: the original called getRdd.collect() twice, running two
  // full Spark jobs (one to print, one to build the mutable ListBuffer).
  val rows = getRdd.collect().toList
  rows.foreach(println)
  Ok(compact(render(rows)))
}
}