domingo, 12 de agosto de 2018

Ejemplo con Java y Spark


Vamos hacer un pequeño ejemplo, una aplicación Java que utilice Spark para contar palabras. La aplicación calculara cuantas veces aparece una palabra en un archivo de texto.

Para hacer esto vamos a utilizar eclipse y maven.

Primero creamos un proyecto simple de maven, para lo cual podemos utilizar la consola, con el siguiente comando:

mvn archetype:generate
-DarchetypeGroupId=org.apache.maven.archetypes
-DarchetypeArtifactId=maven-archetype-quickstart
-DarchetypeVersion=RELEASE
-DgroupId=org.miOrganizacion
-DartifactId=nombreDelProyecto

o con eclipse, vamos a File -> New -> Project -> Maven -> Maven Project y luego elegimos es arquetipo "maven-archetype-quickstart" y luego completamos los campos groupId y artifactId. Y listo!

Ahora vamos agregar spark en las dependencias en el pom.xml :

  <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>

Y vamos a indicarle que utilizaremos java 8 con la siguiente lineas :

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

El pom.xml queda de la siguiente manera:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.hexacta</groupId>
<artifactId>testSpark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>testSpark</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>


</dependencies>
</project>

Y ahora, a programa!!.

Vamos hacer el ejemplo de contar palabras de un archivo de texto. Para lo cual haremos un objeto llamado "App.java" : 

package com.hexacta.testSpark;

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class App {

public static void main(String[] args) {
// Create a SparkContext to initialize
SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count");

// Create a Java version of the Spark Context
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the text into a Spark RDD, which is a distributed representation of each
// line of text
JavaRDD<String> textFile = sc.textFile("/java/spark/artists.csv");

JavaPairRDD<String, Integer> counts = textFile
                                .flatMap(s -> Arrays.asList(s.split("[ ,]")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((a, b) -> a + b);
counts.foreach(p -> System.out.println(p));
System.out.println("Total words: " + counts.count());
counts.saveAsTextFile("/java/spark/artistsCount.txt");
}
}

Dejo el repo git :
https://github.com/emanuelpeg/client-java-spark