Translate

viernes, 29 de octubre de 2021

Que son los Microservicios?


La única constante en el campo del desarrollo de software es que nosotros, como desarrolladores de software, nos sentamos en medio de un mar de caos y cambio. Todos sentimos la rotación cuando las nuevas tecnologías y enfoques aparecen repentinamente en escena, lo que nos hace reevaluar cómo construimos y entregamos soluciones para nuestros clientes. Un ejemplo de este abandono es la rápida adopción por parte de muchas organizaciones de crear aplicaciones utilizando microservicios. Los microservicios son servicios de software distribuidos, débilmente acoplados, que llevan a cabo una pequeña cantidad de tareas bien definidas.

Antes de que evolucionara el concepto de microservicios, la mayoría de las aplicaciones basadas en web se creaban con un estilo arquitectónico monolítico. En una arquitectura monolítica, una aplicación se entrega como un único artefacto de software implementable. Toda la IU (interfaz de usuario), el negocio y la lógica de acceso a la base de datos se empaquetan en un solo artefacto de aplicación y se implementan en un servidor de aplicaciones.

Si bien una aplicación puede implementarse como una sola unidad de trabajo, la mayoría de las veces habrá varios equipos de desarrollo trabajando en la aplicación. Cada equipo de desarrollo tendrá sus propias piezas discretas de la aplicación de las que son responsables y, a menudo, clientes específicos a los que atienden con su pieza funcional. Por ejemplo, cuando trabajaba en una gran empresa de servicios financieros, teníamos una aplicación interna de gestión de relaciones con el cliente (CRM) personalizada que implicaba la coordinación de varios equipos, incluidos la interfaz de usuario, el cliente maestro, el almacén de datos y el equipo de fondos mutuos. 

El problema aquí es que a medida que aumentaba el tamaño y la complejidad de la aplicación CRM monolítica, los costos de comunicación y coordinación de los equipos individuales que trabajaban en la aplicación no aumentaban. Cada vez que un equipo individual necesitaba hacer un cambio, toda la aplicación tenía que reconstruirse, volverse a probar y volver a implementar.

El concepto de microservicio se infiltró originalmente en la conciencia de la comunidad de desarrollo de software alrededor de 2014 y fue una respuesta directa a muchos de los desafíos de tratar de escalar aplicaciones monolíticas tanto técnica como organizativamente grandes. Recuerde, un microservicio es un servicio distribuido pequeño, débilmente acoplado.

Los microservicios le permiten tomar una aplicación grande y descomponerla en componentes fáciles de administrar con responsabilidades estrictamente definidas. Los microservicios ayudan a combatir los problemas tradicionales de complejidad en una base de código grande al descomponer la base de código grande en partes pequeñas y bien definidas. El concepto clave que debe adoptar al pensar en microservicios es descomponer y desagregar la funcionalidad de sus aplicaciones para que sean completamente independientes entre sí.

Si tomamos la aplicación CRM y la descomponemos en microservicios, podría verse como que cada equipo funcional es dueño por completo de su código de servicio y su infraestructura de servicio. Pueden compilar, implementar y probar de forma independiente entre sí porque su código, repositorio de control de fuente y la infraestructura (servidor de aplicaciones y base de datos) ahora son completamente independientes de las otras partes de la aplicación.

Una arquitectura de microservicio tiene las siguientes características:

  • La lógica de la aplicación se divide en componentes de grano pequeño con límites de responsabilidad bien definidos que se coordinan para ofrecer una solución.
  • Cada componente tiene un pequeño dominio de responsabilidad y se implementa de forma completamente independiente entre sí. Los microservicios deben tener la responsabilidad de una sola parte de un dominio empresarial. Además, un microservicio debe poder reutilizarse en varias aplicaciones.
  • Los microservicios se comunican basándose en algunos principios básicos (fíjese que dije principios, no estándares) y emplean protocolos de comunicación ligeros como HTTP y JSON (notación de objetos JavaScript) para intercambiar datos entre el consumidor y el proveedor de servicios.
  • La implementación técnica subyacente del servicio es irrelevante porque las aplicaciones siempre se comunican con un protocolo de tecnología neutral (JSON es el más común). Esto significa que una aplicación creada con una aplicación de microservicio podría crearse con varios lenguajes y tecnologías.
  • Los microservicios, por su naturaleza pequeña, independiente y distribuida, permiten a las organizaciones tener pequeños equipos de desarrollo con áreas de responsabilidad bien definidas. Estos equipos pueden trabajar hacia un único objetivo, como entregar una aplicación, pero cada equipo es responsable solo de los servicios en los que están trabajando.

