lunes, 13 de marzo de 2023

Vamos a implementar un framework de procesamiento paralelo simple con Cats

Vamos a implementar un framework de procesamiento paralelo simple pero poderoso utilizando Monoids, Functors y una serie de otras cosas.

Si ha utilizado Hadoop o Spark o ha trabajado en "grandes datos", habrá oído hablar de MapReduce, que es un modelo de programación para realizar el procesamiento de datos en paralelo en grupos de máquinas (también conocidos como "nodos"). Como sugiere el nombre, el modelo se construye alrededor de una fase de map, que es la misma función de map que conocemos de Scala y la clase de tipo Functor, y una fase de reducción, que generalmente llamamos plegar o fold en Scala.

Recuerde que la firma general para el mapa es aplicar una función A => B a una F[A], devolviendo una F[B]

map transforma cada elemento individual en una secuencia de forma independiente. Podemos paralelizar fácilmente porque no hay dependencias entre las transformaciones aplicadas a diferentes elementos (la firma de tipo de la función A => B).

Nuestro paso de reducción se convierte en un pliegue a la izquierda sobre los resultados del map distribuido.

Al distribuir el paso de reducción, perdemos el control sobre el orden de recorrido. Es posible que nuestra reducción general no sea completamente de izquierda a derecha: podemos reducir de izquierda a derecha en varias subsecuencias y luego combinar los resultados. Para asegurar la corrección necesitamos una operación de reducción que sea asociativa:

reduce(a1, reduce(a2, a3)) == reduce(reduce(a1, a2), a3)

Si tenemos asociatividad, podemos distribuir arbitrariamente el trabajo entre nuestros nodos siempre que las subsecuencias en cada nodo permanezcan en el mismo orden que el conjunto de datos inicial.

Nuestra operación de plegado nos obliga a sembrar el cálculo con un elemento de tipo B. Dado que el plegado se puede dividir en un número arbitrario de pasos paralelos, la semilla no debería afectar el resultado del cálculo. Esto naturalmente requiere que la semilla sea un elemento de identidad:

reduce(seed, a1) == reduce(a1, seed) == a1

En resumen, nuestro pliegue paralelo dará los resultados correctos si:

• requerimos que la función reductora sea asociativa;

• Sembramos el cálculo con la identidad de esta función.

¿Cómo suena este patrón? Así es, hemos completado el círculo de regreso a Monoid. El patrón de diseño monoide para trabajos de reducción de mapas está en el centro de los sistemas de big data recientes, como Summingbird de Twitter.

En este proyecto vamos a implementar un map-reduce de una sola máquina muy simple. Comenzaremos implementando un método llamado foldMap para modelar el flujo de datos que necesitamos.

Vimos foldMap es una de las operaciones derivadas que se encuentra encima de foldLeft y foldRight. Sin embargo, en lugar de usar Foldable, volveremos a implementar foldMap aquí nosotros mismos, ya que proporcionará información útil sobre la estructura de map‐reduce. Comience escribiendo la firma de foldMap. Debe aceptar los siguientes parámetros:

• una secuencia de tipo Vector[A];

• una función de tipo A => B, donde hay un Monoide para B;

Deberá agregar parámetros implícitos o límites de contexto para completar la firma de tipo.

import cats.Monoid

/** Single-threaded map-reduce function.

* Maps `func` over `values` and reduces using a `Monoid[B]`.

*/

def foldMap[A, B: Monoid](values: Vector[A])(func: A => B): B =

???

Ahora implemente el cuerpo de foldMap:

  1. comenzar con una secuencia de elementos de tipo A;
  2. mapear sobre la lista para producir una secuencia de elementos de tipo B;
  3. usa el Monoide para reducir los elementos a una sola B.

Aquí hay algunos resultados de muestra para referencia:


import cats.instances.int._ // for Monoid

foldMap(Vector(1, 2, 3))(identity)

// res1: Int = 6

import cats.instances.string._ // for Monoid

// Mapping to a String uses the concatenation monoid:

foldMap(Vector(1, 2, 3))(_.toString + "! ")

// res2: String = "1! 2! 3! "

// Mapping over a String to produce a String:

foldMap("Hello world!".toVector)(_.toString.toUpperCase)

// res3: String = "HELLO WORLD!"


Veamos la implementación: 


import cats.Monoid

import cats.syntax.semigroup._ // for |+|

def foldMap[A, B : Monoid](as: Vector[A])(func: A => B): B =

