Translate

Mostrando las entradas con la etiqueta ReactiveX. Mostrar todas las entradas
Mostrando las entradas con la etiqueta ReactiveX. Mostrar todas las entradas

viernes, 8 de octubre de 2021

Observables calientes y fríos


Un Observable comienza a emitir una secuencia de elementos cuando el Observador se suscribe: se denominan observables fríos. Los observables fríos siempre esperan tener al menos un observador suscrito para comenzar a emitir elementos.

Por otro lado, un observable que comienza a emitir elementos antes de conectarse a un observador se denomina observable caliente. Con los observables calientes, un observador puede suscribirse y comenzar a recibir elementos en cualquier momento durante la emisión. Con observables calientes, el observador puede recibir la secuencia completa de elementos comenzando desde el principio o no.

Veamos un ejemplo más concreto, pero simple. 

Creemos un Observable que emita todos los números enteros del 1 al 5 y suscríbase a él:

Observable<Integer> observable = Observable.from(new Integer[]{1, 2, 3, 4, 5});

observable.subscribe(new Subscriber<Integer>() {

    @Override

    public void onCompleted() {

        System.out.println("Sequence completed!");

    }

    @Override

    public void onError(Throwable e) {

        System.err.println("Exception: " + e.getMessage());

    }

    @Override

    public void onNext(Integer integer) {

        System.out.println("next item is: " + integer);

    }

});

La salida esperada es

next item is: 1
next item is: 2
next item is: 3
next item is: 4
next item is: 5
Sequence completed!

Este es un observable frío porque comenzará a emitir elementos solo cuando el observador se suscriba. 
El observable generará una secuencia de cinco elementos, cada uno representando un objeto entero (de 1 a 5), ​​por lo que el método onNext del observador se invocará cinco veces. Al final de la secuencia, se notificará el método onCompleted. El método onError nunca será notificado porque esta secuencia no genera ningún tipo de error o excepción.

Un ejemplo de un observable caliente podría ser un observable que emite un evento cada vez que se hace clic en un botón de la interfaz de usuario. No comienza a emitir eventos cuando el observador se suscribe; emite eventos incluso si no hay ningún suscriptor suscrito. 

En este ejemplo, crea,ps un observable usando el método Observable.from (), un método de fábrica estático que puede crear un Observable a partir de un matriz, iterable o Future.

Esta no es la única forma de crear observables. Pero esa es una historia para otro post...


miércoles, 15 de septiembre de 2021

ReactiveX


ReactiveX es un framework para los lenguajes de programación más utilizados: Java, JavaScript, C#, Scala, Clojure, C ++, Ruby, Python, Groovy, JRuby, Kotlin, Swift y más. 

RxJava es un framwork que implementa los conceptos de ReactiveX en Java. Veamos un ejemplo RxJava:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);

Observable.from(input).filter(new Func1() {

    @Override

     public Boolean call(Integer x) {

          return x % 2 == 0;

     }

})

o usando lambda : 

Observable.from(input).filter(x -> x % 2 == 0);


El objeto resultante (la instancia de rx.Observable) generará una secuencia de los números pares contenidos en la secuencia de entrada: 2 y 4.

En RxJava, rx.Observable agrega dos semánticas al patrón Observador de Gang of Four (la semántica predeterminada es emitir elementos creados, como una lista con elementos 2,4 en el ejemplo anterior):

  • El productor puede notificar al consumidor que no hay más datos disponible.
  • El productor puede notificar al consumidor que ha ocurrido un error.
La biblioteca RxJava proporciona un modelo de programación donde podemos trabajar con eventos generados desde UI o llamadas asincrónicas de la misma manera en que operamos con colecciones y streams en Java 8.

La biblioteca RxJava se creó en Netflix como una alternativa más inteligente a Java Futures y devoluciones de llamada. Tanto los futuros como las devoluciones de llamada son fáciles de usar cuando solo hay un nivel de ejecución asincrónica, pero son difíciles de administrar cuando están anidados.

El siguiente ejemplo muestra cómo se maneja el problema de las devoluciones de llamada anidadas en RxJava.

Suponga que necesita llamar a una API remota para autenticar a un usuario, luego a otra para obtener los datos del usuario y a otra API para obtener los contactos de un usuario. Normalmente, tendría que escribir llamadas a API anidadas y hacer complejos callbacks. Pero con RxJava se puede hacer así : 

serviceEndpoint.login().doOnNext(accessToken -> storeCredentials(accessToken)).flatMap(accessToken -> serviceEndpoint.getUser()).flatMap(user -> serviceEndpoint.getUserContact(user.getId()))




martes, 21 de julio de 2020

ReactiveX parte 2


Continuando con el post anterior...

En el modelo ReactiveX, Observable le permite tratar flujos de eventos asincrónicos con el mismo tipo de operaciones simples y componibles que se utilizan para colecciones. Lo libera de redes enredadas de devoluciones de llamada (como los callbacks) y, por lo tanto, hace un código sea más legible y menos propenso a errores.

Las técnicas como Java Futures son fáciles de usar para un solo nivel de ejecución asincrónica, pero comienzan a agregar una complejidad no trivial cuando están anidadas.

Es difícil usar Futures para componer de manera óptima flujos de ejecución asincrónicos condicionales (o imposible, ya que las latencias de cada solicitud varían en tiempo de ejecución). Esto se puede hacer, por supuesto, pero rápidamente se vuelve complicado (y por lo tanto propenso a errores) o se bloquea prematuramente en Future.get (), lo que elimina el beneficio de la ejecución asincrónica.