A menudo bromeo con mis colegas diciendo que los microservicios son la droga de entrada para crear aplicaciones en la nube. Usted comienza a construir microservicios porque le brindan un alto grado de flexibilidad y autonomía con sus equipos de desarrollo, pero usted y su equipo descubren rápidamente que la naturaleza pequeña e independiente de los microservicios los hace fácilmente implementables en la nube. Una vez que los servicios están en la nube, su pequeño tamaño facilita la puesta en marcha de una gran cantidad de instancias del mismo servicio y, de repente, sus aplicaciones se vuelven más escalables y, con previsión, más resistentes.

miércoles, 27 de octubre de 2021

Welcome to AWS Skill Builder


Amazon lanzo una pagina llamada skill builder, la idea es que más gente aprenda y se certifique. Por lo tanto Amazon centralizo el training. 

Aquí puede encontrar formación digital para su función o intereses. Bueno sin más... 


Dejo link: https://explore.skillbuilder.aws/learn


domingo, 24 de octubre de 2021

Primeros pasos con Apache Kafka parte 17


Seguimos con Kafka

Los consumidores de un grupo de consumidores comparten las propiedades de las particiones en los temas a los que se suscriben. Cuando agregamos un nuevo consumidor al grupo, comienza a consumir mensajes de particiones previamente consumidas por otro consumidor. Lo mismo sucede cuando un consumidor se apaga o se bloquea; abandona el grupo, y las particiones que solía consumir serán consumidas por uno de los consumidores restantes. La reasignación de particiones a los consumidores también ocurre cuando los temas que consume el grupo de consumidores se modifican (por ejemplo, si un administrador agrega nuevas particiones). Mover la propiedad de una partición de un consumidor a otro se denomina reequilibrio.

Los reequilibrios son importantes porque brindan al grupo de consumidores una alta disponibilidad y escalabilidad (lo que nos permite agregar y eliminar consumidores de manera fácil y segura), pero en el curso normal de los eventos son bastante indeseables. Durante un reequilibrio, los consumidores no pueden consumir mensajes, por lo que un reequilibrio es básicamente una breve ventana de indisponibilidad de todo el grupo de consumidores. Además, cuando las particiones se mueven de un consumidor a otro, el consumidor pierde su estado actual; si estaba almacenando en caché algún dato, deberá actualizar sus cachés, lo que ralentizará la aplicación hasta que el consumidor configure su estado nuevamente. 

La forma en que los consumidores mantienen la membresía en un grupo de consumidores y la propiedad de las particiones que se les asignan es enviando latidos a un corredor de Kafka designado como coordinador del grupo (este corredor puede ser diferente para diferentes grupos de consumidores). Siempre que el consumidor envíe latidos a intervalos regulares, se asume que está vivo, bien y procesando mensajes de sus particiones. Los latidos se envían cuando el consumidor realiza una encuesta (es decir, recupera registros) y cuando confirma los registros que ha consumido.

Si el consumidor deja de enviar latidos durante el tiempo suficiente, su sesión expirará y el coordinador del grupo la considerará muerta y provocará un reequilibrio. Si un consumidor falla y deja de procesar mensajes, el coordinador del grupo tardará unos segundos sin latidos en decidir que está muerto y activar el reequilibrio. Durante esos segundos, no se procesarán mensajes de las particiones propiedad del consumidor muerto. Al cerrar un consumidor limpiamente, el consumidor notificará al coordinador del grupo que se va, y el coordinador del grupo activará un reequilibrio de inmediato, reduciendo la brecha en el procesamiento. 

Con las versiones más recientes de Kafka, puede configurar cuánto tiempo puede pasar la aplicación sin sondear antes de que abandone el grupo y active un reequilibrio. Esta configuración se utiliza para evitar un bloqueo en vivo, en el que la aplicación no se bloqueó pero no logró avanzar por alguna razón. Esta configuración es independiente de session.time out.ms, que controla el tiempo que se tarda en detectar un bloqueo del consumidor y dejar de enviar latidos.


