domingo, 20 de mayo de 2018

DataFrame en Spark SQL


En lenguajes de programación como R, hay una abstracción que es utilizada para almacenar tablas de datos en la memoria. La biblioteca de análisis de datos de Python, llamada Pandas, también tiene un concepto similar. El mismo concepto de tabla de datos se extiende a Spark, conocido como DataFrame, construido sobre RDD, y hay una API muy completa conocida como API DataFrame en Spark SQL para procesar los datos en el DataFrame. También se desarrolló un lenguaje de consulta similar a SQL sobre la abstracción de DataFrame, atendiendo a las necesidades de los usuarios finales para consultar y procesar los datos.

La diferencia clave entre RDD y DataFrame es que DataFrame almacena mucha más información sobre la estructura de los datos, como los tipos de datos y los nombres de las columnas, que el RDD. Esto permite que el DataFrame optimice el procesamiento de forma mucho más efectiva que las transformaciones Spark y las acciones Spark que procesan en RDD. El otro aspecto más importante para mencionar aquí es que todos los lenguajes de programación compatibles de Spark se pueden usar para desarrollar aplicaciones utilizando la API DataFrame de Spark SQL.

Para todos los propósitos prácticos, Spark SQL es un motor SQL distribuido. Que usa esta abstracción para manipular datos que pueden provenir de diferentes orígenes de datos.

Vamos a construir un dataframe desde una base de datos relacional y un archivo y luego vamos a combinarlos:

import org.apache.spark.sql.SQLContext

//Creo el contexto Spark sql
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//genero el jdbc
val url = "jdbc:mysql://127.0.0.1:3306/amarokdb"

//me conecto a la base y traigo los albunes
val albums = spark.read.format("jdbc").option("url", url).option("dbtable", "albums").option("user", "root").option("password","pass").load()

albums.printSchema() // Imprimo el esquema para probar
albums.registerTempTable("albums") //Registro la lista.

import sqlContext.implicits._

// Creo una clase artista que para leer el archivo
case class Artist(album_id: Int, id: Int, name: String)

// Leo el archivo y creo un DataFrame de artistas
val artists = sc.textFile("/java/spark/artists.csv").map(_.split(",")).map(p => Artist(p(0).trim.toInt, p(1).trim.toInt, p(2))).toDF()

//registro la tabla
artists.registerTempTable("artists")

//Genero un dataframe con un join entre los 2 dataframes y lo imprimo
spark.sql(“select * from albums a, artists aa where a.artist = aa.id”).show

Y eso es todo amigos!!!