Translate

Mostrando las entradas con la etiqueta Apache Spark. Mostrar todas las entradas
Mostrando las entradas con la etiqueta Apache Spark. Mostrar todas las entradas

sábado, 10 de agosto de 2019

Unificando el modelo de programación de motores mapreduce con Apache Beam


Supongamos te tenemos que hacer una aplicación que tiene que utilizar un motor map-reduce y bueno, como sabemos spark, trabajamos con nuestros RDDs y el contexto de spark y tenemos un éxito absoluto.

Ahora bien en un momento nos enteramos que nuestra aplicación funcionaria mejor con Apache Flink o por un tema comercial queremos migrar a Cloud Dataflow de google, no sé la realidad cambia y nosotro no vamos a poder cambiar tan fácil de tecnología, porque estamos atrapados en nuestros RDDs y en el contexto de Spark.

Por esta razón se invento Apache Beam, la idea es unificar el modelo de programación de frameworks map-reduce, streams o big data en general para poder migrar fácilmente de framework a framework.

Apache Beam es un modelo unificado de código abierto para definir tuberías de procesamiento paralelo de datos por lotes y de transmisión. Usando el SDK de Beam, creamos un programa que define tuberías que es ejecutada por uno de los back-end de procesamiento distribuido compatibles con Apache Apex, Apache Flink, Apache Spark y Google Cloud Dataflow. Además Apache Beam SDK es de código abierto.

Beam es particularmente útil para tareas paralelas de procesamiento de datos, en las que el problema se puede descomponer en muchos paquetes más pequeños de datos que se pueden procesar de forma independiente y en paralelo. También puede usar Beam para tareas de Extracción, Transformación y Carga (ETL) e integración de datos puros. Estas tareas son útiles para mover datos entre diferentes medios de almacenamiento y fuentes de datos, transformar datos en un formato más deseable o cargar datos de un sistema.

Los SDK de Beam proporcionan un modelo de programación unificado que puede representar y transformar conjuntos de datos de cualquier tamaño, ya sea que la entrada sea un conjunto de datos finito de una fuente de datos por lotes o un conjunto de datos infinito de una fuente de datos de transmisión  (streams). Los SDK de Beam utilizan las mismas clases para representar datos acotados y no acotados, y las mismas transformaciones funcionan para esos datos.

Beam actualmente admite los siguientes lenguajes:
  • Java Java logo
  • Python Python logo
  • Go 
  • Scala por medio de la librería Scio.

Algo importante es que tenemos un sdk por lenguaje, y es medio obvio dado que esto no corre solo sobre la jdk. Esto me suena tramposo porque no es que para todos los lenguajes tenes todos los frameworks. Sino que para algunos lenguajes tenes algunos frameworks de esa tecnología. 

Los Beam Pipeline Runners traducen la tubería de procesamiento de datos al framework back-end que deseemos. Cuando ejecute su programa Beam, deberá especificar un corredor apropiado para el back-end.

Beam actualmente admite Runners que funcionan con los siguientes back-end de procesamiento distribuido:

  • Apache Apex  
  • Apache Flink 
  • Apache Gearpump (incubating) 
  • Apache Samza
  • Apache Spark 
  • Google Cloud Dataflow 
  • Hazelcast Jet 

Dejo link: https://beam.apache.org/

jueves, 1 de agosto de 2019

Ploteando un dataframe con R en Spark.

Retomando un post viejo y uno nuevo. Vamos a tomar los datos exportados del archivo csv que contiene la cantidad de artistas que trabajaron en cada película y vamos a graficar la densidad de este valor. Es decir, vamos a saber cuantos actores conocidos trabajan en películas normalmente (en promedio) :

// Primero leo el archivo

csvPath <- "/home/spark/filmsCount.csv/part.csv"

df <- read.df(csvPath, "csv", header = "false", inferSchema = "true")

//Luego imprimo sus valores para saber que están bien.

head(df)

showDF(df)

//Ahora transformo el dataframe de spark a un dataframe plano de R

dfR <- collect(df) //Es importante pasar de dataframe de spark a dataframe de R

//Importo la librería de plot de R

library('txtplot')

//Gráfico la densidad a los 20 primeros valores.
txtdensity( dfR[1:20,2] )

//Gráfico la densidad a todos los valores.
txtdensity( dfR[,2] )

//Gráfico la densidad a todos los valores con un barchart
txtbarchart(as.factor( dfR[,2] ))

Y listo!!

Y te queda así :








jueves, 25 de julio de 2019

Mezclando datos de diferentes almacenes de datos con Apache Spark SQL


Vamos a hacer un ejemplo con Apache Spark SQL. Algo simple, vamos a tomar algunas tablas de una base de datos relacional y vamos hacer unas consultas y luego vamos a importar datos de un csv para tambien hacer consultas:

//Primero importamos la clase de SQLContext y creamos el contexto.

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//Vamos a importar los datos de la base de datos, en este caso la tabla "actor"

val url = "jdbc:mysql://0.0.0.0:3306/sakila"

val dfActor = spark.read.format("jdbc").option("url", url)
.option("dbtable", "actor")
.option("user", "sparkDb")
.option("password","sparkDb").load()

//Ya tenemos la tabla en el dataframe ahora vamos a imprimir el esquema y ver los datos.

dfActor.printSchema()
dfActor.show()
dfActor.count()

//Consultamos el campo nombre

dfActor.select("first_name").show()

//Registramos la tabla para poder consultarla con SQL

dfActor.registerTempTable("actors")

// Y consultamos

spark.sql("select first_name, last_name from actors").show

//Vamos a traernos una tabla relación entre el actor y el film

