sábado, 30 de octubre de 2021

Primeros pasos con Apache Kafka parte 18


Seguimos con Kafka

El primer paso para comenzar a consumir registros es crear una instancia de KafkaConsumer. Crear un KafkaConsumer es muy similar a crear un KafkaProducer: crea una instancia de propiedades de Java con las propiedades que desea pasar al consumidor. Para comenzar, solo necesitamos usar las tres propiedades obligatorias: bootstrap.servers, key.deserializer y value.deserializer.

La primera propiedad, bootstrap.servers, es la cadena de conexión a un clúster de Kafka. Se usa exactamente de la misma manera que en KafkaProducer. Las otras dos propiedades, key.deserializer y value.deserializer, son similares a los serializadores definidos para el productor, pero en lugar de especificar clases que conviertan los objetos Java en matrices de bytes, debe especificar clases que puedan tomar una matriz de bytes y convertirla en un objeto Java.

Hay una cuarta propiedad, que no es estrictamente obligatoria, pero por ahora pretendemos que lo es. La propiedad es group.id y especifica el grupo de consumidores al que pertenece la instancia de KafkaConsumer. Si bien es posible crear consumidores que no pertenecen a ningún grupo de consumidores, esto es poco común, por lo que durante la mayor parte del capítulo asumiremos que el consumidor es parte de un grupo.

El siguiente fragmento de código muestra cómo crear un KafkaConsumer:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

La mayor parte de lo que ve aquí debería resultarle familiar sobre la creación de productores. Suponemos que los registros que consumimos tendrán objetos String como clave y valor del registro. La única propiedad nueva aquí es group.id, que es el nombre del grupo de consumidores al que pertenece este consumidor.

Microservicios con Spring


Spring se ha convertido en el framework de desarrollo de facto para crear aplicaciones basadas en Java. En esencia, Spring se basa en el concepto de inyección de dependencia. En una aplicación Java normal, la aplicación se descompone en clases en las que cada clase suele tener vínculos explícitos con otras clases de la aplicación. Los enlaces son la invocación de un constructor de clase directamente en el código. Una vez que se compila el código, estos puntos de enlace no se pueden cambiar.

Esto es problemático en un proyecto grande porque estos vínculos externos son frágiles y hacer un cambio puede resultar en múltiples impactos posteriores en otro código. Un marco de inyección de dependencia, como Spring, le permite administrar más fácilmente grandes proyectos de Java al externalizar la relación entre los objetos dentro de su aplicación a través de convenciones (y anotaciones) en lugar de aquellos objetos que tienen conocimientos codificados entre sí. Spring actúa como intermediario entre las diferentes clases de Java de su aplicación y administra sus dependencias. Spring esencialmente le permite ensamblar su código como un conjunto de ladrillos de Lego que se unen.

La rápida inclusión de funciones de Spring impulsó su utilidad, y el marco se convirtió rápidamente en una alternativa más liviana para los desarrolladores de aplicaciones empresariales de Java que buscaban una forma de crear aplicaciones utilizando la pila J2EE. El stack J2EE, aunque poderosa, fue considerada por muchos como bloatware, con muchas características que nunca fueron utilizadas por los equipos de desarrollo de aplicaciones. Además, una aplicación J2EE le obligó a utilizar un servidor de aplicaciones Java completo (y pesado) para implementar sus aplicaciones.

Lo sorprendente del marco de Spring y un testimonio de su comunidad de desarrollo es su capacidad para mantenerse relevante y reinventarse. El equipo de desarrollo de Spring vio rápidamente que muchos equipos de desarrollo se estaban alejando de las aplicaciones monolíticas donde la presentación de la aplicación, el negocio y la lógica de acceso a los datos estaban empaquetados y desplegados como un solo artefacto. En cambio, los equipos se estaban moviendo hacia modelos altamente distribuidos donde los servicios se construían como servicios pequeños y distribuidos que podían implementarse fácilmente en la nube. En respuesta a este cambio, el equipo de desarrollo de Spring lanzó dos proyectos: Spring Boot y Spring Cloud.

