domingo, 15 de abril de 2018

Ejemplo de una aplicación en Spark


La aplicación de muestra en esta sección es una aplicación simple para contar palabras. Este es el mismo ejemplo que se utiliza para enseñar el procesamiento de big data con Hadoop. Realizaremos algunos análisis de datos en un archivo de texto para contar cuántas palabras hay en el archivo y cuántas veces se repiten. El archivo de texto y el conjunto de datos en este ejemplo son pequeños, pero los mismos programas Spark se pueden usar para grandes conjuntos de datos sin modificaciones de código. De forma similar a Hadoop, el entorno de tiempo de ejecución Spark distribuirá automáticamente los datos a diferentes nodos en el clúster para un procesamiento de datos más rápido.
Para mantener el ejemplo simple, usaremos el shell Spark Scala.
Primero, instalemos Spark en nuestra máquina local, esto ya lo vimos en post pasados.
Con spark instalado y listo, podemos utilizar el API de stark para procesar archivos. Por lo tanto abrimos el shell de scala (yo voy a utilizar linux)
Me paro en la carpeta bin de spark y escribo:

$ ./spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/04/15 11:04:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/15 11:04:39 WARN Utils: Your hostname, toto resolves to a loopback address: 127.0.1.1; using 172.17.0.1 instead (on interface docker0)
18/04/15 11:04:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context Web UI available at http://172.17.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1523801082307).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
       
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> val txtFile = "../NOTICE"
txtFile: String = ../NOTICE

scala> val txtData = sc.textFile(txtFile)
txtData: org.apache.spark.rdd.RDD[String] = ../NOTICE MapPartitionsRDD[1] at textFile at <console>:30

scala> txtData.cache()
res2: txtData.type = ../NOTICE MapPartitionsRDD[1] at textFile at <console>:30

scala> txtData.count()
res3: Long = 661                 

De esta manera sabemos la cantidad de lineas de archivo “NOTICE”, si queremos saber la cantidad de veces que aparece las palabras podemos hacer:

scala> val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wcData: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:36

scala> wcData.collect().foreach(println)
(created,1)
(Unless,4)
(Technology,1)
(Sébastien,1)
...


Lo que hace en la primera linea es :
  • Divide una linea por espacio “ ”
  • Luego crea una tupla (palabra, 1), imaginate que queda algo así: (palabra1,1) , (palabra2,1), (palabra1,1), (palabra1,1)
  • Por lo tanto aplica una reducción por key es decir va sumando cuantas veces aparecen las palabras agrupándolos por key y queda de la siguiente manera: (palabra1,3) , (palabra2,1)


En la segunda linea imprimirme el resultado.

Y Listo!!