Mostrando las entradas con la etiqueta Apache Spark SQL. Mostrar todas las entradas
Mostrando las entradas con la etiqueta Apache Spark SQL. Mostrar todas las entradas

jueves, 25 de julio de 2019

Mezclando datos de diferentes almacenes de datos con Apache Spark SQL


Vamos a hacer un ejemplo con Apache Spark SQL. Algo simple, vamos a tomar algunas tablas de una base de datos relacional y vamos hacer unas consultas y luego vamos a importar datos de un csv para tambien hacer consultas:

//Primero importamos la clase de SQLContext y creamos el contexto.

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//Vamos a importar los datos de la base de datos, en este caso la tabla "actor"

val url = "jdbc:mysql://0.0.0.0:3306/sakila"

val dfActor = spark.read.format("jdbc").option("url", url)
.option("dbtable", "actor")
.option("user", "sparkDb")
.option("password","sparkDb").load()

//Ya tenemos la tabla en el dataframe ahora vamos a imprimir el esquema y ver los datos.

dfActor.printSchema()
dfActor.show()
dfActor.count()

//Consultamos el campo nombre

dfActor.select("first_name").show()

//Registramos la tabla para poder consultarla con SQL

dfActor.registerTempTable("actors")

// Y consultamos

spark.sql("select first_name, last_name from actors").show

//Vamos a traernos una tabla relación entre el actor y el film

val dfActorFilms = spark.read.format("jdbc").option("url", url).option("dbtable", "film_actor").option("user", "sparkDb").option("password","sparkDb").load()

//Registro la tabla temporal

dfActorFilms.registerTempTable("actors_films")

//Selecciono los datos del actor y en cuantas peliculas actuo

spark.sql("select a.first_name, a.last_name, count(af.film_id) from actors a join actors_films af on (a.actor_id=af.actor_id) group by a.first_name, a.last_name").show

// Ahora vamos a traernos los datos del film pero desde un archivo csv
// Creamos una clase que me va hacer util leer los datos de forma estructurada.

case class Film(film_id: Int, title: String, description: String, release_year: Int)

// Leemos el archivo CSV :

val dfFilms = sc.textFile("/home/spark/films.csv").map(_.split(",")).map(p => Film(p(0).trim.toInt, p(1), p(2), p(3).trim.toInt)).toDF()

// Vemos que tiene el archivo

dfFilms.show

//Registramos el dataframe para poder hacer consultas sobre él

dfFilms.registerTempTable("films")

//Realizamos una consulta con joins al datafame actos_films para saber cuantos actores trabajaron en los films.

val dfFilmsCountActors = spark.sql("select f.title, count(af.film_id) from films f join actors_films af on (f.film_id=af.film_id) group by f.title")

//Registramos el dataframe como una tabla temporal

dfFilmsCountActors.registerTempTable("films_Count_Actors")

//Hacemos una consulta sobre esa tabla con filtro.

spark.sql("select * from films_Count_Actors fca where fca.title like '%SANTA%' ").show

//Por ultimo exportamos este resultado.

dfFilmsCountActors.coalesce(1).write.csv("/home/spark/filmsCount.csv")

Fin!

La idea era poder hacer un conjunto de acciones que nos sirvan como ejemplo de las cosas que se pueden hacer con Spark SQL. Espero que les sirva!!

martes, 23 de julio de 2019

Como puedo exportar un dataframe a un archivo en Apache Spark??


Tuve que exportar un dataframe a un archivo csv es spark y con scala, como no sabia lo tuve que buscar y ahora te lo comparto. En Spark 2 simplemente podemos hacer lo siguiente:

df.write.csv ("/la/carpeta/donde/queremos/que/este/el/archivo")

Si deseamos asegurarnos que el archivo sea uno, es decir que ya no estén particionados, agregue un .coalesce (1) de la siguiente manera;

df.coalesce (1) .write.csv ("/la/carpeta/donde/queremos/que/este/el/archivo")