Spring Boot es una nueva visión del framework de Spring. Si bien abarca las características centrales de Spring, Spring Boot elimina muchas de las características "empresariales" que se encuentran en Spring y, en su lugar, ofrece un marco orientado a microservicios basados ​​en Java y orientados a REST (Transferencia de estado representacional). Con unas pocas anotaciones simples, un desarrollador de Java puede crear rápidamente un microservicio REST que se puede empaquetar e implementar sin la necesidad de un contenedor de aplicaciones externo.

Debido a que los microservicios se han convertido en uno de los patrones arquitectónicos más comunes para la creación de aplicaciones basadas en la nube, la comunidad de desarrollo de Spring nos ha proporcionado Spring Cloud. El framework de Spring Cloud simplifica la puesta en funcionamiento e implementación de microservicios en una nube pública o privada. Spring Cloud incluye varios frameworks de microservicios de gestión de la nube populares en un framework común y hace que el uso y la implementación de estas tecnologías sean tan fáciles de usar como anotar su código. 

viernes, 29 de octubre de 2021

Que son los Microservicios?


La única constante en el campo del desarrollo de software es que nosotros, como desarrolladores de software, nos sentamos en medio de un mar de caos y cambio. Todos sentimos la rotación cuando las nuevas tecnologías y enfoques aparecen repentinamente en escena, lo que nos hace reevaluar cómo construimos y entregamos soluciones para nuestros clientes. Un ejemplo de este abandono es la rápida adopción por parte de muchas organizaciones de crear aplicaciones utilizando microservicios. Los microservicios son servicios de software distribuidos, débilmente acoplados, que llevan a cabo una pequeña cantidad de tareas bien definidas.

Antes de que evolucionara el concepto de microservicios, la mayoría de las aplicaciones basadas en web se creaban con un estilo arquitectónico monolítico. En una arquitectura monolítica, una aplicación se entrega como un único artefacto de software implementable. Toda la IU (interfaz de usuario), el negocio y la lógica de acceso a la base de datos se empaquetan en un solo artefacto de aplicación y se implementan en un servidor de aplicaciones.

Si bien una aplicación puede implementarse como una sola unidad de trabajo, la mayoría de las veces habrá varios equipos de desarrollo trabajando en la aplicación. Cada equipo de desarrollo tendrá sus propias piezas discretas de la aplicación de las que son responsables y, a menudo, clientes específicos a los que atienden con su pieza funcional. Por ejemplo, cuando trabajaba en una gran empresa de servicios financieros, teníamos una aplicación interna de gestión de relaciones con el cliente (CRM) personalizada que implicaba la coordinación de varios equipos, incluidos la interfaz de usuario, el cliente maestro, el almacén de datos y el equipo de fondos mutuos. 

El problema aquí es que a medida que aumentaba el tamaño y la complejidad de la aplicación CRM monolítica, los costos de comunicación y coordinación de los equipos individuales que trabajaban en la aplicación no aumentaban. Cada vez que un equipo individual necesitaba hacer un cambio, toda la aplicación tenía que reconstruirse, volverse a probar y volver a implementar.

El concepto de microservicio se infiltró originalmente en la conciencia de la comunidad de desarrollo de software alrededor de 2014 y fue una respuesta directa a muchos de los desafíos de tratar de escalar aplicaciones monolíticas tanto técnica como organizativamente grandes. Recuerde, un microservicio es un servicio distribuido pequeño, débilmente acoplado.

Los microservicios le permiten tomar una aplicación grande y descomponerla en componentes fáciles de administrar con responsabilidades estrictamente definidas. Los microservicios ayudan a combatir los problemas tradicionales de complejidad en una base de código grande al descomponer la base de código grande en partes pequeñas y bien definidas. El concepto clave que debe adoptar al pensar en microservicios es descomponer y desagregar la funcionalidad de sus aplicaciones para que sean completamente independientes entre sí.

Si tomamos la aplicación CRM y la descomponemos en microservicios, podría verse como que cada equipo funcional es dueño por completo de su código de servicio y su infraestructura de servicio. Pueden compilar, implementar y probar de forma independiente entre sí porque su código, repositorio de control de fuente y la infraestructura (servidor de aplicaciones y base de datos) ahora son completamente independientes de las otras partes de la aplicación.