val dfActorFilms = spark.read.format("jdbc").option("url", url).option("dbtable", "film_actor").option("user", "sparkDb").option("password","sparkDb").load()

//Registro la tabla temporal

dfActorFilms.registerTempTable("actors_films")

//Selecciono los datos del actor y en cuantas peliculas actuo

spark.sql("select a.first_name, a.last_name, count(af.film_id) from actors a join actors_films af on (a.actor_id=af.actor_id) group by a.first_name, a.last_name").show

// Ahora vamos a traernos los datos del film pero desde un archivo csv
// Creamos una clase que me va hacer util leer los datos de forma estructurada.

case class Film(film_id: Int, title: String, description: String, release_year: Int)

// Leemos el archivo CSV :

val dfFilms = sc.textFile("/home/spark/films.csv").map(_.split(",")).map(p => Film(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).toDF()

// Vemos que tiene el archivo

dfFilms.show

//Registramos el dataframe para poder hacer consultas sobre él

dfFilms.registerTempTable("films")

//Realizamos una consulta con joins al datafame actos_films para saber cuantos actores trabajaron en los films.

val dfFilmsCountActors = spark.sql("select f.title, count(af.film_id) from films f join actors_films af on (f.film_id=af.film_id) group by f.title")

//Registramos el dataframe como una tabla temporal

dfFilmsCountActors.registerTempTable("films_Count_Actors")

//Hacemos una consulta sobre esa tabla con filtro.

spark.sql("select * from films_Count_Actors fca where fca.title like '%SANTA%' ").show

//Por ultimo exportamos este resultado.

dfFilmsCountActors.coalesce(1).write.csv("/home/spark/filmsCount.csv")

Fin!

La idea era poder hacer un conjunto de acciones que nos sirvan como ejemplo de las cosas que se pueden hacer con Spark SQL. Espero que les sirva!!

martes, 23 de julio de 2019

Como puedo exportar un dataframe a un archivo en Apache Spark??


Tuve que exportar un dataframe a un archivo csv es spark y con scala, como no sabia lo tuve que buscar y ahora te lo comparto. En Spark 2 simplemente podemos hacer lo siguiente:

df.write.csv ("/la/carpeta/donde/queremos/que/este/el/archivo")

Si deseamos asegurarnos que el archivo sea uno, es decir que ya no estén particionados, agregue un .coalesce (1) de la siguiente manera;

df.coalesce (1) .write.csv ("/la/carpeta/donde/queremos/que/este/el/archivo")

Espero que les sea de ayuda!!

miércoles, 1 de mayo de 2019

C# y F# para Apache Spark


Microsoft anunció el lanzamiento de .NET para Apache Spark.

Microsoft anunció la versión preliminar de .NET para Apache Spark. Apache Spark está escrito en Scala, por lo que siempre ha tenido soporte nativo para este lenguaje. También ha tenido durante mucho tiempo enlaces API para Java, así como los populares lenguajes de ciencia de datos Python y R. Los nuevos enlaces de lenguaje para C# y F# están escritos en una nueva capa de interoperabilidad Spark. En las pruebas en el punto de referencia TPC-H, el rendimiento de .NET fue comparable a otros lenguajes, y en algunos casos fue "2 veces más rápido que Python".

Los desarrolladores pueden reutilizar el código y las librerías compatibles con .NET estándar y "pueden acceder a todos los aspectos de Apache Spark, incluidos Spark SQL, DataFrames, Streaming, MLLib". Los desarrolladores de la nube pueden implementar .NET para Apache Spark en Azure de Microsoft usando Azure HDInsight y Azure Databricks, o en Amazon Web Services usando Amazon EMR Spark y AWS Databricks.

La ejecución de aplicaciones .NET en Spark requiere la instalación de los binarios de Microsoft.Spark.Worker, así como un JDK y los binarios estándar de Apache Spark. El desarrollo de una aplicación requiere la instalación del paquete nuget Microsoft.Spark. Los desarrolladores de Microsoft han enviado Propuestas de mejora de proyectos Spark (SPIP) para incluir la extensión de lenguaje C# y una capa de interoperabilidad genérica en Spark. Sin embargo, Sean Owen, un comentarista de Apache Spark, comentó que sería "altamente improbable" que el trabajo se fusionara con Spark.

La hoja de ruta de .NET para Apache Spark enumera varias mejoras al proyecto que ya está en marcha, incluida la compatibilidad con Apache Spark 3.0, la compatibilidad con la vectorización de .NET Core 3.0 y la compatibilidad con VS Code. .NET para el código fuente de Apache Spark está disponible en Github.

miércoles, 10 de abril de 2019

Python Developers Survey 2018 Results

Jetbrain como todos los años hace su encuestas y así tenemos los resultados:
https://www.jetbrains.com/research/python-developers-survey-2018/

La verdad no hubo muchas cosas que me llamaron la atención python es un lenguje que esta instalado ya en la sociedad.

Si puedo hacer una revisión a una cuestion sobre la relación de python con bigdata. Al parecer la gente de python utiliza spark para analizar sus datos.

 

domingo, 31 de marzo de 2019

Apache Spark llega a Apache HBase con HBase-Spark


HBase-Spark  fue commiteado por primera vez a Github en julio de 2014. HBase-Spark  salió de una simple solicitud de clientes para tener un nivel de interacción entre HBase y Spark similar al ya disponible entre HBase y MapReduce. Aquí hay un breve resumen de la funcionalidad que brinda dicha librería:

Acceso completo a HBase:
  • Posibilidad de hacer una carga masiva.
  • Posibilidad de realizar operaciones masivas como put, get o delete
  • Posibilidad de ser una fuente de datos para motores SQL.

El enfoque que se le dio a la librería tiene los siguientes objetivos:
  • Hacer las conexiones HBase sin problemas.
  • Hacer la integración de Kerberos sin problemas.
  • Crear RDDs a través de acciones de escaneo o desde un RDD existente que se usando comandos Get.
  • Tomar cualquier RDD y permita que se realice cualquier combinación de operaciones HBase.
  • Proporcione métodos simples para operaciones comunes mientras permite operaciones avanzadas desconocidas sin restricciones a través de la API.
  • Soporte Scala y Java.
  • Soporta Spark y Spark Streaming con una API similar.
Estos objetivos llevaron a un diseño que tomó un par de notas de la API de GraphX en Spark. Por ejemplo, en HBase-Spark hay un objeto llamado HBaseContext. Esta clase tiene un constructor que toma la información de configuración de HBase y luego, una vez construida, le permite hacer un montón de operaciones en ella. Por ejemplo:
  • Crear RDD / DStream desde una Scaneo.
  • Poner / Eliminar el contenido de un RDD / DStream en HBase
  • Crear un RDD / DStream a partir de los contenidos de un RDD / DStream
  • Tomar el contenido de un RDD / DStream y realizar cualquier operación si se le entregó una HConnection en el proceso de trabajo.
La arquitectura básica aún se mantiene, ya que la parte central del código está diseñada para obtener un objeto de conexión HBase en cada Ejecutor Spark.

Veamos unos ejemplos por ejemplo como podemos hacer un bulk delete :

val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseBulkDelete(hbaseContext,
                  TableName.valueOf(tableName),
                  putRecord => new Delete(putRecord),
                  4)

Aquí hay un ejemplo de una función de partición de mapa donde podemos obtener un objeto de conexión a medida que iteramos sobre nuestros valores:

val getRdd = rdd.hbaseMapPartitions(hbaseContext, (it, conn) => {
        val table = conn.getTable(TableName.valueOf("t1"))
        var res = mutable.MutableList[String]()
        ...
      })

MapPartitions funciona como map es decir toma un rdd y lo transforma en otro rdd.

Veamos un ejemplo completo donde guardamos los objetos con ids 1, 2, 3, 4, 5 :

// Nothing to see here just creating a SparkContext like you normally would
val sparkConf = new SparkConf().setAppName("HBaseBulkPutExample " + tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)

//This is making a RDD of
//(RowKey, columnFamily, columnQualifier, value)
val rdd = sc.parallelize(Array(
      (Bytes.toBytes("1"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
      (Bytes.toBytes("2"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
      (Bytes.toBytes("3"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
      (Bytes.toBytes("4"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
      (Bytes.toBytes("5"), Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5"))))
     )
    )

//Create the HBase config like you normally would  then
//Pass the HBase configs and SparkContext to the HBaseContext
val conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
val hbaseContext = new HBaseContext(sc, conf);

//Now give the rdd, table name, and a function that will convert a RDD record to a put, and finally
// A flag if you want the puts to be batched
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
    tableName,
    //This function is really important because it allows our source RDD to have data of any type
    // Also because puts are not serializable
    (putRecord) > {
      val put = new Put(putRecord._1)
      putRecord._2.foreach((putValue) > put.add(putValue._1, putValue._2, putValue._3))
       put
    },
    true);

Ahora, cada partición de ese RDD se ejecutará en paralelo (en diferentes subprocesos en un número de trabajadores de Spark en el clúster), algo así como lo que habría ocurrido si hiciéramos Puts en una tarea MapReduce.

Una cosa a tener en cuenta es que se aplican las mismas reglas cuando se trabaja con HBase de MapReduce o Spark en términos de rendimiento de Get y Put. Si tiene Puts que no están particionados, lo más probable es que se envíe un lote de Put a cada RegionServer, lo que dará como resultado menos registros por RegionServers por lote. La imagen a continuación ilustra cómo se vería esto con seis RegionServers:


Ahora veamos el mismo diagrama si utilizamos Spark para particionar primero antes de hablar con HBase.



lunes, 18 de febrero de 2019

Spark y HBase

Tuve que conectar una aplicación spark con hbase con scala y sbt y scalatra... etc...

primero vamos a ver a build.sbt : 

val ScalatraVersion = "2.6.3"

organization := "com.hexacta"

name := "Sparklatra"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.8"

resolvers += Classpaths.typesafeReleases
resolvers += "Cloudera Repository" at "https://repository.cloudera.com/content/repositories/releases/"
resolvers += "Cloudera Repository2" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.scalatra" %% "scalatra" % ScalatraVersion  ,
  "org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test" ,
  "ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime" ,
  "org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container" ,
  "javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided"  ,
  "com.sun.jersey" % "jersey-core" % "1.19.4" ,
  "com.sun.jersey" % "jersey-server" % "1.19.4" ,
  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" %% "spark-sql" % "2.3.1",
  "org.apache.spark" %% "spark-streaming" % "2.3.1" ,
  "org.apache.hbase" % "hbase-spark" % "2.1.0-cdh6.1.1" intransitive() ,
  "org.apache.hbase" % "hbase-server" % "2.1.0-cdh6.1.1"  intransitive() ,
  "org.apache.hbase" % "hbase-mapreduce" % "2.1.0-cdh6.1.1" intransitive() ,
  "org.apache.hbase" % "hbase-common" % "2.1.0" exclude("com.fasterxml.jackson.core", "jackson-databind"),
  "org.apache.hbase" % "hbase-client" % "2.1.0" exclude("com.fasterxml.jackson.core", "jackson-databind")
)

enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)

Ahora vamos hacer un ejemplito, de un metodo get que insertar y consultar : 

   get(s"/testSparkHbase/:key/:value") {
    val conf : Configuration = HBaseContext.getConf()
    // cree la tabla
    // create 'TableTest', 'info'
    // put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
    val connection = ConnectionFactory.createConnection(conf)

    val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)

    val tableName = "TableTest"
    val columnFamily = "info"

    val rdd = SparkContext.getSc.parallelize(Array(
      (Bytes.toBytes(params("key")),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("test"), Bytes.toBytes(params("value")))))))

    hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
      TableName.valueOf(tableName),
      (putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) =>
          put.addColumn(putValue._1, putValue._2, putValue._3))
        put
      });

    Ok("ok")
  }

  get(s"/testSparkHbase/:key") {

    val conf : Configuration = HBaseContext.getConf()
    // cree la tabla
    // create 'TableTest', 'info'
    // put 'TableTest', 'rowkey1', 'info:test', 'ejemplo'
    val connection = ConnectionFactory.createConnection(conf)

    val hbaseContext = new org.apache.hadoop.hbase.spark.HBaseContext(SparkContext.getSc, conf)

    val tableName = "TableTest"
    val columnFamily = "info"

    val rdd = SparkContext.getSc.parallelize(Array(Bytes.toBytes(params("key"))))

    val getRdd = rdd.hbaseBulkGet[String](hbaseContext, TableName.valueOf(tableName), 2,
      record => {
        System.out.println("making Get"+ record.toString)
        new Get(record)
      },
      (result: Result) => {

        val it = result.listCells().iterator()
        val b = new StringBuilder

        b.append(Bytes.toString(result.getRow) + ":")

        while (it.hasNext) {
          val cell = it.next()
          val q = Bytes.toString(CellUtil.cloneQualifier(cell))
          if (q.equals("counter")) {
            b.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
          } else {
            b.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
          }
        }
        b.toString()
      })

    getRdd.collect().foreach(v => println(v))

    var result = ListBuffer[String]()

    getRdd.collect().foreach(v => result += v)

    Ok(compact(render(result)))
  }

  }