Espero que les sea de ayuda!!

lunes, 6 de agosto de 2018

Data Science and Cognitive Computing Courses


Quiero compartir la siguiente página que permite hacer cursos de machine learning, Apache Spark, SQL, Python, R, Data science, Reactive, Scala, Big Data, etc...

Y todos de forma gratuita!

Dejo link: https://cognitiveclass.ai

domingo, 20 de mayo de 2018

DataFrame en Spark SQL


En lenguajes de programación como R, hay una abstracción que es utilizada para almacenar tablas de datos en la memoria. La biblioteca de análisis de datos de Python, llamada Pandas, también tiene un concepto similar. El mismo concepto de tabla de datos se extiende a Spark, conocido como DataFrame, construido sobre RDD, y hay una API muy completa conocida como API DataFrame en Spark SQL para procesar los datos en el DataFrame. También se desarrolló un lenguaje de consulta similar a SQL sobre la abstracción de DataFrame, atendiendo a las necesidades de los usuarios finales para consultar y procesar los datos.

La diferencia clave entre RDD y DataFrame es que DataFrame almacena mucha más información sobre la estructura de los datos, como los tipos de datos y los nombres de las columnas, que el RDD. Esto permite que el DataFrame optimice el procesamiento de forma mucho más efectiva que las transformaciones Spark y las acciones Spark que procesan en RDD. El otro aspecto más importante para mencionar aquí es que todos los lenguajes de programación compatibles de Spark se pueden usar para desarrollar aplicaciones utilizando la API DataFrame de Spark SQL.

Para todos los propósitos prácticos, Spark SQL es un motor SQL distribuido. Que usa esta abstracción para manipular datos que pueden provenir de diferentes orígenes de datos.

Vamos a construir un dataframe desde una base de datos relacional y un archivo y luego vamos a combinarlos:

import org.apache.spark.sql.SQLContext

//Creo el contexto Spark sql
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//genero el jdbc
val url = "jdbc:mysql://127.0.0.1:3306/amarokdb"

//me conecto a la base y traigo los albunes
val albums = spark.read.format("jdbc").option("url", url).option("dbtable", "albums").option("user", "root").option("password","pass").load()

albums.printSchema() // Imprimo el esquema para probar
albums.registerTempTable("albums") //Registro la lista.

import sqlContext.implicits._

// Creo una clase artista que para leer el archivo
case class Artist(album_id: Int, id: Int, name: String)

// Leo el archivo y creo un DataFrame de artistas
val artists = sc.textFile("/java/spark/artists.csv").map(_.split(",")).map(p => Artist(p(0).trim.toInt, p(1).trim.toInt, p(2))).toDF()

//registro la tabla
artists.registerTempTable("artists")

//Genero un dataframe con un join entre los 2 dataframes y lo imprimo
spark.sql(“select * from albums a, artists aa where a.artist = aa.id”).show

Y eso es todo amigos!!!


jueves, 10 de mayo de 2018

Apache Spark Streaming



Con el procesamiento de datos en streaming podemos analizar los datos en tiempo real, lo cual es muy útil en muchas situaciones. Apache Spark provee una librería para procesar streaming, que como no era necesarío un nombre difícil o original lo bautizaron Apache Spark Streaming.

El procesamiento de streaming se utiliza para analizar información que viene de sensores, iot, busquedas, logs de paginas web, el trafico a servidores, etc. Ejemplos de su uso podrian ser: monitorización de servidores o del comportamiento de usuarios en paginas web, etc.

Spark Streaming es una extensión de Apache Spark API y como ya digimos Apache Spark Streaming hace facil la creación de procesos tolerante a fallos para procesar información en tiempo real.

Desde la versión 2.0, Spark streaming soporta una nueva librería de streaming llamada 
Structured Streaming, el cual ofrece un procesamiento de streaming escalable y tolerante a fallos basado en Spark SQL. Podemos utilizar dataset and la Api de dataframe en Scala, Java, Python o R para escribir agregaciones de streamings. Structured Streaming provee un camino para el procesamiento sin que tengamos que razonar mucho sobre el procesamiento.