Una arquitectura de microservicio tiene las siguientes características:

  • La lógica de la aplicación se divide en componentes de grano pequeño con límites de responsabilidad bien definidos que se coordinan para ofrecer una solución.
  • Cada componente tiene un pequeño dominio de responsabilidad y se implementa de forma completamente independiente entre sí. Los microservicios deben tener la responsabilidad de una sola parte de un dominio empresarial. Además, un microservicio debe poder reutilizarse en varias aplicaciones.
  • Los microservicios se comunican basándose en algunos principios básicos (fíjese que dije principios, no estándares) y emplean protocolos de comunicación ligeros como HTTP y JSON (notación de objetos JavaScript) para intercambiar datos entre el consumidor y el proveedor de servicios.
  • La implementación técnica subyacente del servicio es irrelevante porque las aplicaciones siempre se comunican con un protocolo de tecnología neutral (JSON es el más común). Esto significa que una aplicación creada con una aplicación de microservicio podría crearse con varios lenguajes y tecnologías.
  • Los microservicios, por su naturaleza pequeña, independiente y distribuida, permiten a las organizaciones tener pequeños equipos de desarrollo con áreas de responsabilidad bien definidas. Estos equipos pueden trabajar hacia un único objetivo, como entregar una aplicación, pero cada equipo es responsable solo de los servicios en los que están trabajando.

A menudo bromeo con mis colegas diciendo que los microservicios son la droga de entrada para crear aplicaciones en la nube. Usted comienza a construir microservicios porque le brindan un alto grado de flexibilidad y autonomía con sus equipos de desarrollo, pero usted y su equipo descubren rápidamente que la naturaleza pequeña e independiente de los microservicios los hace fácilmente implementables en la nube. Una vez que los servicios están en la nube, su pequeño tamaño facilita la puesta en marcha de una gran cantidad de instancias del mismo servicio y, de repente, sus aplicaciones se vuelven más escalables y, con previsión, más resistentes.

miércoles, 27 de octubre de 2021

Welcome to AWS Skill Builder


Amazon lanzo una pagina llamada skill builder, la idea es que más gente aprenda y se certifique. Por lo tanto Amazon centralizo el training. 

Aquí puede encontrar formación digital para su función o intereses. Bueno sin más... 


Dejo link: https://explore.skillbuilder.aws/learn


domingo, 24 de octubre de 2021

Primeros pasos con Apache Kafka parte 17


Seguimos con Kafka

Los consumidores de un grupo de consumidores comparten las propiedades de las particiones en los temas a los que se suscriben. Cuando agregamos un nuevo consumidor al grupo, comienza a consumir mensajes de particiones previamente consumidas por otro consumidor. Lo mismo sucede cuando un consumidor se apaga o se bloquea; abandona el grupo, y las particiones que solía consumir serán consumidas por uno de los consumidores restantes. La reasignación de particiones a los consumidores también ocurre cuando los temas que consume el grupo de consumidores se modifican (por ejemplo, si un administrador agrega nuevas particiones). Mover la propiedad de una partición de un consumidor a otro se denomina reequilibrio.

Los reequilibrios son importantes porque brindan al grupo de consumidores una alta disponibilidad y escalabilidad (lo que nos permite agregar y eliminar consumidores de manera fácil y segura), pero en el curso normal de los eventos son bastante indeseables. Durante un reequilibrio, los consumidores no pueden consumir mensajes, por lo que un reequilibrio es básicamente una breve ventana de indisponibilidad de todo el grupo de consumidores. Además, cuando las particiones se mueven de un consumidor a otro, el consumidor pierde su estado actual; si estaba almacenando en caché algún dato, deberá actualizar sus cachés, lo que ralentizará la aplicación hasta que el consumidor configure su estado nuevamente. 

La forma en que los consumidores mantienen la membresía en un grupo de consumidores y la propiedad de las particiones que se les asignan es enviando latidos a un corredor de Kafka designado como coordinador del grupo (este corredor puede ser diferente para diferentes grupos de consumidores). Siempre que el consumidor envíe latidos a intervalos regulares, se asume que está vivo, bien y procesando mensajes de sus particiones. Los latidos se envían cuando el consumidor realiza una encuesta (es decir, recupera registros) y cuando confirma los registros que ha consumido.