miércoles, 24 de octubre de 2018

Instalar un master y un nodo con spark.


La idea es instalar un cluster de spark, solo un nodo y un master. 

Primero los dos 2 servers deben verse por medio de ssh para esto sigan este post :  

Luego descargamos spark desde internet: 
wget https://www.apache.org/dyn/closer.lua/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz

Luego desempaquetamos: 

tar -xzvf spark.tar.gz

Luego vamos a conf y copiamos a el template de spark-env pero sin ".template"

cp spark-env.sh.template spark-env.sh

Luego editamos spark-env y agregamos el java home y el ip del master : 

export JAVA_HOME=/path/java-openjdk

export SPARK_MASTER_HOST='xx.xx.xx.xx'

En SPARK_MASTER_HOST va la ip del master (esto hay que hacerlo en los dos servers)

Por ultimo levantamos todos los servicios :

$ ./start-all.sh 

Y listo! Podemos chequear su funcionamiento en el puerto 8080 del master : 


Correr una tarea en un cluster de spark.


Hemos hecho un proyecto con Spark  el cual cuenta palabras pero este utiliza spark como "local" es decir no utiliza un cluster spark.

Pero la potencia de Spark esta en utilizar esto en un Cluster para lo que tenemos que modificar nuestro Context para que permita ejecutar esto en el cluster.