Spark Streaming funciona dividiendo la transmisión en pequeñas porciones de datos  (llamados micro-batches) en un intervalo predefinido (N segundos) y luego convierte cada micro-batch en un RDD. Nosotros podemos procesar estos RDD con las operaciones : map , reduce , reduceByKey , join , y window. Los resultados de estas operaciones RDD se devuelven en lotes. Por lo general, almacenamos estos resultados en un almacén de datos para un análisis posterior, para generar informes, o para enviar alertas basadas en eventos.

Es importante configurar correctamente el intervalo de tiempo para Spark Streaming, en función del caso de uso y los requisitos de procesamiento de datos. Si el valor de N es demasiado bajo, entonces los micropaquetes no tendrán suficientes datos para dar resultados significativos durante el análisis.

Los datos de streaming pueden ser procesados desde diferentes fuentes :
  • Kafka,
  • Flume,
  • Twitter,
  •  ZeroMQ,
  •  Amazon’s Kinesis, and
  •  TCP sockets
Otra ventaja de utilizar Apache Spark es que podemos combinar procesamiento batch con procesamiento streaming. También podemos utilizar otros subproyecto somo Spark mlib o graphX.


domingo, 6 de mayo de 2018

Ejemplo de Apache Spark Sql - parte 2

En el ejemplo anterior, el esquema se deduce utilizando reflexión. También podemos especificar mediante programación el esquema del dataset. Esto es útil cuando las clases personalizadas no se pueden definir con anticipación porque la estructura de los datos está codificada en una cadena.

El siguiente ejemplo de código muestra cómo especificar el esquema utilizando las clases de tipo de datos StructType, StringType y StructField.

//
// Programmatically Specifying the Schema
//
// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile(“/home/emanuel/eje.csv”)

// The schema is encoded in a string
val schemaString = “customer_id name city state zip_code”

// Import Spark SQL data types and Row.
import org.apache.spark.sql._
import org.apache.spark.sql.types._;

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(“ “).map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(“,”)).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
dfCustomers.registerTempTable(“customers”)

// SQL statements can be run by using the sql methodsprovided by sqlContext.
val custNames = sqlContext.sql(“SELECT name FROM customers”)

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => “Name: “ + t(0)).collect().foreach(println)

// SQL statements can be run by using the sql methods provided by sqlContext.
val customersByCity = sqlContext.sql(“SELECT name,zip_code FROM customers ORDER BY zip_code”)

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
customersByCity.map(t => t(0) + “,” + t(1)).collect().
foreach(println)

También podemos cargar los datos de otras fuentes de datos como archivos de datos JSON, tablas Hive o incluso tablas de bases de datos relacionales que utilizan la fuente de datos JDBC.
Spark SQL proporciona una agradable interfaz SQL para interactuar con datos que se cargan desde diversas fuentes de datos, utilizando la conocida sintaxis de consulta SQL.
Esto es especialmente útil para miembros de proyectos no técnicos, como analistas de datos y DBA.

Ejemplo de Apache Spark Sql

Para este ejemplo, cargaremos datos de clientes de un archivo de texto y crearemos un objeto DataFrame a partir del conjunto de datos. Entonces podemos ejecutar las funciones de DataFrame como consultas específicas para seleccionar los datos.

Veamos el archivo customers.txt.

100,John Smith, Austin, TX, 78727
200,Joe Johnson, Dallas, TX, 75201
300,Bob Jones, Houston, TX, 77028
400,Andy Davis, San Antonio, TX, 78227
500,James Williams, Austin, TX, 78727

El siguiente código muestra los comandos Spark SQL que podemos ejecutar en la consola del shell Spark.

// Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import statement to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

// Create a custom class to represent the Customer
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// Create a DataFrame of Customer objects from the data set text file.
val dfCustomers = sc.textFile(“/home/emanuel/eje.csv”).map(_.split(“,”)).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()