as.map(func).foldLeft(Monoid[B].empty)(_ |+| _)


Podemos hacer una pequeña alteración a este código para hacer todo en un solo paso:


def foldMap[A, B : Monoid](as: Vector[A])(func: A => B): B =

as.foldLeft(Monoid[B].empty)(_ |+| func(_))


Ahora que tenemos una implementación funcional de subproceso único de foldMap, veamos cómo distribuir el trabajo para que se ejecute en paralelo. Usaremos nuestra versión de subproceso único de foldMap como bloque de construcción. Escribiremos una implementación de CPU múltiple que simule la forma en que distribuiríamos el trabajo en un clúster de reducción de mapa:

  1. comenzamos con una lista inicial de todos los datos que necesitamos procesar;
  2. dividimos los datos en lotes, enviando un lote a cada CPU;
  3. las CPU ejecutan una fase de mapa a nivel de lote en paralelo;
  4. Las CPU ejecutan una fase de reducción de nivel de lote en paralelo, produciendo un local resultado de cada lote;
  5. reducimos los resultados de cada lote a un único resultado final.

Scala proporciona algunas herramientas simples para distribuir el trabajo entre subprocesos. Podríamos usar la biblioteca de colecciones paralelas para implementar una solución, pero desafiémonos a nosotros mismos profundizando un poco más e implementando el algoritmo nosotros mismos usando Futures.

Ya sabemos bastante sobre la naturaleza monádica de Futures. Tomemos un momento para un resumen rápido y para describir cómo se programan los futuros de Scala detrás de escena.

Los futuros se ejecutan en un grupo de subprocesos, determinado por un parámetro ExecutionContext implícito.

Cada vez que creamos un futuro, ya sea a través de una llamada a Future.apply o algún otro combinador, debemos tener un ExecutionContext implícito en el alcance:

import scala.concurrent.Future

import scala.concurrent.ExecutionContext.Implicits.global

val future1 = Future {

     (1 to 100).toList.foldLeft(0)(_ + _)

}

// future1: Future[Int] = Future(Success(5050))

val future2 = Future {

    (100 to 200).toList.foldLeft(0)(_ + _)

}

// future2: Future[Int] = Future(Success(15150))


En este ejemplo, hemos importado un ExecutionContext.Implicits.global. Este contexto predeterminado asigna un grupo de subprocesos con un subproceso por CPU en nuestra máquina. Cuando creamos un futuro, ExecutionContext lo programa para su ejecución. Si hay un subproceso libre en el grupo, Future comienza a ejecutarse de inmediato. La mayoría de las máquinas modernas tienen al menos dos CPU, por lo que en nuestro ejemplo es probable que future1 y future2 se ejecuten en paralelo.

Algunos combinadores crean nuevos Futuros que programan el trabajo en función de los resultados de otros Futuros. Los métodos map y flatMap, por ejemplo, programan cálculos que se ejecutan tan pronto como se calculan sus valores de entrada y hay una CPU disponible:

val future3 = future1.map(_.toString)

// future3: Future[String] = Future(Success(5050))

val future4 = for {

    a <- future1

    b <- future2

} yield a + b

// future4: Future[Int] = Future(Success(20200))


Podemos convertir una List[Future[A]] en una Future[List[A]] usando Future.sequence:


Future.sequence(List(Future(1), Future(2), Future(3)))

// res6: Future[List[Int]] = Future(Success(List(1, 2, 3)))


Podemos convertir una List[Future[A]] en una Future[List[A]] usando Future.sequence:


import cats.instances.future._ // for Applicative

import cats.instances.list._// for Traverse

import cats.syntax.traverse._// for sequence

List(Future(1), Future(2), Future(3)).sequence

// res7: Future[List[Int]] = Future(Success(List(1, 2, 3)))


Se requiere un ExecutionContext en cualquier caso. Finalmente, podemos usar Await.result para bloquear un futuro hasta que haya un resultado disponible:


import scala.concurrent._

import scala.concurrent.duration._

Await.result(Future(1), 1.second) // wait for the result

// res8: Int = 1


También hay implementaciones de Monad y Monoid para Future disponibles en cats.instances.future:


import cats.{Monad, Monoid}

import cats.instances.int._

// for Monoid

import cats.instances.future._ // for Monad and Monoid

Monad[Future].pure(42)

Monoid[Future[Int]].combine(Future(1), Future(2))


Me quedo medio largo el post, así que vamos a seguir en el proximo post. 



No hay comentarios.:

Publicar un comentario