Un cluster Spark tiene un master el cual tiene una ip por ejemplo x.x.x.x con dicha ip vamos a modificar el Context de Spark:

package com.myCompania.app

import org.apache.spark.{SparkConf, SparkContext}

object SparkContext {

  //Create a SparkContext to initialize Spark
  val conf = new SparkConf()
 
  conf.setMaster("spark://x.x.x.x:7077")
  conf.setJars(Seq("path/nombreDelJar.jar"))

  conf.setAppName("Hexacta")
  val sc = new SparkContext(conf)

   def getSc = {
      sc
   }
}

En el puerto 7077 spark publica su servicio, por dicha razon escribimos esta linea:

conf.setMaster("spark://x.x.x.x:7077")

Luego tenemos que agregar el jar de nuestra aplicación porque spark no conoce las clases que utilizamos, y esto lo hacemos con :

conf.setJars(Seq("path/nombreDelJar.jar"))

Y listo!!

domingo, 16 de septiembre de 2018

Concepto de ventana en Apache Spark Streaming

Hemos hecho un ejemplo con Apache Spark Streaming. Pero ahora, consideremos la idea de Windows. En Spark Streaming, tenemos pequeños lotes de datos que ingresan en un rango de tiempo o ventana de tiempo.

Spark hace un lote de los datos entrantes de acuerdo con el intervalo de tiempo, pero a veces debemos recordar cosas del pasado. Por ejemplo podemos querer mantener un promedio de treinta segundos para datos de entrada, pero queremos resultados cada cinco segundos. En este caso, desearía un intervalo de lote de cinco segundos, pero una longitud de ventana de treinta segundos. Spark proporciona varios métodos para realizar este tipo de cálculos.

La solución que tenemos tenemos que usar son funciones de ventana.

Windows nos permite tomar un primer lote y luego un segundo lote y luego un tercer lote y luego crear una ventana de todos los lotes en función del intervalo de tiempo especificado. De esta forma, siempre podemos tener el nuevo RDD y también el historial de los RDD que existieron en la ventana.

La función de ventana más simple es una ventana, que le permite crear una nueva DStream, calculada aplicando los parámetros de ventana al viejo DStream. Puede usar cualquiera de las operaciones de DStream en la nueva transmisión, para que tenga toda la flexibilidad que pueda desear.