viernes, 22 de octubre de 2021

Mutiny!

 



Si leen este blog, algo sabran de reactive programming y de quarkus, Mutiny! es el framework elegido por la gente de red hat para implementar reactive programming en quarkus.

Podemos usarlo en nuestros proyectos, sin necesidad de usar quarkus, es decir, de forma independiente. A la vez se acopla muy bien con vert.x. Y entre sus características, podemos nombrar que es un framework moderno, liviano y pequeño. 

Veamos un "hola mundo" : 

import io.smallrye.mutiny.Uni;


public class FirstProgram {


  public static void main(String[] args) {

    Uni.createFrom().item("hello")

      .onItem().transform(item -> item + " mutiny")

      .onItem().transform(String::toUpperCase)

      .subscribe().with(

        item -> System.out.println(">> " + item));

  }

}

Podemos ver como sigue los conceptos básicos de un framework reactive, es decir, tiene Observable, transformaciones y suscriptores.  Lo interesante es cómo se construye el mensaje. Describimos una tubería de procesamiento que toma un item, lo procesa y finalmente lo consume.

Primero, creamos un Uni, uno de los dos tipos con Multi que proporciona Mutiny. Un Uni es un flujo que emite un solo elemento o un error.

Aquí, creamos un Uni que emite el elemento "hello". Esta es la entrada de nuestro pipeline. Luego procesamos este item:

  • agregamos "mutiny", luego
  • lo convertimos en una cadena en mayúsculas.
  • Esto forma la parte de procesamiento de nuestra canalización, y luego finalmente nos suscribimos a la canalización.


Esta última parte es fundamental. Si no tiene un suscriptor final, no sucederá nada. Los tipos de mutiny son perezosos, lo que significa que debes suscribirte. Si no lo hace, el cálculo ni siquiera comenzará.

Si su programa no hace nada, tenemos que verificar que no se nos olvidó de suscribirnos.

Dejo link : https://smallrye.io/smallrye-mutiny/ 

martes, 19 de octubre de 2021

Componer y transformar observables en RxJava


Los observables son especialmente buenos para ser compuestos y transformados. Con el uso de algunos operadores definidos en RxJava, puede componer y transformar secuencias de datos de una manera fácil que requiere poca codificación, por lo que es menos propenso a errores. 

map: permite transformar cada elemento de la secuencia emitida con una función específica.

flatMap: realiza dos tipos de acciones: la acción de "map" que transforma los elementos emitidos en observables y una acción de "aplanar" que convierte esos observables en uno observable. 

concatMap: el operador concatMap se comporta como flatMap, excepto que asegura que los observables no estén intercalados sino concatenados, manteniendo su orden.

zip: el operador zip toma múltiples observables como entradas y combina cada emisión a través de una función específica y emite los resultados de esta función como una nueva secuencia.

concat: concatena dos o más emisiones, generando una emisión donde todos los elementos de la primera fuente de emisión aparecen antes que los elementos de la segunda fuente de emisión. Además, el operador de concat espera a que se complete cada secuencia antes de suscribirse al siguiente observable.

filter: utiliza una función específica para permitir que solo se emitan algunos elementos de la secuencia de origen.

distinct: Si un elemento se emite más de una vez, solo se emitirá la primera aparición.

first: emite solo el primer elemento de una secuencia. Si se especifica una función, se utilizará para filtrar los elementos, por lo que solo se emitirá el primer elemento de la secuencia que cumpla las condiciones.

last: si puede aplicar un filtro al comienzo de la secuencia con el operador primero, también puede filtrar el final de la secuencia con el operador al final .

take: Toma el elemento que se encuentra en la posición n, lo que permite que solo se emitan los primeros n elementos.

startWith: toma la secuencia de entrada y le agrega un elemento determinado. Puede ser útil si desea forzar que su secuencia comience con un valor predeterminado o con uno almacenado en caché.

scan: el escaneo del operador toma una secuencia y aplica una función a cada par de elementos emitidos secuencialmente.

domingo, 17 de octubre de 2021

El estado del ecosistema del desarrollador 2021


Quiero compartirles el informe que hizo los amigos de jetbrains, sobre  el estado del ecosistema del desarrollador 2021. 

Dejo link: https://www.jetbrains.com/es-es/lp/devecosystem-2021/

