Translate

lunes, 13 de marzo de 2023

Vamos a implementar un framework de procesamiento paralelo simple con Cats parte 2

Dado que en el post anterios refrescamos nuestra memoria de Futures, veamos cómo podemos dividir el trabajo en lotes. Podemos consultar la cantidad de CPU disponibles en nuestra máquina mediante una llamada API desde la biblioteca estándar de Java:

Runtime.getRuntime.availableProcessors

// res11: Int = 2

Podemos particionar una secuencia (en realidad cualquier cosa que implemente Vector) usando el método agrupado. Usaremos esto para dividir lotes de trabajo para cada CPU:


(1 to 10).toList.grouped(3).toList

// res12: List[List[Int]] = List(

//List(1, 2, 3),

//List(4, 5, 6),

//List(7, 8, 9),

//List(10)

// )


Implemente una versión paralela de foldMap llamada parallelFoldMap:


def parallelFoldMap[A, B: Monoid] (values: Vector[A]) (func: A => B): Future[B] = {

    // Calculate the number of items to pass to each CPU:

    val numCores = Runtime.getRuntime.availableProcessors

    val groupSize = (1.0 * values.size / numCores).ceil.toInt

    // Create one group for each CPU:

    val groups: Iterator[Vector[A]] = values.grouped(groupSize)

    // Create a future to foldMap each group:

    val futures: Iterator[Future[B]] =

        groups map { group =>

            Future {

                group.foldLeft(Monoid[B].empty)(_ |+| func(_))

            }

    }


    // foldMap over the groups to calculate a final result:

    Future.sequence(futures) map { iterable =>

    iterable.foldLeft(Monoid[B].empty)(_ |+| _)

    }

}

val result: Future[Int] = parallelFoldMap((1 to 1000000).toVector)(identity)

Await.result(result, 1.second)

// res14: Int = 1784293664


Aunque implementamos foldMap nosotros mismos arriba, el método también está disponible como parte de la clase de tipo Foldable:


import cats.Monoid

import cats.instances.int._

// for Monoid

import cats.instances.future._ // for Applicative and Monad

import cats.instances.vector._ // for Foldable and Traverse

import cats.syntax.foldable._// for combineAll and foldMap

import cats.syntax.traverse._// for traverse

import scala.concurrent._

import scala.concurrent.duration._

import scala.concurrent.ExecutionContext.Implicits.global


def parallelFoldMap[A, B: Monoid](values: Vector[A]) (func: A => B): Future[B] = {

    val numCores = Runtime.getRuntime.availableProcessors

    val groupSize = (1.0 * values.size / numCores).ceil.toInt

    values.grouped(groupSize).toVector.traverse(group => Future(group.toVector.foldMap(func)))

        .map(_.combineAll)

}

val future: Future[Int] = parallelFoldMap((1 to 1000).toVector)(_ * 1000)

Await.result(future, 1.second)

// res18: Int = 500500000