Por ejemplo, desea PUBLICAR a todos los usuarios activos de los últimos cinco segundos, pero desea actualizar los resultados cada segundo.

sc = SparkContext(appName="ActiveUsers")
ssc = StreamingContext(sc, 1)
activeUsers = [
    ["Alice", "Bob"],
    ["Bob"],
    ["Carlos", "Dan"],
    ["Carlos", "Dan", "Erin"],
    ["Carlos", "Frank"],
]
rddQueue = []
for datum in activeUsers:
    rddQueue += [ssc.sparkContext.parallelize(datum)]
inputStream = ssc.queueStream(rddQueue)
inputStream.window(5, 1)\
    .map(lambda x: set([x]))\
    .reduce(lambda x, y: x.union(y))\
    .pprint()
ssc.start()
sleep(5)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Este ejemplo imprime todos los usuarios activos de los últimos cinco segundos, pero lo imprime cada segundo. No es necesario realizar un seguimiento manual del estado, ya que la función de ventana conserva los datos antiguos durante otros cuatro intervalos. La función de ventana le permite especificar la duración de la ventana y la duración de la diapositiva, o la frecuencia con la que desea que se calcule una nueva ventana.

sábado, 25 de agosto de 2018

Haciendo un proyecto con scalatra y spark


Vamos a hacer un proyecto en Scala con Scalatra y Spark. La idea es hacer una Api Rest, la cual utilice Spark.

Lo primero que hacemos es el proyecto de scalatra como esta indicado aquí :
https://emanuelpeg.blogspot.com/2018/08/haciendo-un-proyecto-con-scalatra.html

Ya teniendo este proyecto agregamos las dependencias de spark y de jersey (este ultimo es necesario para el funcionamiento de los frameworks)

Es decir el buil.sbt nos va quedar así:

val ScalatraVersion = "2.6.3"

organization := "com.hexacta"

name := "Sparklatra"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.0"

resolvers += Classpaths.typesafeReleases

libraryDependencies ++= Seq(
  "org.scalatra" %% "scalatra" % ScalatraVersion  ,
  "org.scalatra" %% "scalatra-scalatest" % ScalatraVersion % "test" ,
  "ch.qos.logback" % "logback-classic" % "1.2.3" % "runtime" ,
  "org.eclipse.jetty" % "jetty-webapp" % "9.4.9.v20180320" % "container" ,
  "javax.servlet" % "javax.servlet-api" % "3.1.0" % "provided"  ,
  "com.sun.jersey" % "jersey-core" % "1.19.4" ,
  "com.sun.jersey" % "jersey-server" % "1.19.4" ,
  "org.apache.spark" %% "spark-core" % "2.3.1"
)

enablePlugins(SbtTwirl)
enablePlugins(ScalatraPlugin)

Luego de agregar estas dependencias vamos a hacer un objeto que contenga el contexto de Spark, dado que queremos utilizar un contexto en toda nuestra aplicación y lo hacemos de la siguiente manera :

package com.miEmpresa

import org.apache.spark.{SparkConf, SparkContext}

object SparkContext {

  //Create a SparkContext to initialize Spark
  val conf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("Word Count")
  val sc = new SparkContext(conf)

   def getSc = {
      sc
   }
}

Luego de hacer esto, vamos a servlet de scalatra y agregamos el método get que cuente palabras pasadas como parámetro en la url :

   get(s"/contar/:str") {

    //word count
    val counts = List({
      params("str")
    }).flatMap(line => line.split(" "))
      .map(word => (word, 1))

    val countsRdd = SparkContext.getSc.parallelize(counts).reduceByKey(_ + _).collect()

    var result = ListBuffer[(String,Int)]()

    countsRdd.foreach(line => result += line)

    Ok(compact(render(result)))

  }

En el get definimos que la url va a contener un string que es nuestro parámetro str. Luego hacemos un split por espacios en blanco y luego hacemos un mapa (palabra, 1).  Como ultimo paso convertimos nuestro map a RDD y reducimos por palabra. Es decir el proceso sería para la linea "hola hola mundo" de la siguiente manera :

 "hola hola mundo" -> split(" ") = ("hola","hola","mundo") -> map = ( ("hola", 1) , ("hola", 1) , ("mundo", 1) ) -> reduceByKey = ("hola",2), ("mundo",1)

Para probarlo primero debemos ejecutarlo con sbt y jetty. Vamos al directorio donde se encuentra el archivo build.sbt y escribimos:
sbt
--- Acá va correr un montón de cosas ---
jetty:start

Por ultimo, debemos ir a un browser y probar la url : http://localhost:8080/contar/

Como se envían los datos por el método get podemos probar con la siguiente url, por ejemplo:

"http://localhost:8080/contar/hola hola hola Mundo"

Y listo!!

domingo, 12 de agosto de 2018

Ejemplo con Java y Spark


Vamos hacer un pequeño ejemplo, una aplicación Java que utilice Spark para contar palabras. La aplicación calculara cuantas veces aparece una palabra en un archivo de texto.

Para hacer esto vamos a utilizar eclipse y maven.

Primero creamos un proyecto simple de maven, para lo cual podemos utilizar la consola, con el siguiente comando:

mvn archetype:generate
-DarchetypeGroupId=org.apache.maven.archetypes
-DarchetypeArtifactId=maven-archetype-quickstart
-DarchetypeVersion=RELEASE
-DgroupId=org.miOrganizacion
-DartifactId=nombreDelProyecto

o con eclipse, vamos a File -> New -> Project -> Maven -> Maven Project y luego elegimos es arquetipo "maven-archetype-quickstart" y luego completamos los campos groupId y artifactId. Y listo!

Ahora vamos agregar spark en las dependencias en el pom.xml :

  <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>