Si el consumidor deja de enviar latidos durante el tiempo suficiente, su sesión expirará y el coordinador del grupo la considerará muerta y provocará un reequilibrio. Si un consumidor falla y deja de procesar mensajes, el coordinador del grupo tardará unos segundos sin latidos en decidir que está muerto y activar el reequilibrio. Durante esos segundos, no se procesarán mensajes de las particiones propiedad del consumidor muerto. Al cerrar un consumidor limpiamente, el consumidor notificará al coordinador del grupo que se va, y el coordinador del grupo activará un reequilibrio de inmediato, reduciendo la brecha en el procesamiento. 

Con las versiones más recientes de Kafka, puede configurar cuánto tiempo puede pasar la aplicación sin sondear antes de que abandone el grupo y active un reequilibrio. Esta configuración se utiliza para evitar un bloqueo en vivo, en el que la aplicación no se bloqueó pero no logró avanzar por alguna razón. Esta configuración es independiente de session.time out.ms, que controla el tiempo que se tarda en detectar un bloqueo del consumidor y dejar de enviar latidos.


viernes, 22 de octubre de 2021

Mutiny!

 



Si leen este blog, algo sabran de reactive programming y de quarkus, Mutiny! es el framework elegido por la gente de red hat para implementar reactive programming en quarkus.

Podemos usarlo en nuestros proyectos, sin necesidad de usar quarkus, es decir, de forma independiente. A la vez se acopla muy bien con vert.x. Y entre sus características, podemos nombrar que es un framework moderno, liviano y pequeño. 

Veamos un "hola mundo" : 

import io.smallrye.mutiny.Uni;


public class FirstProgram {


  public static void main(String[] args) {

    Uni.createFrom().item("hello")

      .onItem().transform(item -> item + " mutiny")

      .onItem().transform(String::toUpperCase)

      .subscribe().with(

        item -> System.out.println(">> " + item));

  }

}

Podemos ver como sigue los conceptos básicos de un framework reactive, es decir, tiene Observable, transformaciones y suscriptores.  Lo interesante es cómo se construye el mensaje. Describimos una tubería de procesamiento que toma un item, lo procesa y finalmente lo consume.

Primero, creamos un Uni, uno de los dos tipos con Multi que proporciona Mutiny. Un Uni es un flujo que emite un solo elemento o un error.

Aquí, creamos un Uni que emite el elemento "hello". Esta es la entrada de nuestro pipeline. Luego procesamos este item:

  • agregamos "mutiny", luego
  • lo convertimos en una cadena en mayúsculas.
  • Esto forma la parte de procesamiento de nuestra canalización, y luego finalmente nos suscribimos a la canalización.


Esta última parte es fundamental. Si no tiene un suscriptor final, no sucederá nada. Los tipos de mutiny son perezosos, lo que significa que debes suscribirte. Si no lo hace, el cálculo ni siquiera comenzará.

Si su programa no hace nada, tenemos que verificar que no se nos olvidó de suscribirnos.

Dejo link : https://smallrye.io/smallrye-mutiny/ 

martes, 19 de octubre de 2021

Componer y transformar observables en RxJava


Los observables son especialmente buenos para ser compuestos y transformados. Con el uso de algunos operadores definidos en RxJava, puede componer y transformar secuencias de datos de una manera fácil que requiere poca codificación, por lo que es menos propenso a errores. 

map: permite transformar cada elemento de la secuencia emitida con una función específica.

flatMap: realiza dos tipos de acciones: la acción de "map" que transforma los elementos emitidos en observables y una acción de "aplanar" que convierte esos observables en uno observable. 

concatMap: el operador concatMap se comporta como flatMap, excepto que asegura que los observables no estén intercalados sino concatenados, manteniendo su orden.

zip: el operador zip toma múltiples observables como entradas y combina cada emisión a través de una función específica y emite los resultados de esta función como una nueva secuencia.

concat: concatena dos o más emisiones, generando una emisión donde todos los elementos de la primera fuente de emisión aparecen antes que los elementos de la segunda fuente de emisión. Además, el operador de concat espera a que se complete cada secuencia antes de suscribirse al siguiente observable.

filter: utiliza una función específica para permitir que solo se emitan algunos elementos de la secuencia de origen.

distinct: Si un elemento se emite más de una vez, solo se emitirá la primera aparición.

first: emite solo el primer elemento de una secuencia. Si se especifica una función, se utilizará para filtrar los elementos, por lo que solo se emitirá el primer elemento de la secuencia que cumpla las condiciones.