sábado, 16 de octubre de 2021

Primeros pasos con Apache Kafka parte 16

Seguimos con Kafka. 

Existe la necesidad de escalar el consumo por temas. Al igual que varios productores pueden escribir sobre el mismo tema, debemos permitir que varios consumidores lean el mismo tema, dividiendo los datos entre ellos.

Los consumidores de Kafka suelen formar parte de un grupo de consumidores. Cuando varios consumidores están suscritos a un tema y pertenecen al mismo grupo de consumidores, cada consumidor del grupo recibirá mensajes de un subconjunto diferente de las particiones del tema.

Tomemos el tema T1 con cuatro particiones. Ahora suponga que creamos un nuevo consumidor, C1, que es el único consumidor del grupo G1, y lo usamos para suscribirse al tema T1. El consumidor C1 recibirá todos los mensajes de las cuatro particiones t1.

Si agregamos otro consumidor, C2, al grupo G1, cada consumidor solo recibirá mensajes de dos particiones. Quizás los mensajes de la partición 0 y 2 van a C1 y los mensajes de las particiones 1 y 3 van al consumidor C2.

Si G1 tiene cuatro consumidores, cada uno leerá los mensajes de una sola partición.

Si agregamos más consumidores a un solo grupo con un solo tema que las particiones que tenemos, algunos de los consumidores estarán inactivos y no recibirán ningún mensaje.

La principal forma en que escalamos el consumo de datos de un tema de Kafka es agregando más consumidores a un grupo de consumidores. Es común que los consumidores de Kafka realicen operaciones de alta latencia, como escribir en una base de datos o un cálculo lento de los datos. En estos casos, es posible que un solo consumidor no pueda mantenerse al día con los flujos de datos de velocidad en un tema, y agregar más consumidores que compartan la carga al hacer que cada consumidor posea solo un subconjunto de las particiones y los mensajes es nuestro método principal de escalado. Esta es una buena razón para crear temas con una gran cantidad de particiones: permite agregar más consumidores cuando aumenta la carga. Tenga en cuenta que no tiene sentido agregar más consumidores de los que tiene particiones en un tema; algunos de los consumidores simplemente estarán inactivos. 

Además de agregar consumidores para escalar una sola aplicación, es muy común tener múltiples aplicaciones que necesitan leer datos del mismo tema. De hecho, uno de los principales objetivos de diseño en Kafka era hacer que los datos producidos para los temas de Kafka estuvieran disponibles para muchos casos de uso en toda la organización. En esos casos, queremos que cada aplicación obtenga todos los mensajes, en lugar de solo un subconjunto. Para asegurarse de que una aplicación reciba todos los mensajes de un tema, asegúrese de que la aplicación tenga su propio grupo de consumidores. A diferencia de muchos sistemas de mensajería tradicionales, Kafka se adapta a una gran cantidad de consumidores y grupos de consumidores sin reducir el rendimiento. En el ejemplo anterior, si agregamos un nuevo grupo de consumidores G2 con un solo consumidor, este consumidor obtendrá todos los mensajes del tema T1 independientemente de lo que esté haciendo G1. G2 puede tener más de un consumidor, en cuyo caso cada uno obtendrá un subconjunto de particiones, tal como mostramos para G1, pero G2 en su conjunto seguirá recibiendo todos los mensajes independientemente de otros grupos de consumidores.

Para resumir, crea un nuevo grupo de consumidores para cada aplicación que necesita todos los mensajes de uno o más temas. Agrega consumidores a un grupo de consumidores existente para escalar la lectura y el procesamiento de mensajes de los temas, por lo que cada consumidor adicional en un grupo solo obtendrá un subconjunto de los mensajes.


viernes, 15 de octubre de 2021