Y vamos a indicarle que utilizaremos java 8 con la siguiente lineas :

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

El pom.xml queda de la siguiente manera:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hexacta</groupId>
<artifactId>testSpark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>testSpark</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>


</dependencies>
</project>

Y ahora, a programa!!.

Vamos hacer el ejemplo de contar palabras de un archivo de texto. Para lo cual haremos un objeto llamado "App.java" : 

package com.hexacta.testSpark;

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class App {

public static void main(String[] args) {
// Create a SparkContext to initialize
SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count");

// Create a Java version of the Spark Context
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the text into a Spark RDD, which is a distributed representation of each
// line of text
JavaRDD<String> textFile = sc.textFile("/java/spark/artists.csv");

JavaPairRDD<String, Integer> counts = textFile
                                .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((a, b) -> a + b);
counts.foreach(p -> System.out.println(p));
System.out.println("Total words: " + counts.count());
counts.saveAsTextFile("/java/spark/artistsCount.txt");
}
}

Dejo el repo git :
https://github.com/emanuelpeg/client-java-spark

Ejemplo con Scala y Spark


Vamos hacer un pequeño ejemplo, una aplicación Scala que utilice Spark para contar palabras. La aplicación calculara cuantas veces aparece una palabra en un archivo de texto.

Para hacer esto vamos a utilizar Intellij Idea. Primero vamos a bajar intellij idea community edition porque somos muy pobres.

Luego instalamos el plugin de Scala. Eso lo hacemos yendo a file->settings->plugins y ahí instalamos  el plugin de scala :



Luego debemos hacer un nuevo proyecto, vamos a elegir un proyecto sbt :



Luego agregamos esta dependencia en el archivo build.sbt :

libraryDependencies ++= {
  val sparkVer = "2.1.0"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVer
  )
}

Quedando de la siguiente manera :

name := "cliente-spark-scala"

version := "0.1"

scalaVersion := "2.11.0"

libraryDependencies ++= {
  val sparkVer = "2.1.0"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVer
  )
}

Luego de cambiar este archivo el IDE nos preguntará si importa las librerías o siempre importa las librerías. En este caso, elegimos cualquiera de los dos :



Y ahora, a programa!!.

Vamos hacer el ejemplo de contar palabras de un archivo de texto. Para lo cual haremos un objeto llamado "ScalaExample" :

object ScalaExample {

  import org.apache.spark.{SparkConf, SparkContext}

  def main(args: Array[String]): Unit = {
    //Create a SparkContext to initialize Spark
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Word Count")
    val sc = new SparkContext(conf)

    // Load the text into a Spark RDD, which is a distributed representation of each line of text
    val textFile = sc.textFile("/java/spark/customers.txt")

    //word count
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.foreach(println)
    System.out.println("Total words: " + counts.count());
   // counts.saveAsTextFile("/java/spark/customersCount.txt");
  }

}

Dejo el repo git : 

lunes, 6 de agosto de 2018

Data Science and Cognitive Computing Courses


Quiero compartir la siguiente página que permite hacer cursos de machine learning, Apache Spark, SQL, Python, R, Data science, Reactive, Scala, Big Data, etc...

Y todos de forma gratuita!

Dejo link: https://cognitiveclass.ai

jueves, 2 de agosto de 2018

Un ejemplo utilizando Apache Spark Streaming


Lo que vamos a hacer es un proceso que escuche lo que escribimos con "nc" en el puerto 9999. Y luego vamos a contar, es decir vamos a contar las palabras.

Iniciamos spark-shell :

./spark-shell

Luego debemos importar todos los objetos para hacer streaming:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

//Ahora hacemos el contexto de streaming, utilizando el contexto de mi aplicación. A la vez //especificamos el periodo de tiempo que se debe procesar los datos:

val ssc = new StreamingContext(sc, Seconds(10))

// Creamos el DStream para localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// indicamos las tareas a realizar con los datos.
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Imprima los primeros elementos de cada RDD generado en este DStream para la consola

wordCounts.print()

ssc.start()             // comenzamos
ssc.awaitTermination()  // Espere a que termine el cálculo

Y listo, ahora si vamos a otra consola y escribimos :

nc -lk 9999

Este comando va a escribir lo escribimos en el puerto 9999 y nuestra aplicación va a leerlo y contarlo.

Y eso es todo!!!



miércoles, 11 de julio de 2018

Apache Spark MLlib parte 4

Continuamos!!!

Vamos a ver un ejemplo utilizando el algoritmo Kmeans que es uno de los algoritmos más fáciles de clasificación.

K-means es un método de agrupamiento, que tiene como objetivo la partición de un conjunto de n observaciones en k grupos en el que cada observación pertenece al grupo cuyo valor medio es más cercano.

Si bien se puede indicar que es una técnica utilizada por minería de datos más que para Maching learning, vamos a empezar de a poco.

Empecemos viendo los datos, los cuales supongamos ya hemos limpiado :

0 1:0.0 2:0.0 3:0.0
1 1:0.1 2:0.1 3:0.1
2 1:0.2 2:0.2 3:0.2
3 1:9.0 2:9.0 3:9.0
4 1:9.1 2:9.1 3:9.1
5 1:9.2 2:9.2 3:9.2

Como se puede ver son 4 columnas, de la cuales la primera es un id (identificador) y las demás están compuestas por un indice y un dato, por ejemplo 1:0.0 (el indice es 1 y el dato 0.0 el cual es continuo)

Esto se encuentra en el archivo sample_kmeans_data.txt

Empecemos tomando los datos y llevándolos a un RDD :

