Translate

domingo, 6 de mayo de 2018

Ejemplo de Apache Spark Sql - parte 2

En el ejemplo anterior, el esquema se deduce utilizando reflexión. También podemos especificar mediante programación el esquema del dataset. Esto es útil cuando las clases personalizadas no se pueden definir con anticipación porque la estructura de los datos está codificada en una cadena.

El siguiente ejemplo de código muestra cómo especificar el esquema utilizando las clases de tipo de datos StructType, StringType y StructField.

//
// Programmatically Specifying the Schema
//
// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val rddCustomers = sc.textFile(“/home/emanuel/eje.csv”)

// The schema is encoded in a string
val schemaString = “customer_id name city state zip_code”

// Import Spark SQL data types and Row.
import org.apache.spark.sql._
import org.apache.spark.sql.types._;

// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(“ “).map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(“,”)).map(p => Row(p(0).trim,p(1),p(2),p(3),p(4)))

// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
dfCustomers.registerTempTable(“customers”)

// SQL statements can be run by using the sql methodsprovided by sqlContext.
val custNames = sqlContext.sql(“SELECT name FROM customers”)

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => “Name: “ + t(0)).collect().foreach(println)

// SQL statements can be run by using the sql methods provided by sqlContext.
val customersByCity = sqlContext.sql(“SELECT name,zip_code FROM customers ORDER BY zip_code”)

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
customersByCity.map(t => t(0) + “,” + t(1)).collect().
foreach(println)

También podemos cargar los datos de otras fuentes de datos como archivos de datos JSON, tablas Hive o incluso tablas de bases de datos relacionales que utilizan la fuente de datos JDBC.
Spark SQL proporciona una agradable interfaz SQL para interactuar con datos que se cargan desde diversas fuentes de datos, utilizando la conocida sintaxis de consulta SQL.
Esto es especialmente útil para miembros de proyectos no técnicos, como analistas de datos y DBA.