Translate
jueves, 25 de julio de 2019
Mezclando datos de diferentes almacenes de datos con Apache Spark SQL
Vamos a hacer un ejemplo con Apache Spark SQL. Algo simple, vamos a tomar algunas tablas de una base de datos relacional y vamos hacer unas consultas y luego vamos a importar datos de un csv para tambien hacer consultas:
//Primero importamos la clase de SQLContext y creamos el contexto.
import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//Vamos a importar los datos de la base de datos, en este caso la tabla "actor"
val url = "jdbc:mysql://0.0.0.0:3306/sakila"
val dfActor = spark.read.format("jdbc").option("url", url)
.option("dbtable", "actor")
.option("user", "sparkDb")
.option("password","sparkDb").load()
//Ya tenemos la tabla en el dataframe ahora vamos a imprimir el esquema y ver los datos.
dfActor.printSchema()
dfActor.show()
dfActor.count()
//Consultamos el campo nombre
dfActor.select("first_name").show()
//Registramos la tabla para poder consultarla con SQL
dfActor.registerTempTable("actors")
// Y consultamos
spark.sql("select first_name, last_name from actors").show
//Vamos a traernos una tabla relación entre el actor y el film
val dfActorFilms = spark.read.format("jdbc").option("url", url).option("dbtable", "film_actor").option("user", "sparkDb").option("password","sparkDb").load()
//Registro la tabla temporal
dfActorFilms.registerTempTable("actors_films")
//Selecciono los datos del actor y en cuantas peliculas actuo
spark.sql("select a.first_name, a.last_name, count(af.film_id) from actors a join actors_films af on (a.actor_id=af.actor_id) group by a.first_name, a.last_name").show
// Ahora vamos a traernos los datos del film pero desde un archivo csv
// Creamos una clase que me va hacer util leer los datos de forma estructurada.
case class Film(film_id: Int, title: String, description: String, release_year: Int)
// Leemos el archivo CSV :
val dfFilms = sc.textFile("/home/spark/films.csv").map(_.split(",")).map(p => Film(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).toDF()
// Vemos que tiene el archivo
dfFilms.show
//Registramos el dataframe para poder hacer consultas sobre él
dfFilms.registerTempTable("films")
//Realizamos una consulta con joins al datafame actos_films para saber cuantos actores trabajaron en los films.
val dfFilmsCountActors = spark.sql("select f.title, count(af.film_id) from films f join actors_films af on (f.film_id=af.film_id) group by f.title")
//Registramos el dataframe como una tabla temporal
dfFilmsCountActors.registerTempTable("films_Count_Actors")
//Hacemos una consulta sobre esa tabla con filtro.
spark.sql("select * from films_Count_Actors fca where fca.title like '%SANTA%' ").show
//Por ultimo exportamos este resultado.
dfFilmsCountActors.coalesce(1).write.csv("/home/spark/filmsCount.csv")
Fin!
La idea era poder hacer un conjunto de acciones que nos sirvan como ejemplo de las cosas que se pueden hacer con Spark SQL. Espero que les sirva!!