last: si puede aplicar un filtro al comienzo de la secuencia con el operador primero, también puede filtrar el final de la secuencia con el operador al final .

take: Toma el elemento que se encuentra en la posición n, lo que permite que solo se emitan los primeros n elementos.

startWith: toma la secuencia de entrada y le agrega un elemento determinado. Puede ser útil si desea forzar que su secuencia comience con un valor predeterminado o con uno almacenado en caché.

scan: el escaneo del operador toma una secuencia y aplica una función a cada par de elementos emitidos secuencialmente.

domingo, 17 de octubre de 2021

El estado del ecosistema del desarrollador 2021


Quiero compartirles el informe que hizo los amigos de jetbrains, sobre  el estado del ecosistema del desarrollador 2021. 

Dejo link: https://www.jetbrains.com/es-es/lp/devecosystem-2021/

sábado, 16 de octubre de 2021

Primeros pasos con Apache Kafka parte 16

Seguimos con Kafka. 

Existe la necesidad de escalar el consumo por temas. Al igual que varios productores pueden escribir sobre el mismo tema, debemos permitir que varios consumidores lean el mismo tema, dividiendo los datos entre ellos.

Los consumidores de Kafka suelen formar parte de un grupo de consumidores. Cuando varios consumidores están suscritos a un tema y pertenecen al mismo grupo de consumidores, cada consumidor del grupo recibirá mensajes de un subconjunto diferente de las particiones del tema.

Tomemos el tema T1 con cuatro particiones. Ahora suponga que creamos un nuevo consumidor, C1, que es el único consumidor del grupo G1, y lo usamos para suscribirse al tema T1. El consumidor C1 recibirá todos los mensajes de las cuatro particiones t1.

Si agregamos otro consumidor, C2, al grupo G1, cada consumidor solo recibirá mensajes de dos particiones. Quizás los mensajes de la partición 0 y 2 van a C1 y los mensajes de las particiones 1 y 3 van al consumidor C2.

Si G1 tiene cuatro consumidores, cada uno leerá los mensajes de una sola partición.

Si agregamos más consumidores a un solo grupo con un solo tema que las particiones que tenemos, algunos de los consumidores estarán inactivos y no recibirán ningún mensaje.

La principal forma en que escalamos el consumo de datos de un tema de Kafka es agregando más consumidores a un grupo de consumidores. Es común que los consumidores de Kafka realicen operaciones de alta latencia, como escribir en una base de datos o un cálculo lento de los datos. En estos casos, es posible que un solo consumidor no pueda mantenerse al día con los flujos de datos de velocidad en un tema, y agregar más consumidores que compartan la carga al hacer que cada consumidor posea solo un subconjunto de las particiones y los mensajes es nuestro método principal de escalado. Esta es una buena razón para crear temas con una gran cantidad de particiones: permite agregar más consumidores cuando aumenta la carga. Tenga en cuenta que no tiene sentido agregar más consumidores de los que tiene particiones en un tema; algunos de los consumidores simplemente estarán inactivos. 

Además de agregar consumidores para escalar una sola aplicación, es muy común tener múltiples aplicaciones que necesitan leer datos del mismo tema. De hecho, uno de los principales objetivos de diseño en Kafka era hacer que los datos producidos para los temas de Kafka estuvieran disponibles para muchos casos de uso en toda la organización. En esos casos, queremos que cada aplicación obtenga todos los mensajes, en lugar de solo un subconjunto. Para asegurarse de que una aplicación reciba todos los mensajes de un tema, asegúrese de que la aplicación tenga su propio grupo de consumidores. A diferencia de muchos sistemas de mensajería tradicionales, Kafka se adapta a una gran cantidad de consumidores y grupos de consumidores sin reducir el rendimiento. En el ejemplo anterior, si agregamos un nuevo grupo de consumidores G2 con un solo consumidor, este consumidor obtendrá todos los mensajes del tema T1 independientemente de lo que esté haciendo G1. G2 puede tener más de un consumidor, en cuyo caso cada uno obtendrá un subconjunto de particiones, tal como mostramos para G1, pero G2 en su conjunto seguirá recibiendo todos los mensajes independientemente de otros grupos de consumidores.