//creamos la aplicación de spark si utilizamos spark-shell esto no es necesario.
val spark = SparkSession.builder.appName("KMeansExample") .getOrCreate()

//Leemos el archivo con el formato libsvm
val dataset = spark.read.format("libsvm").load("path/sample_kmeans_data.txt")

//Probamos que se leyeron los datos, si estamos en spark-shell
dataset.show()
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+


// Entrenamos nuestro modelo
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Evaluamos la agrupación mediante el cómputo dentro de la suma establecida de errores cuadrados.
val WSSSE = model.computeCost(dataset)
println(s"Within Set Sum of Squared Errors = $WSSSE")

// Mostramos el resultado
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
[0.1,0.1,0.1]
[9.1,9.1,9.1]

Para los que como yo no entendieron que paso el ultimo momento... Reiteramos!

Dado un conjunto de observaciones (x1, x2, …, xn), donde cada observación es un vector real de d dimensiones, k-means construye una partición de las observaciones en k conjuntos (k ≤ n) a fin de minimizar la suma de los cuadrados dentro de cada grupo (WCSS): S = {S1, S2, …, Sk}

Si vemos la Wikipedia : 

Paso de asignación: Asigna cada observación al grupo con la media más cercana (es decir, la partición de las observaciones de acuerdo con el diagrama de Voronoi generado por los centroides).
Donde cada  va exactamente dentro de un , incluso aunque pudiera ir en dos de ellos.
Paso de actualización: Calcular los nuevos centroides como el centroide de las observaciones en el grupo.
El algoritmo se considera que ha convergido cuando las asignaciones ya no cambian.

Grafícado : 

Si hablamos en criollo el algoritmo trata de hacer grupos y calcular su centro, y va moviendo el centro según los datos hasta que estos centros no cambian y ahí finaliza.





lunes, 9 de julio de 2018

Apache Spark MLlib parte 3


Continuamos...


La detección de fraude es otro caso de uso importante del aprendizaje automático. Aborda un problema crítico en la industria financiera de forma rápida y precisa.

Las organizaciones de servicios financieros solo tienen unos cientos de milisegundos para determinar si una transacción en línea en particular es legítima o es un fraude.

Las técnicas de redes neuronales se utilizan para la detección de fraudes en puntos de venta (POS). Las organizaciones como PayPal utilizan diferentes tipos de algoritmos de aprendizaje automático para la gestión de riesgos, como la red lineal, neuronal y el aprendizaje profundo.

La biblioteca Spark MLlib proporciona varios algoritmos para resolver este caso de uso, incluidos SVM lineales, regresión logística, árboles de decisión y Bayes sencillos. Los modelos de conjunto (que combinan las predicciones de un conjunto de modelos) como los bosques aleatorios o los árboles que aumentan el gradiente también están disponibles …

MLlib es la biblioteca de aprendizaje automático de Spark. Su objetivo es hacer que el aprendizaje automático práctico sea escalable y fácil. Consiste en algoritmos y utilidades de aprendizaje comunes, que incluyen clasificación, regresión, clustering, filtrado colaborativo, reducción de dimensionalidad y primitivas de optimización de nivel inferior y API de canalizaciones de nivel superior.

Como aprendimos anteriormente, hay dos formas de utilizar la API de Aprendizaje Automático de Spark: Spark MLlib y spark.ml.

La API Spark MLlib está disponible en los lenguajes de programación Scala, Java y Python.

Continuara...

Apache Spark MLlib parte 2


 Continuamos!!

Cuando se trabaja en proyectos de aprendizaje automático la preparación de datos, la limpieza y el análisis, también son tareas importantes más allá de los modelos de aprendizaje reales y los algoritmos utilizados para resolver los problemas de negocios.

Los siguientes son los pasos que se realizan en un programa típico de aprendizaje automático:

Caracterización (featurization) del modelo.
entrenar el modelo.
evaluación modelo.

Es importante saber que se debe limpiar y preparar los datos antes de ejecutar un algoritmo de aprendizaje automático, de lo contrario el patrón resultante no será preciso ni útil, y puede pasar por alto algunas anomalías.

La calidad de los datos de entrenamiento que proporcionamos a los programas de aprendizaje automático también juega un papel fundamental en los resultados de predicción. Si los datos de entrenamiento no son lo suficientemente aleatorios, los patrones resultantes no serán precisos. Y si el conjunto de datos es demasiado pequeño, el programa de aprendizaje automático puede dar predicciones inexactas.

Los casos de uso comerciales para aprendizaje automático abarcan diferentes dominios y escenarios, incluidos motores de recomendación (como este motor de recomendación de alimentos), análisis predictivo (por ejemplo, predicción de precios de acciones o retrasos en los vuelos), publicidad dirigida, detección de fraude, reconocimiento de imágenes y videos -conducir automóviles y otras formas de inteligencia artificial.

Veamos dos aplicaciones populares, un motor de recomendación y detección de fraude, en más detalle.

Los motores de recomendación utilizan los atributos de un elemento o un usuario o el comportamiento de un usuario o sus compañeros para hacer predicciones. Diferentes factores impulsan un motor de recomendación eficaz. Algunos de estos factores incluyen:
  • análisis por pares.
  • comportamiento del cliente.
  • ofertas u ofertas corporativas.
  • agrupamiento de elementos.
  • factores de mercado / tienda.


Podemos construir un motor de recomendación mediante la participación de dos algoritmos: filtrado basado en contenido y filtrado colaborativo.

El filtrado basado en contenido se basa en cuán similares son el uso y las calificaciones de un artículo en particular a otros artículos. El modelo usa los atributos de contenido de los elementos (como categorías, etiquetas, descripciones y otros datos) para generar una matriz que relacione cada elemento con otros elementos y calcule la similitud en función de las clasificaciones proporcionadas. Luego, los elementos más similares se enumeran junto con un puntaje de similitud. Los artículos con el puntaje más alto son los más similares.

