Mostrando las entradas con la etiqueta RxJava. Mostrar todas las entradas
Mostrando las entradas con la etiqueta RxJava. Mostrar todas las entradas

martes, 19 de octubre de 2021

Componer y transformar observables en RxJava


Los observables son especialmente buenos para ser compuestos y transformados. Con el uso de algunos operadores definidos en RxJava, puede componer y transformar secuencias de datos de una manera fácil que requiere poca codificación, por lo que es menos propenso a errores. 

map: permite transformar cada elemento de la secuencia emitida con una función específica.

flatMap: realiza dos tipos de acciones: la acción de "map" que transforma los elementos emitidos en observables y una acción de "aplanar" que convierte esos observables en uno observable. 

concatMap: el operador concatMap se comporta como flatMap, excepto que asegura que los observables no estén intercalados sino concatenados, manteniendo su orden.

zip: el operador zip toma múltiples observables como entradas y combina cada emisión a través de una función específica y emite los resultados de esta función como una nueva secuencia.

concat: concatena dos o más emisiones, generando una emisión donde todos los elementos de la primera fuente de emisión aparecen antes que los elementos de la segunda fuente de emisión. Además, el operador de concat espera a que se complete cada secuencia antes de suscribirse al siguiente observable.

filter: utiliza una función específica para permitir que solo se emitan algunos elementos de la secuencia de origen.

distinct: Si un elemento se emite más de una vez, solo se emitirá la primera aparición.

first: emite solo el primer elemento de una secuencia. Si se especifica una función, se utilizará para filtrar los elementos, por lo que solo se emitirá el primer elemento de la secuencia que cumpla las condiciones.

last: si puede aplicar un filtro al comienzo de la secuencia con el operador primero, también puede filtrar el final de la secuencia con el operador al final .

take: Toma el elemento que se encuentra en la posición n, lo que permite que solo se emitan los primeros n elementos.

startWith: toma la secuencia de entrada y le agrega un elemento determinado. Puede ser útil si desea forzar que su secuencia comience con un valor predeterminado o con uno almacenado en caché.

scan: el escaneo del operador toma una secuencia y aplica una función a cada par de elementos emitidos secuencialmente.

viernes, 1 de octubre de 2021

onNext, onCompleted, onError


Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos. 

La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más. 

Veamos un ejemplo: 

public void subscribeToObservable(Observable<T> observable) {

observable.subscribe(new Subscriber<>() {

@Override

public void onCompleted() {

    // invoked when Observable stops emitting items

}

@Override

public void onError(Throwable e) {

    // invoked when Observable throws an exception

    // while emitting items

}

@Override

public void onNext(T nextItem) {

    // invoked when Observable emits an item

    // usually you will consume the nextItem here

}

});

}


Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.

En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:

  • onNext: Ocurre cero, una o más veces. Si la secuencia se completa correctamente, el método onNext se invocará tantas veces como el número de elementos de la secuencia. Si se produce un error en un momento determinado, el método onNext no se invocará más.
  • onCompleted: solo cuando todos los elementos de la secuencia se emitan correctamente, se invocará el método onCompleted. Se invoca solo una vez y después de que se haya emitido el último elemento. No va a ser llamado nunca en una secuencia infinita.
  • onError: puede ocurrir un error en cada momento de la secuencia, y la secuencia se detendrá inmediatamente. En este caso, se invocará el método onError, pasando el error como objeto Throwable. Los otros dos métodos, onNext y onCompleted, no se invocarán, luego de este. 

Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.

Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:

// disconnect observable and observer
subscription.unsubscribe()

Se puede verificar si la suscripción se ha cancelado con el siguiente método:

subscription.isUnsubscribed()

Luego de la cancelación de suscripción, onNext no recibirá ningún otro elemento y los otros dos métodos, onCompleted y onError, no serán notificados. Después de la cancelación de la suscripción, el observable puede detenerse o continuar con la emisión, pero no se notificará al observador al respecto.

sábado, 25 de septiembre de 2021

Definiendo Observable y Observer


En Reactive programming un Observable es un objeto que emite una secuencia (o flujo) de eventos. Representa una colección basada en inserción, que es una colección en la que se insertan eventos cuando se crean.

