Translate
sábado, 25 de agosto de 2018
Haciendo un proyecto con scalatra y spark
Vamos a hacer un proyecto en Scala con Scalatra y Spark. La idea es hacer una Api Rest, la cual utilice Spark.
Lo primero que hacemos es el proyecto de scalatra como esta indicado aquí :
https://emanuelpeg.blogspot.com/2018/08/haciendo-un-proyecto-con-scalatra.html
Ya teniendo este proyecto agregamos las dependencias de spark y de jersey (este ultimo es necesario para el funcionamiento de los frameworks)
Es decir el buil.sbt nos va quedar así:
val ScalatraVersion = "2.6.3"
organization := "com.hexacta"
name := "Sparklatra"
version := "0.1.0-SNAPSHOT"
scalaVersion := "2.11.0"
resolvers += Classpaths.typesafeReleases
libraryDependencies ++= Seq(
"org.scalatra" %% "scalatra" % ScalatraVersion ,
"org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test" ,
"ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime" ,
"org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container" ,
"javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided" ,
"com.sun.jersey" % "jersey-core" % "1.19.4" ,
"com.sun.jersey" % "jersey-server" % "1.19.4" ,
"org.apache.spark" %% "spark-core" % "2.3.1"
)
enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)
Luego de agregar estas dependencias vamos a hacer un objeto que contenga el contexto de Spark, dado que queremos utilizar un contexto en toda nuestra aplicación y lo hacemos de la siguiente manera :
package com.miEmpresa
import org.apache.spark.{SparkConf, SparkContext}
object SparkContext {
//Create a SparkContext to initialize Spark
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Word Count")
val sc = new SparkContext(conf)
def getSc = {
sc
}
}
Luego de hacer esto, vamos a servlet de scalatra y agregamos el método get que cuente palabras pasadas como parámetro en la url :
get(s"/contar/:str") {
//word count
val counts = List({
params("str")
}).flatMap(line => line.split(" "))
.map(word => (word, 1))
val countsRdd = SparkContext.getSc.parallelize(counts).reduceByKey(_ + _).collect()
var result = ListBuffer[(String,Int)]()
countsRdd.foreach(line => result += line)
Ok(compact(render(result)))
}
En el get definimos que la url va a contener un string que es nuestro parámetro str. Luego hacemos un split por espacios en blanco y luego hacemos un mapa (palabra, 1). Como ultimo paso convertimos nuestro map a RDD y reducimos por palabra. Es decir el proceso sería para la linea "hola hola mundo" de la siguiente manera :
"hola hola mundo" -> split(" ") = ("hola","hola","mundo") -> map = ( ("hola", 1) , ("hola", 1) , ("mundo", 1) ) -> reduceByKey = ("hola",2), ("mundo",1)
Para probarlo primero debemos ejecutarlo con sbt y jetty. Vamos al directorio donde se encuentra el archivo build.sbt y escribimos:
sbt
--- Acá va correr un montón de cosas ---
jetty:start
Por ultimo, debemos ir a un browser y probar la url : http://localhost:8080/contar/
Como se envían los datos por el método get podemos probar con la siguiente url, por ejemplo:
"http://localhost:8080/contar/hola hola hola Mundo"
Y listo!!