[O'Reilly eBook] Web Application Security

 Me llego un mail con este libro gratuito y quiero compartirlo con ustedes: 

EBOOK

Web Application Security Ebook: Exploitation and Countermeasures for Modern Web Applications

While many resources for network and IT security are available, detailed knowledge regarding modern web application security has been lacking – until now. This practical guide provides both offensive and defensive security concepts that software engineers can easily learn and apply.

NGINX is proud to make the O’Reilly eBook, Web Application Security, available for free download with our compliments. This eBook is written by Andrew Hoffman, a senior security engineer at Salesforce, and introduces three pillars of web application security: recon, offense, and defense. It also features a foreword by Chris Witeck of NGINX at F5.

Download this eBook to learn:

  • About common vulnerabilities plaguing today's web applications
  • How to deploy mitigations to protect your applications against hackers
  • Practical tips to help you improve the overall security of your web applications

miércoles, 13 de octubre de 2021

Primeros pasos con Apache Kafka parte 15


Hasta ahora, hemos discutido las características del particionador predeterminado, que es el que se usa con más frecuencia. Sin embargo, Kafka no lo limita a particiones hash y, a veces, existen buenas razones para particionar los datos de manera diferente. Por ejemplo, suponga que es un proveedor B2B y su mayor cliente es una empresa que fabrica dispositivos portátiles llamados Bananas. Suponga que hace tantos negocios con el cliente "Banana" que más del 10% de sus transacciones diarias son con este cliente. Si usa la partición de hash predeterminada, los registros de Banana se asignarán a la misma partición que otras cuentas, lo que dará como resultado que una partición sea aproximadamente el doble de grande que el resto. Esto puede hacer que los servidores se queden sin espacio, que el procesamiento se ralentice, etc. Lo que realmente queremos es darle a Banana su propia partición y luego usar particiones hash para asignar el resto de las cuentas a las particiones.

A continuación, se muestra un ejemplo de un particionador personalizado:

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

import org.apache.kafka.common.PartitionInfo;

import org.apache.kafka.common.record.InvalidRecordException;

import org.apache.kafka.common.utils.Utils;


public class BananaPartitioner implements Partitioner {


public void configure(Map<String, ?> configs) {}

public int partition(String topic, Object key, byte[] keyBytes,

        Object value, byte[] valueBytes,

        Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceOf String))) 

        throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))

        return numPartitions; // Banana siempre va estar en la ultima partición

    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))

}

public void close() {}

}

lunes, 11 de octubre de 2021

Creando Observables con RxJava



La forma más sencilla de crear un observable es utilizar los métodos de fabricación que se implementan en la biblioteca RxJava. Ya usamos el método Observable.from(), en un post anterior.  

Observable.just() crea un Observable que emite el objeto o los objetos que se pasan como parámetros:

Observable.just(1, 2, 3, 4, 5)

Observable.range (a, n) crea un Observable que emite un rango de n enteros consecutivos a partir de a :

Observable.range(1, 5)

Observable.interval (long, TimeUnit) : crea un Observable que emite una secuencia de enteros a partir de 0 que están espaciados por un intervalo de tiempo dado. El primer argumento es la cantidad de tiempo y el segundo argumento define la unidad de tiempo. El siguiente observable emite un elemento cada 1 segundo:

Observable.interval(1, TimeUnit.SECONDS)

La secuencia es una secuencia infinital, por lo que onCompleted nunca será notificado. La secuencia se detiene solo cuando no hay más observadores conectados (suscritos) a lo observable. 

Observable.timer (long, TimeUnit) crea un Observable que emite solo un elemento después de un retraso determinado.

Observable.create () es el método permite crear un Observable desde cero. Por ejemplo, si desea crear un observable que emita solo una cadena, "¡Hola!", Puede escribir :

Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            observer.onNext("Hello!");
            observer.onCompleted();
        }
    }
);

Observable.empty () crea un Observable que emite una secuencia vacía (cero elementos) y luego se completa. Por lo tanto, solo se notificará a onCompleted().
Puede ser útil si desea emitir una secuencia vacía en lugar de emitir elementos nulos o arrojar errores

Observable.error (throwable) crea un Observable que emite una secuencia vacía (cero elementos) y luego notifica un error. Por lo tanto, solo se llamará a onError().

Observable.never () crea un Observable que emite una secuencia vacía (cero elementos) y nunca se completa. No se invocará ningún método del observador.

Observable.defer () crea un Observable solo cuando un suscriptor se suscribe.
La mejor manera de explicar qué hace defer () es con el siguiente ejemplo. Empecemos por la clase Persona, que tiene dos campos: nombre y edad.

class Person {
    private String name;
    private int age;
    // geter y setters
}

Ahora creamos una instancia de Person, y dos Observables para ser notificados con los valores de edad y nombre :