Para resumir, crea un nuevo grupo de consumidores para cada aplicación que necesita todos los mensajes de uno o más temas. Agrega consumidores a un grupo de consumidores existente para escalar la lectura y el procesamiento de mensajes de los temas, por lo que cada consumidor adicional en un grupo solo obtendrá un subconjunto de los mensajes.


viernes, 15 de octubre de 2021

[O'Reilly eBook] Web Application Security

 Me llego un mail con este libro gratuito y quiero compartirlo con ustedes: 

EBOOK

Web Application Security Ebook: Exploitation and Countermeasures for Modern Web Applications

While many resources for network and IT security are available, detailed knowledge regarding modern web application security has been lacking – until now. This practical guide provides both offensive and defensive security concepts that software engineers can easily learn and apply.

NGINX is proud to make the O’Reilly eBook, Web Application Security, available for free download with our compliments. This eBook is written by Andrew Hoffman, a senior security engineer at Salesforce, and introduces three pillars of web application security: recon, offense, and defense. It also features a foreword by Chris Witeck of NGINX at F5.

Download this eBook to learn:

  • About common vulnerabilities plaguing today's web applications
  • How to deploy mitigations to protect your applications against hackers
  • Practical tips to help you improve the overall security of your web applications

miércoles, 13 de octubre de 2021

Primeros pasos con Apache Kafka parte 15


Hasta ahora, hemos discutido las características del particionador predeterminado, que es el que se usa con más frecuencia. Sin embargo, Kafka no lo limita a particiones hash y, a veces, existen buenas razones para particionar los datos de manera diferente. Por ejemplo, suponga que es un proveedor B2B y su mayor cliente es una empresa que fabrica dispositivos portátiles llamados Bananas. Suponga que hace tantos negocios con el cliente "Banana" que más del 10% de sus transacciones diarias son con este cliente. Si usa la partición de hash predeterminada, los registros de Banana se asignarán a la misma partición que otras cuentas, lo que dará como resultado que una partición sea aproximadamente el doble de grande que el resto. Esto puede hacer que los servidores se queden sin espacio, que el procesamiento se ralentice, etc. Lo que realmente queremos es darle a Banana su propia partición y luego usar particiones hash para asignar el resto de las cuentas a las particiones.

A continuación, se muestra un ejemplo de un particionador personalizado:

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.record.InvalidRecordException;

import org.apache.kafka.common.utils.Utils;


public class BananaPartitioner implements Partitioner {


public void configure(Map<String, ?> configs) {}

public int partition(String topic, Object key, byte[] keyBytes,

        Object value, byte[] valueBytes,

        Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String))) 

        throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))

        return numPartitions; // Banana siempre va estar en la ultima partición

    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))

}

public void close() {}

}

lunes, 11 de octubre de 2021

Creando Observables con RxJava



La forma más sencilla de crear un observable es utilizar los métodos de fabricación que se implementan en la biblioteca RxJava. Ya usamos el método Observable.from(), en un post anterior.  

Observable.just() crea un Observable que emite el objeto o los objetos que se pasan como parámetros:

Observable.just(1, 2, 3, 4, 5)

Observable.range (a, n) crea un Observable que emite un rango de n enteros consecutivos a partir de a :

Observable.range(1, 5)

Observable.interval (long, TimeUnit) : crea un Observable que emite una secuencia de enteros a partir de 0 que están espaciados por un intervalo de tiempo dado. El primer argumento es la cantidad de tiempo y el segundo argumento define la unidad de tiempo. El siguiente observable emite un elemento cada 1 segundo:

Observable.interval(1, TimeUnit.SECONDS)

La secuencia es una secuencia infinital, por lo que onCompleted nunca será notificado. La secuencia se detiene solo cuando no hay más observadores conectados (suscritos) a lo observable. 

Observable.timer (long, TimeUnit) crea un Observable que emite solo un elemento después de un retraso determinado.

Observable.create () es el método permite crear un Observable desde cero. Por ejemplo, si desea crear un observable que emita solo una cadena, "¡Hola!", Puede escribir :

Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            observer.onNext("Hello!");
            observer.onCompleted();
        }
    }
);

Observable.empty () crea un Observable que emite una secuencia vacía (cero elementos) y luego se completa. Por lo tanto, solo se notificará a onCompleted().
Puede ser útil si desea emitir una secuencia vacía en lugar de emitir elementos nulos o arrojar errores