// Register DataFrame as a table.
dfCustomers.registerTempTable(“customers”)

// Display the content of DataFrame
dfCustomers.show()

// Print the DF schema
dfCustomers.printSchema()

// Select customer name column
dfCustomers.select(“name”).show()

// Select customer name and city columns
dfCustomers.select(“name”, “city”).show()

// Select a customer by id
dfCustomers.filter(dfCustomers(“customer_id”).equalTo(500)).show()

// Count the customers by zip code
dfCustomers.groupBy(“zip_code”).count().show()

Apache Spark Sql - JDBC Data Source


Spark sql contiene un origen de datos jdbc. Con él podemos leer una base de datos relacional. Este enfoque es preferible al uso de JdbcRDD porque el origen de datos devuelve los resultados como un DataFrame que puede procesarse en Spark SQL o mezclarse con datos de otras fuentes de datos.

Veamos un ejemplo:

import org.apache.spark.sql.SQLContext

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val url = "jdbc:mysql://127.0.0.1:3306/amarokdb"

val df = spark.read.format("jdbc").option("url", url).option("dbtable", "albums").option("user", "root").option("password","pass").load()

df.printSchema() // Looks the schema of this DataFrame.
df.show()
df.count()

jueves, 3 de mayo de 2018

Apache Spark Sql


Como habíamos dicho en un ambiente Spark SQL, los 2 componentes más importantes son el DataFrame y el SQLContext.

DataFrame: Un dataframe es una colección de datos distribuida organizada dentro de nombres de columnas. Esto basado en el concepto de dataframe del Lenguaje R y similar al concepto de tablas de las base de datos relacionales.

SchemaRDD, que se encuentra en versiones anteriores de Spark SQL API y se ha renombrado como DataFrame.

DataFrames puede ser convertido en un RDDs, por medio de un método de RDD, el RDD resultante tiene las filas del DataFrames.

Los DataFrames pueden ser creados dados los siguientes origenes de datos:

  • RDDs existentes
  • un archivo estructurado
  • un Json
  • una tabla HIVE
  • y una base de datos externa. 

Spark SQL y DataFrame APIs esta disponible en Scala, Java, Python y R. 

Además de DataFrame, Spark también proporciona la API Dataset. Un Dataset es una colección distribuida de datos similar a los RDD pero que utiliza un codificador para serializar los objetos. La API Dataset está disponible en Scala y Java. Spark SQL admite métodos para convertir RDD existentes en conjuntos de datos.

SQLContext: Spark SQL proporciona SQLContext para encapsular toda la funcionalidad relacional en Spark. Se crea el SQLContext a partir del SparkContext existente, veamos un ejemplo: 

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

También hay HiveContext, que proporciona un superconjunto de la funcionalidad proporcionada por SQLContext. Se puede usar para escribir consultas usando el analizador HiveQL y para leer datos de tablas de Hive.

Notemos que no necesitamos un entorno Hive existente para usar HiveContext en los programas Spark.

jueves, 26 de abril de 2018

Apache Spark Sql

Spark Sql es parte de Spark y permite consultar datos en Spark, utilizando un lenguaje estructurado similar a SQL. Podemos exportar datos a xml, json, etc.

Spark Sql nos permite consultar archivos batch, conjunto de datos Json o tablas Hive. Spark Sql trae características muy útiles y en las ultimas versiones ha agregado importantes mejoras en las que podemos nombrar:

DataFrames: es una abstracción que permite que funcione como SQL query engine distribuido.
Una Api de origen de datos (datasource) que permite conectarse con orígenes de datos de diferente estructura.
Un servidor JDBC que hace fácil conectarnos con base de datos relacionales y permite procesar datos de igual manera que una herramienta BI tradicional.

En un ambiente Spark SQL, los 2 componentes más importantes son el DataFrame y el SQLContext. Que los veremos en próximos posts!!