// create a new instance of Person
final Person person = new Person();
Observable<String> nameObservable = Observable.just(person.getName());
Observable<Integer> ageObservable = Observable.just(person.getAge());

// set age and name
person.setName("Bob");
person.setAge(35);
ageObservable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer age) {
        System.out.println("age is: " + age);
    }
});

nameObservable.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String name) {
        System.out.println("name is: " + name);
    }
});

¿Qué sucede cuando llama a los métodos observeName () y observeAge () en una instancia de Person? ¿Cuál será la secuencia emitida por los observables? Desafortunadamente, la salida será

age is: 0
name is: null

El problema aquí es que Observable.just() se evalúa tan pronto como se invoca, por lo que creará una secuencia utilizando el valor exacto de ese nombre y referencia de edad cuando se crea el observable. En el ejemplo, cuando se crea el observable, la edad es 0 y el nombre es nulo.

Para esto existe Observable.defer ().

Observable<String> nameObservable = Observable.defer(new
    Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            return Observable.just(person.getName());
        }
});

Observable<Integer> ageObservable = Observable.defer(new Func0<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
        return Observable.just(person.getAge());
    }
});

Al usar estos dos observables, la salida de los ejemplos anteriores se convierte en

age is: 35
name is: Bob






viernes, 8 de octubre de 2021

Observables calientes y fríos


Un Observable comienza a emitir una secuencia de elementos cuando el Observador se suscribe: se denominan observables fríos. Los observables fríos siempre esperan tener al menos un observador suscrito para comenzar a emitir elementos.

Por otro lado, un observable que comienza a emitir elementos antes de conectarse a un observador se denomina observable caliente. Con los observables calientes, un observador puede suscribirse y comenzar a recibir elementos en cualquier momento durante la emisión. Con observables calientes, el observador puede recibir la secuencia completa de elementos comenzando desde el principio o no.

Veamos un ejemplo más concreto, pero simple. 

Creemos un Observable que emita todos los números enteros del 1 al 5 y suscríbase a él:

Observable<Integer> observable = Observable.from(new Integer[]{1, 2, 3, 4, 5});

observable.subscribe(new Subscriber<Integer>() {

    @Override

    public void onCompleted() {

        System.out.println("Sequence completed!");

    }

    @Override

    public void onError(Throwable e) {

        System.err.println("Exception: " + e.getMessage());

    }

    @Override

    public void onNext(Integer integer) {

        System.out.println("next item is: " + integer);

    }

});

La salida esperada es

next item is: 1
next item is: 2
next item is: 3
next item is: 4
next item is: 5
Sequence completed!

Este es un observable frío porque comenzará a emitir elementos solo cuando el observador se suscriba. 
El observable generará una secuencia de cinco elementos, cada uno representando un objeto entero (de 1 a 5), ​​por lo que el método onNext del observador se invocará cinco veces. Al final de la secuencia, se notificará el método onCompleted. El método onError nunca será notificado porque esta secuencia no genera ningún tipo de error o excepción.

Un ejemplo de un observable caliente podría ser un observable que emite un evento cada vez que se hace clic en un botón de la interfaz de usuario. No comienza a emitir eventos cuando el observador se suscribe; emite eventos incluso si no hay ningún suscriptor suscrito. 

En este ejemplo, crea,ps un observable usando el método Observable.from (), un método de fábrica estático que puede crear un Observable a partir de un matriz, iterable o Future.

Esta no es la única forma de crear observables. Pero esa es una historia para otro post...


lunes, 4 de octubre de 2021

Primeros pasos con Apache Kafka parte 14


 Seguimos con Kafka. 

Los objetos ProducerRecord incluyen un nombre de tema, una clave y un valor. Los mensajes de Kafka son pares clave-valor y, si bien es posible crear un ProducerRecord con solo un tema y un valor, con la clave establecida en nula de forma predeterminada, la mayoría de las aplicaciones producen registros con claves. Las claves sirven para dos objetivos: son información adicional que se almacena con el mensaje y también se utilizan para decidir en cuál de las particiones de tema se escribirá el mensaje. Todos los mensajes con la misma clave irán a la misma partición. Esto significa que si un proceso está leyendo solo un subconjunto de las particiones en un tema, todos los registros para una sola clave serán leídos por el mismo proceso. Para crear un registro de valor-clave, simplemente cree un ProducerRecord de la siguiente manera:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