La recomendación de la película es un buen ejemplo de este modelo. Puede aconsejar que a los usuarios a los que les haya gustado una película en particular también les hayan gustado estas otras películas.

El filtrado basado en contenido no tiene en cuenta el comportamiento general de otros usuarios, por lo que sus modelos no ofrecen recomendaciones personalizadas, como el filtrado colaborativo y otros modelos.

Por otro lado, los modelos de filtrado colaborativo predicen y recomiendan elementos específicos o usuarios en función de la similitud con otros elementos o usuarios.

El filtro aplica ponderaciones basadas en las preferencias del "usuario par". La suposición es que los usuarios con perfiles o comportamientos similares también tendrán preferencias de elementos similares.

Un ejemplo de este modelo son las recomendaciones sobre sitios web de comercio electrónico como Amazon. Cuando buscamos un artículo en el sitio web, vemos una lista llamada "Los clientes que vieron este artículo también compraron".

Los elementos con el puntaje de recomendación más alto son los más relevantes para el usuario en contexto.

Las soluciones de filtrado colaborativo funcionan mejor que otros modelos. Spark MLlib implementa un algoritmo de filtrado colaborativo llamado Alternating least squares (ALS). Hay dos variaciones de las entradas en el filtrado colaborativo, llamadas realimentación explícita e implícita.

La retroalimentación explícita se basa en las preferencias directas otorgadas por el usuario al elemento (como una película). La retroalimentación explícita es agradable, pero muchas veces es sesgada porque los usuarios a quienes les gusta o no les gusta mucho un producto tienden a opinar con más frecuencia que los que les es indiferente. Por lo tanto podemos no obtener la opinión de muchas personas en el centro de la curva.

La retroalimentación implícita incluye las vistas del usuario, los clics, los "me gusta", etc. La retroalimentación implícita a menudo se utiliza para el análisis predictivo debido a lo fácil que es recopilar este tipo de datos.

También hay métodos basados en modelos para motores de recomendación. A menudo incorporan métodos de filtrado colaborativo y basado en contenido. Un enfoque basado en modelos obtiene lo mejor de ambos mundos: el poder y el rendimiento del filtrado colaborativo y la flexibilidad y adaptabilidad del filtrado basado en el contenido. Las técnicas de aprendizaje profundo o deep learning son buenos ejemplos de este modelo.

También puede integrar otros algoritmos como k-means clustering en el motor de recomendación para refinar aún más las predicciones. El algoritmo k-means funciona al dividir N observaciones en k clústeres en los que cada observación pertenece al clúster con la media más cercana. Usando la técnica de k-means, podemos encontrar elementos similares o usuarios en función de sus atributos.


Como deben adivinar:

continuará...

domingo, 8 de julio de 2018

Apache Spark MLlib

Apache Spark Mllib incluye diferentes algoritmos de Machine Learning. Estas librerías se encuentran en 2 paquetes : spark.mllib  y  spark.ml

spark.mllib contiene la API original de spark construida sobre RDDs. Estos algoritmos incluyen correlación, clasificación y regresión, filtrado colaborativo, clustering y reducción de dimensionalidad.

spark.ml contiene la API construida sobre Dataframes, el cual es el core de Spark SQL. Esto puede ser utilizado para hacer una tubería de maching learning, es decir combinar técnicas o limpiar los datos y luego procesarlos. Este paquete provee: selectores, transformadores, extractores y técnicas de maching learning como clasificación, regresión y clustering.

La ciencia de datos o Data science es la disciplina que extraer el conocimiento de grandes conjuntos de datos (estructurados o no estructurados) para proporcionar información a los equipos de negocios e influir en las estrategias comerciales. El papel del científico de datos es resolver problemas que no son fáciles de resolver utilizando los métodos numéricos tradicionales. Normalmente estos cientificos de datos utilizan modelos de aprendizaje automático.

Existen diferentes tipos de modelos de aprendizaje automático:
  • aprendizaje supervisado,
  • aprendizaje sin supervisión,
  • aprendizaje semi-supervisado
  • aprendizaje reforzado.
Entre los algoritmos soportados por Spark MLlib y utilizados por los científicos de datos, podemos nombrar:


  • Naive Bayes es un algoritmo de aprendizaje supervisado utilizado para la clasificación. Se basa en la aplicación del teorema de Bayes y un conjunto de suposiciones de independencia condicional.
  • El algoritmo k-means o k-means clustering crea k grupos a partir de un conjunto de objetos para que los miembros de cada grupo sean más similares entre sí.
  • Una máquina de vectores de soporte (SVM) es un algoritmo de aprendizaje supervisado que se utiliza para encontrar el límite que separa las clases por un margen tan amplio como sea posible. Dado un conjunto de ejemplos de entrenamiento, cada uno marcado como perteneciente a una de dos categorías, un algoritmo de entrenamiento SVM construye un modelo que asigna nuevos ejemplos en una categoría u otra. Las aplicaciones de SVM incluyen bioinformática, análisis de texto y reconocimiento de imágenes.
  • Los árboles de decisión se utilizan en muchos tipos de problemas de aprendizaje automático, incluida la clasificación multiclase. Spark MLlib es compatible tanto con un algoritmo de árbol de decisión básico como con conjuntos de árboles. Hay dos algoritmos de conjunto disponibles: árboles con gradiente mejorado y bosques aleatorios.
Como una primer vistazo, esta bien hasta aquí, pero debemos seguir en próximos posts, es decir : 

continuará...