Translate

sábado, 25 de agosto de 2018

Haciendo un proyecto con scalatra y spark


Vamos a hacer un proyecto en Scala con Scalatra y Spark. La idea es hacer una Api Rest, la cual utilice Spark.

Lo primero que hacemos es el proyecto de scalatra como esta indicado aquí :
https://emanuelpeg.blogspot.com/2018/08/haciendo-un-proyecto-con-scalatra.html

Ya teniendo este proyecto agregamos las dependencias de spark y de jersey (este ultimo es necesario para el funcionamiento de los frameworks)

Es decir el buil.sbt nos va quedar así:

val ScalatraVersion = "2.6.3"

organization := "com.hexacta"

name := "Sparklatra"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.0"

resolvers += Classpaths.typesafeReleases

libraryDependencies ++= Seq(
  "org.scalatra" %% "scalatra" % ScalatraVersion  ,
  "org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test" ,
  "ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime" ,
  "org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container" ,
  "javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided"  ,
  "com.sun.jersey" % "jersey-core" % "1.19.4" ,
  "com.sun.jersey" % "jersey-server" % "1.19.4" ,
  "org.apache.spark" %% "spark-core" % "2.3.1"
)

enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)

Luego de agregar estas dependencias vamos a hacer un objeto que contenga el contexto de Spark, dado que queremos utilizar un contexto en toda nuestra aplicación y lo hacemos de la siguiente manera :

package com.miEmpresa

import org.apache.spark.{SparkConf, SparkContext}

object SparkContext {

  //Create a SparkContext to initialize Spark
  val conf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("Word Count")
  val sc = new SparkContext(conf)

   def getSc = {
      sc
   }
}

Luego de hacer esto, vamos a servlet de scalatra y agregamos el método get que cuente palabras pasadas como parámetro en la url :

   get(s"/contar/:str") {

    //word count
    val counts = List({
      params("str")
    }).flatMap(line => line.split(" "))
      .map(word => (word, 1))

    val countsRdd = SparkContext.getSc.parallelize(counts).reduceByKey(_ + _).collect()

    var result = ListBuffer[(String,Int)]()

    countsRdd.foreach(line => result += line)

    Ok(compact(render(result)))

  }

En el get definimos que la url va a contener un string que es nuestro parámetro str. Luego hacemos un split por espacios en blanco y luego hacemos un mapa (palabra, 1).  Como ultimo paso convertimos nuestro map a RDD y reducimos por palabra. Es decir el proceso sería para la linea "hola hola mundo" de la siguiente manera :

 "hola hola mundo" -> split(" ") = ("hola","hola","mundo") -> map = ( ("hola", 1) , ("hola", 1) , ("mundo", 1) ) -> reduceByKey = ("hola",2), ("mundo",1)

Para probarlo primero debemos ejecutarlo con sbt y jetty. Vamos al directorio donde se encuentra el archivo build.sbt y escribimos:
sbt
--- Acá va correr un montón de cosas ---
jetty:start

Por ultimo, debemos ir a un browser y probar la url : http://localhost:8080/contar/

Como se envían los datos por el método get podemos probar con la siguiente url, por ejemplo:

"http://localhost:8080/contar/hola hola hola Mundo"

Y listo!!