Al crear mensajes con una clave nula, simplemente puede omitir la clave:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");


Aquí, la clave simplemente se establecerá en nula, lo que puede indicar que faltaba el nombre de un cliente en un formulario.

Cuando la clave es nula y se usa el particionador predeterminado, el registro se enviará a una de las particiones disponibles del tema al azar. Se utilizará un algoritmo de operación por turnos para equilibrar los mensajes entre las particiones.

Si existe una clave y se usa el particionador predeterminado, Kafka aplicará un hash a la clave (usando su propio algoritmo hash, por lo que los valores hash no cambiarán cuando se actualice Java) y usará el resultado para asignar el mensaje a una partición específica. Dado que es importante que una clave siempre se asigne a la misma partición, usamos todas las particiones del tema para calcular la asignación, no solo las particiones disponibles. Esto significa que si una partición específica no está disponible cuando escribe datos en ella, es posible que obtenga un error. Esto es bastante raro, como verá en el Capítulo 6 cuando analicemos la replicación y disponibilidad de Kafka.

El mapeo de claves a particiones es consistente solo mientras no cambie el número de particiones en un tema. Entonces, siempre que el número de particiones sea constante, puede estar seguro de que, por ejemplo, los registros relacionados con el usuario 045189 siempre se escribirán en la partición 34. Esto permite todo tipo de optimización al leer datos de particiones. Sin embargo, en el momento en que agrega nuevas particiones al tema, esto ya no está garantizado; los registros antiguos permanecerán en la partición 34, mientras que los registros nuevos se escribirán en una partición diferente. Cuando la partición de claves es importante, la solución más sencilla es crear temas con suficientes particiones y nunca agregar particiones.

viernes, 1 de octubre de 2021

onNext, onCompleted, onError


Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos. 

La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más. 

Veamos un ejemplo: 

public void subscribeToObservable(Observable<T> observable) {

observable.subscribe(new Subscriber<>() {

@Override

public void onCompleted() {

    // invoked when Observable stops emitting items

}

@Override

public void onError(Throwable e) {

    // invoked when Observable throws an exception

    // while emitting items

}

@Override

public void onNext(T nextItem) {

    // invoked when Observable emits an item

    // usually you will consume the nextItem here

}

});

}


Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.

En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:

  • onNext: Ocurre cero, una o más veces. Si la secuencia se completa correctamente, el método onNext se invocará tantas veces como el número de elementos de la secuencia. Si se produce un error en un momento determinado, el método onNext no se invocará más.
  • onCompleted: solo cuando todos los elementos de la secuencia se emitan correctamente, se invocará el método onCompleted. Se invoca solo una vez y después de que se haya emitido el último elemento. No va a ser llamado nunca en una secuencia infinita.
  • onError: puede ocurrir un error en cada momento de la secuencia, y la secuencia se detendrá inmediatamente. En este caso, se invocará el método onError, pasando el error como objeto Throwable. Los otros dos métodos, onNext y onCompleted, no se invocarán, luego de este. 

Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.

Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:

// disconnect observable and observer
subscription.unsubscribe()

Se puede verificar si la suscripción se ha cancelado con el siguiente método:

subscription.isUnsubscribed()

Luego de la cancelación de suscripción, onNext no recibirá ningún otro elemento y los otros dos métodos, onCompleted y onError, no serán notificados. Después de la cancelación de la suscripción, el observable puede detenerse o continuar con la emisión, pero no se notificará al observador al respecto.

sábado, 25 de septiembre de 2021

Definiendo Observable y Observer


En Reactive programming un Observable es un objeto que emite una secuencia (o flujo) de eventos. Representa una colección basada en inserción, que es una colección en la que se insertan eventos cuando se crean.

Un observable emite una secuencia que puede ser vacía, finita o infinita. Cuando la secuencia es finita, se emite un evento completo después del final de la secuencia. En cualquier momento durante la emisión (pero no después de su finalización) se puede emitir un evento de error, deteniendo la emisión y cancelando la emisión del evento completo.

Cuando la secuencia está vacía, solo se emite el evento completo, sin emitir ningún ítem. Con una secuencia infinita, el evento completo nunca se emite.

La emisión se puede transformar, filtrar o combinar con otras emisiones, etc. 