Observable.error (throwable) crea un Observable que emite una secuencia vacía (cero elementos) y luego notifica un error. Por lo tanto, solo se llamará a onError().

Observable.never () crea un Observable que emite una secuencia vacía (cero elementos) y nunca se completa. No se invocará ningún método del observador.

Observable.defer () crea un Observable solo cuando un suscriptor se suscribe.
La mejor manera de explicar qué hace defer () es con el siguiente ejemplo. Empecemos por la clase Persona, que tiene dos campos: nombre y edad.

class Person {
    private String name;
    private int age;
    // geter y setters
}

Ahora creamos una instancia de Person, y dos Observables para ser notificados con los valores de edad y nombre :

// create a new instance of Person
final Person person = new Person();
Observable<String> nameObservable = Observable.just(person.getName());
Observable<Integer> ageObservable = Observable.just(person.getAge());

// set age and name
person.setName("Bob");
person.setAge(35);
ageObservable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer age) {
        System.out.println("age is: " + age);
    }
});

nameObservable.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String name) {
        System.out.println("name is: " + name);
    }
});

¿Qué sucede cuando llama a los métodos observeName () y observeAge () en una instancia de Person? ¿Cuál será la secuencia emitida por los observables? Desafortunadamente, la salida será

age is: 0
name is: null

El problema aquí es que Observable.just() se evalúa tan pronto como se invoca, por lo que creará una secuencia utilizando el valor exacto de ese nombre y referencia de edad cuando se crea el observable. En el ejemplo, cuando se crea el observable, la edad es 0 y el nombre es nulo.

Para esto existe Observable.defer ().

Observable<String> nameObservable = Observable.defer(new
    Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            return Observable.just(person.getName());
        }
});

Observable<Integer> ageObservable = Observable.defer(new Func0<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
        return Observable.just(person.getAge());
    }
});

Al usar estos dos observables, la salida de los ejemplos anteriores se convierte en

age is: 35
name is: Bob






viernes, 8 de octubre de 2021

Observables calientes y fríos


Un Observable comienza a emitir una secuencia de elementos cuando el Observador se suscribe: se denominan observables fríos. Los observables fríos siempre esperan tener al menos un observador suscrito para comenzar a emitir elementos.

Por otro lado, un observable que comienza a emitir elementos antes de conectarse a un observador se denomina observable caliente. Con los observables calientes, un observador puede suscribirse y comenzar a recibir elementos en cualquier momento durante la emisión. Con observables calientes, el observador puede recibir la secuencia completa de elementos comenzando desde el principio o no.

Veamos un ejemplo más concreto, pero simple. 

Creemos un Observable que emita todos los números enteros del 1 al 5 y suscríbase a él:

Observable<Integer> observable = Observable.from(new Integer[]{1, 2, 3, 4, 5});

observable.subscribe(new Subscriber<Integer>() {

    @Override

    public void onCompleted() {

        System.out.println("Sequence completed!");

    }

    @Override

    public void onError(Throwable e) {

        System.err.println("Exception: " + e.getMessage());

    }

    @Override

    public void onNext(Integer integer) {

        System.out.println("next item is: " + integer);

    }

});

La salida esperada es

next item is: 1
next item is: 2
next item is: 3
next item is: 4
next item is: 5
Sequence completed!

Este es un observable frío porque comenzará a emitir elementos solo cuando el observador se suscriba. 
El observable generará una secuencia de cinco elementos, cada uno representando un objeto entero (de 1 a 5), ​​por lo que el método onNext del observador se invocará cinco veces. Al final de la secuencia, se notificará el método onCompleted. El método onError nunca será notificado porque esta secuencia no genera ningún tipo de error o excepción.

Un ejemplo de un observable caliente podría ser un observable que emite un evento cada vez que se hace clic en un botón de la interfaz de usuario. No comienza a emitir eventos cuando el observador se suscribe; emite eventos incluso si no hay ningún suscriptor suscrito. 

En este ejemplo, crea,ps un observable usando el método Observable.from (), un método de fábrica estático que puede crear un Observable a partir de un matriz, iterable o Future.

Esta no es la única forma de crear observables. Pero esa es una historia para otro post...


lunes, 4 de octubre de 2021