Los Observables de ReactiveX, están destinados a componer flujos y secuencias de datos asincrónicos.

ReactiveX Observables admite no solo la emisión de valores escalares únicos (como lo hacen los futuros), sino también secuencias de valores o incluso flujos infinitos. Observable es una única abstracción que puede usarse para cualquiera de estos casos de uso. Un Observable tiene toda la flexibilidad y elegancia asociadas a el Iterable:


ReactiveX no está sesgado hacia alguna fuente particular de concurrencia o asincronía. Los observables se pueden implementar utilizando grupos de subprocesos, bucles de eventos, I/O sin bloqueo, actores (como de Akka) o cualquier implementación que se adapte a sus necesidades, su estilo o su experiencia. El código del cliente trata todas sus interacciones con Observables como asíncronas, ya sea que su implementación subyacente sea bloqueante o no.

ReactiveX proporciona una colección de operadores con los que puede filtrar, seleccionar, transformar, combinar y componer Observables. Esto permite una ejecución y composición eficientes.

Se Puede pensar en la clase Observable como un equivalente a Iterable. Con un Iterable, el consumidor extrae valores del productor y los bloques de subprocesos hasta que lleguen esos valores. Por el contrario, con un Observable, el productor introduce valores al consumidor cuando los valores están disponibles. Este enfoque es más flexible, porque los valores pueden llegar sincrónicamente o asincrónicamente.


El tipo Observable agrega dos semánticas faltantes al patrón Observador de la Banda de los Cuatro, para que coincida con las que están disponibles en el tipo Iterable:

  • la capacidad del productor de indicarle al consumidor que no hay más datos disponibles (un bucle foreach en un Iterable se completa y regresa normalmente en tal caso; un Observable llama al método onCompleted de su observador)
  • la capacidad del productor de indicarle al consumidor que se ha producido un error (un Iterable genera una excepción si se produce un error durante la iteración; un Observable llama al método onError de su observador)
Con estas adiciones, ReactiveX armoniza los tipos Iterable y Observable. La única diferencia entre ellos es la dirección en la que fluyen los datos. Esto es muy importante porque ahora cualquier operación que pueda realizar en un Iterable, también puede realizarla en un Observable.

Dejo link : http://reactivex.io


sábado, 18 de julio de 2020

ReactiveX

Hace tiempo que existe este framework y es muy raro que no haya escrito de él antes, tal vez porque se utiliza muchísimo en Angular y no soy un experto en esta tecnología. 

Pero bueno, todo llega y vamos a hablar de ReactiveX que en su pagina web dice que ReactiveX es una combinación de buenas ideas: el patrón Observer, el patrón Iterator y la programación funcional.

Patrones y programación funcional, temas que tocamos todo el tiempo en el blog, que raro que no lo vi antes o si lo vi, le reste importancia. Me paso lo mismo con typescript

ReactiveX sigue el patrón Reactor. Que si vamos a la wikipedia podemos leer : 

"El patrón de diseño reactor es un patrón de programación concurrente para manejar los pedidos de servicio entregados de forma concurrente a un manejador de servicio desde una o más entradas. El manejador de servicio demultiplexa los pedidos entrantes y los entrega de forma sincrónica a los manejadores de pedidos asociados."

y

"Este patrón separa completamente al código específico de la aplicación de la implementación del reactor, permitiendo que los componentes de la aplicación puedan ser divididos en módulos reutilizables. Además, la llamada síncrona de los manejadores de pedidos permite concurrencia, de grano grueso, sin agregar la complejidad de múltiples hilos en el sistema."

A la vez ReactiveX utiliza los conceptos de programación reactiva. Pero que sería la programación reactiva?? 

Esto es más fácil explicarlo con un ejemplo, supongamos que tenemos que hacer una aplicación que tiene que calcular una serie de montos, el monto 1 se ingresa, el monto 2 es un porcentaje del monto 1, el monto 3 se ingresa, el monto 4, es la suma del montos. Si programaste aplicaciones seguro que te encontraste con un problema similar. Si ingresa el monto 1 debemos refrescar todos los montos calculados y si ingresan el monto 3 solo el 4. Podemos atacamos este problema poniendo un botón que refresque los valores, el tema es que más allá que es poco práctico, no esta bueno que si modificamos el monto 3, se refresque todo (ya que es innecesario)

Excel o una hoja de calculo, la hace más fácil, cuando modificamos la celda, refresca automáticamente todos los valores que la usan esta celda para su calculo, este es el concepto de reactividad. Y es mucho más eficiente. 

Pero ahora volvamos a reactiveX que básicamente implementa lo que acabo de explicar, con diferentes conceptos : observador y suscripción. 

Por lo tanto ReactiveX es una librería para componer programas asincrónicos y basados en eventos mediante el uso de secuencias observables.

Extiende el patrón de observador para admitir secuencias de datos y/o eventos y agrega operadores que le permiten componer secuencias juntas de manera declarativa, mientras abstrae preocupaciones sobre cosas como subprocesos de bajo nivel, sincronización, seguridad de subprocesos, estructuras de datos concurrentes y no bloqueo de I/O.

ReactiveX es multilenguaje y multiplataforma, talvez la versión más utilizada es la de javascript, ya que viene con Angular y otros frameworks webs. Pero se puede utilizar en diferentes plataformas y lenguajes. 

Les dejo la lista: 

Languages

ReactiveX for platforms and frameworks

Y como me quedo muy largo el post, voy a seguir en otros...

Dejo link: 
http://reactivex.io/