Un observable emite una secuencia que puede ser vacía, finita o infinita. Cuando la secuencia es finita, se emite un evento completo después del final de la secuencia. En cualquier momento durante la emisión (pero no después de su finalización) se puede emitir un evento de error, deteniendo la emisión y cancelando la emisión del evento completo.

Cuando la secuencia está vacía, solo se emite el evento completo, sin emitir ningún ítem. Con una secuencia infinita, el evento completo nunca se emite.

La emisión se puede transformar, filtrar o combinar con otras emisiones, etc. 

Un observador es un objeto que se suscribe a un observable. Escucha y reacciona a cualquier secuencia de elementos emitida por el Observable.

El Observer no está bloqueado mientras espera nuevos elementos emitidos, por lo que en operaciones simultáneas, no se produce ningún bloqueo. Simplemente se activa cuando se emite un nuevo elemento.

Este es uno de los principios fundamentales de la programación reactiva: en lugar de ejecutar instrucciones una a la vez (siempre esperando a que se complete la instrucción anterior), el observable proporciona un mecanismo para recuperar y transformar datos, y el Observer activa este mecanismo, todos de forma concurrente.

El siguiente pseudocódigo es un ejemplo del método que implementa el Observer que reacciona a los elementos del Observable:

onNext = { it -> doSomething }


Aquí, el método está definido, pero no se invoca nada. Para comenzar a reaccionar, debe suscribirse al Observable:

observable.subscribe(onNext)

Ahora el observador está atento a los elementos y reaccionará a cada elemento nuevo que se emitirá.
Reescribamos este ejemplo en código Java usando las API de RxJava:

public void subscribeToObservable(Observable<T> observable) {
    observable.subscribe(nextItem -> {
        // invoked when Observable emits an item
        // usually you will consume the nextItem here
    });
}

Ahora está claro que para conectar un observable con un observador, debes usar el método de suscripción.


miércoles, 15 de septiembre de 2021

ReactiveX


ReactiveX es un framework para los lenguajes de programación más utilizados: Java, JavaScript, C#, Scala, Clojure, C ++, Ruby, Python, Groovy, JRuby, Kotlin, Swift y más. 

RxJava es un framwork que implementa los conceptos de ReactiveX en Java. Veamos un ejemplo RxJava:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);

Observable.from(input).filter(new Func1() {

    @Override

     public Boolean call(Integer x) {

          return x % 2 == 0;

     }

})

o usando lambda : 

Observable.from(input).filter(x -> x % 2 == 0);


El objeto resultante (la instancia de rx.Observable) generará una secuencia de los números pares contenidos en la secuencia de entrada: 2 y 4.

En RxJava, rx.Observable agrega dos semánticas al patrón Observador de Gang of Four (la semántica predeterminada es emitir elementos creados, como una lista con elementos 2,4 en el ejemplo anterior):

  • El productor puede notificar al consumidor que no hay más datos disponible.
  • El productor puede notificar al consumidor que ha ocurrido un error.
La biblioteca RxJava proporciona un modelo de programación donde podemos trabajar con eventos generados desde UI o llamadas asincrónicas de la misma manera en que operamos con colecciones y streams en Java 8.

La biblioteca RxJava se creó en Netflix como una alternativa más inteligente a Java Futures y devoluciones de llamada. Tanto los futuros como las devoluciones de llamada son fáciles de usar cuando solo hay un nivel de ejecución asincrónica, pero son difíciles de administrar cuando están anidados.

El siguiente ejemplo muestra cómo se maneja el problema de las devoluciones de llamada anidadas en RxJava.

Suponga que necesita llamar a una API remota para autenticar a un usuario, luego a otra para obtener los datos del usuario y a otra API para obtener los contactos de un usuario. Normalmente, tendría que escribir llamadas a API anidadas y hacer complejos callbacks. Pero con RxJava se puede hacer así : 

serviceEndpoint.login().doOnNext(accessToken -> storeCredentials(accessToken)).flatMap(accessToken -> serviceEndpoint.getUser()).flatMap(user -> serviceEndpoint.getUserContact(user.getId()))