Primeros pasos con Apache Kafka parte 14


 Seguimos con Kafka. 

Los objetos ProducerRecord incluyen un nombre de tema, una clave y un valor. Los mensajes de Kafka son pares clave-valor y, si bien es posible crear un ProducerRecord con solo un tema y un valor, con la clave establecida en nula de forma predeterminada, la mayoría de las aplicaciones producen registros con claves. Las claves sirven para dos objetivos: son información adicional que se almacena con el mensaje y también se utilizan para decidir en cuál de las particiones de tema se escribirá el mensaje. Todos los mensajes con la misma clave irán a la misma partición. Esto significa que si un proceso está leyendo solo un subconjunto de las particiones en un tema, todos los registros para una sola clave serán leídos por el mismo proceso. Para crear un registro de valor-clave, simplemente cree un ProducerRecord de la siguiente manera:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

Al crear mensajes con una clave nula, simplemente puede omitir la clave:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");


Aquí, la clave simplemente se establecerá en nula, lo que puede indicar que faltaba el nombre de un cliente en un formulario.

Cuando la clave es nula y se usa el particionador predeterminado, el registro se enviará a una de las particiones disponibles del tema al azar. Se utilizará un algoritmo de operación por turnos para equilibrar los mensajes entre las particiones.

Si existe una clave y se usa el particionador predeterminado, Kafka aplicará un hash a la clave (usando su propio algoritmo hash, por lo que los valores hash no cambiarán cuando se actualice Java) y usará el resultado para asignar el mensaje a una partición específica. Dado que es importante que una clave siempre se asigne a la misma partición, usamos todas las particiones del tema para calcular la asignación, no solo las particiones disponibles. Esto significa que si una partición específica no está disponible cuando escribe datos en ella, es posible que obtenga un error. Esto es bastante raro, como verá en el Capítulo 6 cuando analicemos la replicación y disponibilidad de Kafka.

El mapeo de claves a particiones es consistente solo mientras no cambie el número de particiones en un tema. Entonces, siempre que el número de particiones sea constante, puede estar seguro de que, por ejemplo, los registros relacionados con el usuario 045189 siempre se escribirán en la partición 34. Esto permite todo tipo de optimización al leer datos de particiones. Sin embargo, en el momento en que agrega nuevas particiones al tema, esto ya no está garantizado; los registros antiguos permanecerán en la partición 34, mientras que los registros nuevos se escribirán en una partición diferente. Cuando la partición de claves es importante, la solución más sencilla es crear temas con suficientes particiones y nunca agregar particiones.

viernes, 1 de octubre de 2021

onNext, onCompleted, onError


Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos. 

La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más. 

Veamos un ejemplo: 

public void subscribeToObservable(Observable<T> observable) {

observable.subscribe(new Subscriber<>() {

@Override

public void onCompleted() {

    // invoked when Observable stops emitting items

}

@Override

public void onError(Throwable e) {

    // invoked when Observable throws an exception

    // while emitting items

}

@Override

public void onNext(T nextItem) {

    // invoked when Observable emits an item

    // usually you will consume the nextItem here

}

});

}


Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.

En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:

  • onNext: Ocurre cero, una o más veces. Si la secuencia se completa correctamente, el método onNext se invocará tantas veces como el número de elementos de la secuencia. Si se produce un error en un momento determinado, el método onNext no se invocará más.
  • onCompleted: solo cuando todos los elementos de la secuencia se emitan correctamente, se invocará el método onCompleted. Se invoca solo una vez y después de que se haya emitido el último elemento. No va a ser llamado nunca en una secuencia infinita.
  • onError: puede ocurrir un error en cada momento de la secuencia, y la secuencia se detendrá inmediatamente. En este caso, se invocará el método onError, pasando el error como objeto Throwable. Los otros dos métodos, onNext y onCompleted, no se invocarán, luego de este. 

Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.

Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:

// disconnect observable and observer
subscription.unsubscribe()

Se puede verificar si la suscripción se ha cancelado con el siguiente método:

subscription.isUnsubscribed()

Luego de la cancelación de suscripción, onNext no recibirá ningún otro elemento y los otros dos métodos, onCompleted y onError, no serán notificados. Después de la cancelación de la suscripción, el observable puede detenerse o continuar con la emisión, pero no se notificará al observador al respecto.