Un observador es un objeto que se suscribe a un observable. Escucha y reacciona a cualquier secuencia de elementos emitida por el Observable.

El Observer no está bloqueado mientras espera nuevos elementos emitidos, por lo que en operaciones simultáneas, no se produce ningún bloqueo. Simplemente se activa cuando se emite un nuevo elemento.

Este es uno de los principios fundamentales de la programación reactiva: en lugar de ejecutar instrucciones una a la vez (siempre esperando a que se complete la instrucción anterior), el observable proporciona un mecanismo para recuperar y transformar datos, y el Observer activa este mecanismo, todos de forma concurrente.

El siguiente pseudocódigo es un ejemplo del método que implementa el Observer que reacciona a los elementos del Observable:

onNext = { it -> doSomething }


Aquí, el método está definido, pero no se invoca nada. Para comenzar a reaccionar, debe suscribirse al Observable:

observable.subscribe(onNext)

Ahora el observador está atento a los elementos y reaccionará a cada elemento nuevo que se emitirá.
Reescribamos este ejemplo en código Java usando las API de RxJava:

public void subscribeToObservable(Observable<T> observable) {
    observable.subscribe(nextItem -> {
        // invoked when Observable emits an item
        // usually you will consume the nextItem here
    });
}

Ahora está claro que para conectar un observable con un observador, debes usar el método de suscripción.


viernes, 24 de septiembre de 2021

Creando la primera aplicación con Quarkus parte 5


Seguimos con quarkus

Vamos a guardar unos saludos para luego poder recuperarlos y usarlos para saludar. 

Para esto vamos a crear una clase con el saludo : 

package com.hexacta.model;

import javax.persistence.*;
import java.util.Objects;

@Entity
public class Greeting {

@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(nullable = false)
@Id
private Integer id;

@Column
private String value;

public void setId(Integer id) {
this.id = id;
}

public Integer getId() {
return id;
}

public Greeting() {}

public Greeting(String value) {
this.value = value;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Greeting greeting = (Greeting) o;
return Objects.equals(value, greeting.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}
}

Luego vamos a hacer una clase DAO para acceso a la base de datos : 

package com.hexacta.dao;

import com.hexacta.model.Greeting;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;

@ApplicationScoped
public class GreetingDAO {

@Inject
private EntityManager em;

public int save(Greeting aGreeting) {
this.em.persist(aGreeting);
return aGreeting.getId();
}

public Greeting get(int id) {
var qr = em.createQuery("from com.hexacta.model.Greeting g " +
"where g.id = ?1");
qr.setParameter(1, id);
return (Greeting) qr.getSingleResult();
}
}

Modificamos el servicio para que permita guardar los saludos : 

package com.hexacta;

import com.hexacta.dao.GreetingDAO;
import com.hexacta.model.Greeting;

import javax.transaction.Transactional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class GreetingServices {

@Inject
private GreetingDAO dao;

public String greeting(String name) {
return "hola " + name;
}

public String greeting(int id,String name) {
Greeting aGreeting = dao.get(id);
if (aGreeting == null) {
return name;
}
return aGreeting.getValue() + " " + name;
}

@Transactional
public int saveGreeting(String greeting) {
Greeting aGreeting = new Greeting(greeting);
return dao.save(aGreeting);
}
}

Y por último vamos a modificar los servicios REST : 

package com.hexacta;

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class GreetingResource {

private GreetingServices service;

@Inject
public GreetingResource(GreetingServices service) {
this.service = service;
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "Hello RESTEasy";
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{name}")
public String hello(@PathParam String name) {
return this.service.greeting(name);
}

@POST
@Path("/save/{greeting}")
public int saveGreeting(@PathParam String greeting) {
return this.service.saveGreeting(greeting);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{id}/{name}")
public String hello(@PathParam int id,@PathParam String name) {
return this.service.greeting(id, name);
}


}

Y tengo que configurar h2 que es la base que estamos usando, en el application.properties : 

# datasource configuration
quarkus.datasource.db-kind = h2
quarkus.datasource.username = sa
quarkus.datasource.password = a
quarkus.datasource.jdbc.url = jdbc:h2:~/quarkus.db

quarkus.hibernate-orm.database.generation=update

Y listo, podemos guardar diferentes saludos y usarlos. 

Dejo el link del repo : 

https://github.com/emanuelpeg/quarkusExample/