Translate

jueves, 25 de julio de 2019

Mezclando datos de diferentes almacenes de datos con Apache Spark SQL


Vamos a hacer un ejemplo con Apache Spark SQL. Algo simple, vamos a tomar algunas tablas de una base de datos relacional y vamos hacer unas consultas y luego vamos a importar datos de un csv para tambien hacer consultas:

//Primero importamos la clase de SQLContext y creamos el contexto.

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//Vamos a importar los datos de la base de datos, en este caso la tabla "actor"

val url = "jdbc:mysql://0.0.0.0:3306/sakila"

val dfActor = spark.read.format("jdbc").option("url", url)
.option("dbtable", "actor")
.option("user", "sparkDb")
.option("password","sparkDb").load()

//Ya tenemos la tabla en el dataframe ahora vamos a imprimir el esquema y ver los datos.

dfActor.printSchema()
dfActor.show()
dfActor.count()

//Consultamos el campo nombre

dfActor.select("first_name").show()

//Registramos la tabla para poder consultarla con SQL

dfActor.registerTempTable("actors")

// Y consultamos

spark.sql("select first_name, last_name from actors").show

//Vamos a traernos una tabla relación entre el actor y el film

val dfActorFilms = spark.read.format("jdbc").option("url", url).option("dbtable", "film_actor").option("user", "sparkDb").option("password","sparkDb").load()

//Registro la tabla temporal

dfActorFilms.registerTempTable("actors_films")

//Selecciono los datos del actor y en cuantas peliculas actuo

spark.sql("select a.first_name, a.last_name, count(af.film_id) from actors a join actors_films af on (a.actor_id=af.actor_id) group by a.first_name, a.last_name").show

// Ahora vamos a traernos los datos del film pero desde un archivo csv
// Creamos una clase que me va hacer util leer los datos de forma estructurada.

case class Film(film_id: Int, title: String, description: String, release_year: Int)

// Leemos el archivo CSV :

val dfFilms = sc.textFile("/home/spark/films.csv").map(_.split(",")).map(p => Film(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).toDF()

// Vemos que tiene el archivo

dfFilms.show

//Registramos el dataframe para poder hacer consultas sobre él

dfFilms.registerTempTable("films")

//Realizamos una consulta con joins al datafame actos_films para saber cuantos actores trabajaron en los films.

val dfFilmsCountActors = spark.sql("select f.title, count(af.film_id) from films f join actors_films af on (f.film_id=af.film_id) group by f.title")

//Registramos el dataframe como una tabla temporal

dfFilmsCountActors.registerTempTable("films_Count_Actors")

//Hacemos una consulta sobre esa tabla con filtro.

spark.sql("select * from films_Count_Actors fca where fca.title like '%SANTA%' ").show

//Por ultimo exportamos este resultado.

dfFilmsCountActors.coalesce(1).write.csv("/home/spark/filmsCount.csv")

Fin!

La idea era poder hacer un conjunto de acciones que nos sirvan como ejemplo de las cosas que se pueden hacer con Spark SQL. Espero que les sirva!!