Dado que en el post anterios refrescamos nuestra memoria de Futures, veamos cómo podemos dividir el trabajo en lotes. Podemos consultar la cantidad de CPU disponibles en nuestra máquina mediante una llamada API desde la biblioteca estándar de Java:
Runtime.getRuntime.availableProcessors
// res11: Int = 2
Podemos particionar una secuencia (en realidad cualquier cosa que implemente Vector) usando el método agrupado. Usaremos esto para dividir lotes de trabajo para cada CPU:
(1 to 10).toList.grouped(3).toList
// res12: List[List[Int]] = List(
//List(1, 2, 3),
//List(4, 5, 6),
//List(7, 8, 9),
//List(10)
// )
Implemente una versión paralela de foldMap llamada parallelFoldMap:
def parallelFoldMap[A, B: Monoid] (values: Vector[A]) (func: A => B): Future[B] = {
// Calculate the number of items to pass to each CPU:
val numCores = Runtime.getRuntime.availableProcessors
val groupSize = (1.0 * values.size / numCores).ceil.toInt
// Create one group for each CPU:
val groups: Iterator[Vector[A]] = values.grouped(groupSize)
// Create a future to foldMap each group:
val futures: Iterator[Future[B]] =
groups map { group =>
Future {
group.foldLeft(Monoid[B].empty)(_ |+| func(_))
}
}
// foldMap over the groups to calculate a final result:
Future.sequence(futures) map { iterable =>
iterable.foldLeft(Monoid[B].empty)(_ |+| _)
}
}
val result: Future[Int] = parallelFoldMap((1 to 1000000).toVector)(identity)
Await.result(result, 1.second)
// res14: Int = 1784293664
Aunque implementamos foldMap nosotros mismos arriba, el método también está disponible como parte de la clase de tipo Foldable:
import cats.Monoid
import cats.instances.int._
// for Monoid
import cats.instances.future._ // for Applicative and Monad
import cats.instances.vector._ // for Foldable and Traverse
import cats.syntax.foldable._// for combineAll and foldMap
import cats.syntax.traverse._// for traverse
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
def parallelFoldMap[A, B: Monoid](values: Vector[A]) (func: A => B): Future[B] = {
val numCores = Runtime.getRuntime.availableProcessors
val groupSize = (1.0 * values.size / numCores).ceil.toInt
values.grouped(groupSize).toVector.traverse(group => Future(group.toVector.foldMap(func)))
.map(_.combineAll)
}
val future: Future[Int] = parallelFoldMap((1 to 1000).toVector)(_ * 1000)
Await.result(future, 1.second)
// res18: Int = 500500000