Translate

viernes, 1 de octubre de 2021

onNext, onCompleted, onError


Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos. 

La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más. 

Veamos un ejemplo: 

public void subscribeToObservable(Observable<T> observable) {

observable.subscribe(new Subscriber<>() {

@Override

public void onCompleted() {

    // invoked when Observable stops emitting items

}

@Override

public void onError(Throwable e) {

    // invoked when Observable throws an exception

    // while emitting items

}

@Override

public void onNext(T nextItem) {

    // invoked when Observable emits an item

    // usually you will consume the nextItem here

}

});

}


Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.

En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:

  • onNext: Ocurre cero, una o más veces. Si la secuencia se completa correctamente, el método onNext se invocará tantas veces como el número de elementos de la secuencia. Si se produce un error en un momento determinado, el método onNext no se invocará más.
  • onCompleted: solo cuando todos los elementos de la secuencia se emitan correctamente, se invocará el método onCompleted. Se invoca solo una vez y después de que se haya emitido el último elemento. No va a ser llamado nunca en una secuencia infinita.
  • onError: puede ocurrir un error en cada momento de la secuencia, y la secuencia se detendrá inmediatamente. En este caso, se invocará el método onError, pasando el error como objeto Throwable. Los otros dos métodos, onNext y onCompleted, no se invocarán, luego de este. 

Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.

Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:

// disconnect observable and observer
subscription.unsubscribe()

Se puede verificar si la suscripción se ha cancelado con el siguiente método:

subscription.isUnsubscribed()

Luego de la cancelación de suscripción, onNext no recibirá ningún otro elemento y los otros dos métodos, onCompleted y onError, no serán notificados. Después de la cancelación de la suscripción, el observable puede detenerse o continuar con la emisión, pero no se notificará al observador al respecto.

sábado, 25 de septiembre de 2021

Definiendo Observable y Observer


En Reactive programming un Observable es un objeto que emite una secuencia (o flujo) de eventos. Representa una colección basada en inserción, que es una colección en la que se insertan eventos cuando se crean.

Un observable emite una secuencia que puede ser vacía, finita o infinita. Cuando la secuencia es finita, se emite un evento completo después del final de la secuencia. En cualquier momento durante la emisión (pero no después de su finalización) se puede emitir un evento de error, deteniendo la emisión y cancelando la emisión del evento completo.

Cuando la secuencia está vacía, solo se emite el evento completo, sin emitir ningún ítem. Con una secuencia infinita, el evento completo nunca se emite.

La emisión se puede transformar, filtrar o combinar con otras emisiones, etc. 

Un observador es un objeto que se suscribe a un observable. Escucha y reacciona a cualquier secuencia de elementos emitida por el Observable.

El Observer no está bloqueado mientras espera nuevos elementos emitidos, por lo que en operaciones simultáneas, no se produce ningún bloqueo. Simplemente se activa cuando se emite un nuevo elemento.

Este es uno de los principios fundamentales de la programación reactiva: en lugar de ejecutar instrucciones una a la vez (siempre esperando a que se complete la instrucción anterior), el observable proporciona un mecanismo para recuperar y transformar datos, y el Observer activa este mecanismo, todos de forma concurrente.

El siguiente pseudocódigo es un ejemplo del método que implementa el Observer que reacciona a los elementos del Observable:

onNext = { it -> doSomething }


Aquí, el método está definido, pero no se invoca nada. Para comenzar a reaccionar, debe suscribirse al Observable:

observable.subscribe(onNext)

Ahora el observador está atento a los elementos y reaccionará a cada elemento nuevo que se emitirá.
Reescribamos este ejemplo en código Java usando las API de RxJava:

public void subscribeToObservable(Observable<T> observable) {
    observable.subscribe(nextItem -> {
        // invoked when Observable emits an item
        // usually you will consume the nextItem here
    });
}

Ahora está claro que para conectar un observable con un observador, debes usar el método de suscripción.


viernes, 24 de septiembre de 2021

Creando la primera aplicación con Quarkus parte 5


Seguimos con quarkus

Vamos a guardar unos saludos para luego poder recuperarlos y usarlos para saludar. 

Para esto vamos a crear una clase con el saludo : 

package com.hexacta.model;

import javax.persistence.*;
import java.util.Objects;

@Entity
public class Greeting {

@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(nullable = false)
@Id
private Integer id;

@Column
private String value;

public void setId(Integer id) {
this.id = id;
}

public Integer getId() {
return id;
}

public Greeting() {}

public Greeting(String value) {
this.value = value;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Greeting greeting = (Greeting) o;
return Objects.equals(value, greeting.value);
}

@Override
public int hashCode() {
return Objects.hash(value);
}
}

Luego vamos a hacer una clase DAO para acceso a la base de datos : 

package com.hexacta.dao;

import com.hexacta.model.Greeting;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;

@ApplicationScoped
public class GreetingDAO {

@Inject
private EntityManager em;

public int save(Greeting aGreeting) {
this.em.persist(aGreeting);
return aGreeting.getId();
}

public Greeting get(int id) {
var qr = em.createQuery("from com.hexacta.model.Greeting g " +
"where g.id = ?1");
qr.setParameter(1, id);
return (Greeting) qr.getSingleResult();
}
}

Modificamos el servicio para que permita guardar los saludos : 

package com.hexacta;

import com.hexacta.dao.GreetingDAO;
import com.hexacta.model.Greeting;

import javax.transaction.Transactional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped
public class GreetingServices {

@Inject
private GreetingDAO dao;

public String greeting(String name) {
return "hola " + name;
}

public String greeting(int id,String name) {
Greeting aGreeting = dao.get(id);
if (aGreeting == null) {
return name;
}
return aGreeting.getValue() + " " + name;
}

@Transactional
public int saveGreeting(String greeting) {
Greeting aGreeting = new Greeting(greeting);
return dao.save(aGreeting);
}
}

Y por último vamos a modificar los servicios REST : 

package com.hexacta;

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class GreetingResource {

private GreetingServices service;

@Inject
public GreetingResource(GreetingServices service) {
this.service = service;
}

@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "Hello RESTEasy";
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{name}")
public String hello(@PathParam String name) {
return this.service.greeting(name);
}

@POST
@Path("/save/{greeting}")
public int saveGreeting(@PathParam String greeting) {
return this.service.saveGreeting(greeting);
}

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{id}/{name}")
public String hello(@PathParam int id,@PathParam String name) {
return this.service.greeting(id, name);
}


}

Y tengo que configurar h2 que es la base que estamos usando, en el application.properties : 

# datasource configuration
quarkus.datasource.db-kind = h2
quarkus.datasource.username = sa
quarkus.datasource.password = a
quarkus.datasource.jdbc.url = jdbc:h2:~/quarkus.db

quarkus.hibernate-orm.database.generation=update

Y listo, podemos guardar diferentes saludos y usarlos. 

Dejo el link del repo : 

https://github.com/emanuelpeg/quarkusExample/


martes, 21 de septiembre de 2021

Creando la primera aplicación con Quarkus parte 4


Seguimos con quarkus

Ahora vamos a empaquetar nuestra app, lo hacemos con gradle jar y la ejecutamos con 

java -jar build/quarkus-app/quarkus-run.jar

y listo tenemos nuestra app andando en http://localhost:8080/hello 

El directorio quarkus-app que contiene el archivo jar quarkus-run.jar, que es un jar ejecutable. Pero no tiene todas las dependencias estas se copian en subdirectorios de quarkus-app/lib /. Por lo tanto debemos deployar todo el directorio quarkus-app la primera vez y cada vez que hay cambio de librerías. 

Bueno, ya estamos! con esto tenemos nuestra app andando pero vamos a hacer un paso más vamos a hacer nuestra applicación nativa. Para eso debemos utilizar GraalVM 11. 

La construcción de un ejecutable nativo requiere el uso de una distribución de GraalVM. Hay tres distribuciones: Oracle GraalVM Community Edition (CE), Oracle GraalVM Enterprise Edition (EE) y Mandrel. Las diferencias entre las distribuciones de Oracle y Mandrel son las siguientes:

Mandrel es una distribución descendente de Oracle GraalVM CE. El objetivo principal de Mandrel es proporcionar una forma de crear ejecutables nativos diseñados específicamente para admitir Quarkus.

Las versiones de Mandrel se crean a partir de una base de código derivada de la base de código anterior de Oracle GraalVM CE, con solo cambios menores pero algunas exclusiones importantes que no son necesarias para las aplicaciones nativas de Quarkus. Admiten las mismas capacidades para crear ejecutables nativos que Oracle GraalVM CE, sin cambios significativos en la funcionalidad. En particular, no incluyen soporte para programación políglota. El motivo de estas exclusiones es proporcionar un mejor nivel de soporte para la mayoría de los usuarios de Quarkus. Estas exclusiones también significan que Mandrel ofrece una reducción considerable en su tamaño de distribución en comparación con Oracle GraalVM CE/EE.

Mandrel está construido de forma ligeramente diferente a Oracle GraalVM CE, utilizando el proyecto estándar OpenJDK. Esto significa que no se beneficia de algunas pequeñas mejoras que Oracle ha agregado a la versión de OpenJDK utilizada para crear sus propias descargas de GraalVM. Estas mejoras se omiten porque OpenJDK no las gestiona y no puede responder por ellas. Esto es particularmente importante cuando se trata de conformidad y seguridad.

Actualmente, Mandrel solo se recomienda para compilar ejecutables nativos destinados a entornos Linux en contenedores. Esto significa que los usuarios de Mandrel deben usar contenedores para construir sus ejecutables nativos. Si está creando ejecutables nativos para plataformas de destino macOS o Windows, debería considerar usar Oracle GraalVM en su lugar, porque Mandrel no se dirige actualmente a estas plataformas. Es posible compilar ejecutables nativos directamente en Linux.

Los requisitos previos varían ligeramente dependiendo de si está utilizando Oracle GraalVM CE/EE o Mandrel.

Primero, tenemos que configurar las variables de entorno JAVA_HOME y GRAALVM_HOME (y tener instalado docker). 

Y luego tenemos que hacer : 

gradel build -Dquarkus.package.type=native

docker build -f src/main/docker/Dockerfile.native -t quarkus/demo .

docker run -i --rm -p 8080:8080 quarkus/demo

Cuando lo corrí la primera vez, me tiro error porque estoy usando Windows y me dijo que instale el generador de imágenes windows haciendo esto : 

gu install native-image

 

domingo, 19 de septiembre de 2021

Primeros pasos con Apache Kafka parte 13



Seguimos con Kafka. 

Los archivos Avro deben almacenar el esquema completo en el archivo de datos que están asociados, almacenar el esquema completo en cada registro generalmente será más del doble del tamaño del registro. Para resolver este problema podemos utilizar un registro de esquemas. Los Schema Registry no forma parte de Apache Kafka, pero hay varias opciones de código abierto para elegir. Usaremos Confluent Schema Registry para este ejemplo. 

La idea es almacenar todos los esquemas utilizados luego, simplemente almacenamos el identificador del esquema en el registro que producimos en Kafka. Los consumidores pueden usar el identificador para extraer el registro del registro de esquema y deserializar los datos. 

A continuación, se muestra un ejemplo de cómo producir objetos Avro generados en Kafka:

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";

int wait = 500;

Producer<String, Customer> producer = new KafkaProducer<String,Customer>(props);

while (true) {

    Customer customer = CustomerGenerator.getNext();

    System.out.println("Generated customer " + customer.toString());

    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);

    producer.send(record);

}

Usamos KafkaAvroSerializer para serializar nuestros objetos con Avro. AvroSerializer también puede manejar primitivas, por lo que luego podemos usar String como clave de registro y nuestro objeto Customer como valor.

schema.registry.url es un nuevo parámetro. Esto simplemente apunta a dónde almacenamos los esquemas.

El cliente es nuestro objeto generado. Le decimos al productor que nuestros registros contendrán Cliente como valor.

También creamos una instancia de ProducerRecord con Customer como el tipo de valor y pasamos un objeto Customer al crear el nuevo registro.

Eso es todo. Enviamos el registro con nuestro objeto Cliente y KafkaAvroSerializer se encargará del resto.

¿Qué sucede si prefiere utilizar objetos Avro genéricos en lugar de los objetos Avro generados?
No hay problema. En este caso, solo necesita proporcionar el esquema:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\", \"type\": [\"null\",\"string
\"], \"default\":\"null\" }" +
"]}";

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com"
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String,
    GenericRecord>("customerContacts", name, customer);
    producer.send(data);
}

Seguimos usando el mismo KafkaAvroSerializer.

Y proporcionamos el URI del mismo registro de esquema.

Pero ahora también necesitamos proporcionar el esquema Avro, ya que no lo proporciona el objeto generado por Avro.

Nuestro tipo de objeto es un Avro GenericRecord, que inicializamos con nuestro esquema y los datos que queremos escribir.

Entonces, el valor de ProducerRecord es simplemente un GenericRecord que cuenta nuestro esquema y datos. El serializador sabrá cómo obtener el esquema de este registro, almacenarlo en el registro de esquema y serializar los datos del objeto.


viernes, 17 de septiembre de 2021

Primeros pasos con Apache Kafka parte 12


Seguimos con Kafka. 

Apache Avro es un formato de serialización de datos independiente del lenguaje. El proyecto fue creado por Doug Cutting para proporcionar una forma de compartir archivos de datos con una gran audiencia.

Los datos de Avro se describen en un esquema independiente del lenguaje. El esquema generalmente se describe en JSON y la serialización suele ser en archivos binarios, aunque también se admite la serialización en JSON. Avro asume que el esquema está presente al leer y escribir archivos, generalmente incrustando el esquema en los propios archivos.




Una de las características más interesantes de Avro, y lo que lo hace adecuado para su uso en sistemas de mensajería como Kafka, es que cuando la aplicación que está escribiendo mensajes cambia a un nuevo esquema, las aplicaciones que leen los datos pueden continuar procesando mensajes sin necesidad de cambiar o actualizar.

Supongamos que el esquema original fuera:

{

     "namespace": "customerManagement.avro",

     "type": "record",

     "name": "Customer",

     "fields": [

          {"name": "id", "type": "int"},

          {"name": "name", "type": "string""},

          {"name": "faxNumber", "type": ["null", "string"], "default": "null"}

     ]

}

Usamos este esquema durante unos meses y generamos algunos terabytes de datos en este formato. Ahora suponga que decidimos que en la nueva versión, actualizaremos al siglo XXI y ya no incluiremos un campo de número de fax y en su lugar usaremos un campo de correo electrónico.

El nuevo esquema sería:

{"namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": "null"}
    ]
}

Ahora, después de actualizar a la nueva versión, los registros antiguos contendrán "faxNumber" y los registros nuevos contendrán "email". En muchas organizaciones, las actualizaciones se realizan lentamente y durante muchos meses. Por lo tanto, debemos considerar cómo las aplicaciones anteriores a la actualización que aún usan los números de fax y las aplicaciones posteriores a la actualización que usan el correo electrónico podrán manejar todos los eventos en Kafka.

La aplicación de lectura contendrá llamadas a métodos similares a getName (), getId () y getFaxNumber. Si encuentra un mensaje escrito con el nuevo esquema, getName() y getId () continuará funcionando sin modificaciones, pero getFaxNumber () devolverá nulo porque el mensaje no contendrá un número de fax.

Ahora suponga que actualizamos nuestra aplicación de lectura y ya no tiene el método getFaxNumber() sino getEmail(). Si encuentra un mensaje escrito con el esquema anterior, getEmail() devolverá un valor nulo porque los mensajes anteriores no contienen una dirección de correo electrónico.

Este ejemplo ilustra el beneficio de usar Avro: aunque cambiemos el esquema en los mensajes sin cambiar todas las aplicaciones que leen los datos, no habrá excepciones ni errores de ruptura y no será necesario realizar costosas actualizaciones de los datos existentes.

Sin embargo, hay dos advertencias para este escenario:
  • El esquema utilizado para escribir los datos y el esquema esperado por la aplicación de lectura deben ser compatibles. La documentación de Avro incluye reglas de compatibilidad.
  • El deserializador necesitará acceder al esquema que se utilizó al escribir los datos, incluso cuando sea diferente del esquema esperado por la aplicación que accede a los datos. En los archivos Avro, el esquema de escritura se incluye en el propio archivo, pero hay una mejor manera de manejar esto para los mensajes de Kafka. Que veremos en próximos post...

jueves, 16 de septiembre de 2021

Fue lanzado Java 17!!


Oracle ha lanzado la versión 17 del lenguaje de programación Java. Como el primer lanzamiento como long-term support (LTS) desde JDK 11 en 2018.

Puff como pasan las versiones de java, imparable. Como características nuevas tenemos : 

  306: Restore Always-Strict Floating-Point Semantics

  356: Enhanced Pseudo-Random Number Generators

  382: New macOS Rendering Pipeline

  391: macOS/AArch64 Port

  398: Deprecate the Applet API for Removal

  403: Strongly Encapsulate JDK Internals

  406: Pattern Matching for switch (Preview)

  407: Remove RMI Activation

  409: Sealed Classes

  410: Remove the Experimental AOT and JIT Compiler

  411: Deprecate the Security Manager for Removal

  412: Foreign Function & Memory API (Incubator)

  414: Vector API (Second Incubator)

  415: Context-Specific Deserialization Filters

Bueno, este post es para hacerme eco de la noticia, vamos a ir probando más adelante...

Dejo link : https://jdk.java.net/17/

miércoles, 15 de septiembre de 2021

ReactiveX


ReactiveX es un framework para los lenguajes de programación más utilizados: Java, JavaScript, C#, Scala, Clojure, C ++, Ruby, Python, Groovy, JRuby, Kotlin, Swift y más. 

RxJava es un framwork que implementa los conceptos de ReactiveX en Java. Veamos un ejemplo RxJava:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);

Observable.from(input).filter(new Func1() {

    @Override

     public Boolean call(Integer x) {

          return x % 2 == 0;

     }

})

o usando lambda : 

Observable.from(input).filter(x -> x % 2 == 0);


El objeto resultante (la instancia de rx.Observable) generará una secuencia de los números pares contenidos en la secuencia de entrada: 2 y 4.

En RxJava, rx.Observable agrega dos semánticas al patrón Observador de Gang of Four (la semántica predeterminada es emitir elementos creados, como una lista con elementos 2,4 en el ejemplo anterior):

  • El productor puede notificar al consumidor que no hay más datos disponible.
  • El productor puede notificar al consumidor que ha ocurrido un error.
La biblioteca RxJava proporciona un modelo de programación donde podemos trabajar con eventos generados desde UI o llamadas asincrónicas de la misma manera en que operamos con colecciones y streams en Java 8.

La biblioteca RxJava se creó en Netflix como una alternativa más inteligente a Java Futures y devoluciones de llamada. Tanto los futuros como las devoluciones de llamada son fáciles de usar cuando solo hay un nivel de ejecución asincrónica, pero son difíciles de administrar cuando están anidados.

El siguiente ejemplo muestra cómo se maneja el problema de las devoluciones de llamada anidadas en RxJava.

Suponga que necesita llamar a una API remota para autenticar a un usuario, luego a otra para obtener los datos del usuario y a otra API para obtener los contactos de un usuario. Normalmente, tendría que escribir llamadas a API anidadas y hacer complejos callbacks. Pero con RxJava se puede hacer así : 

serviceEndpoint.login().doOnNext(accessToken -> storeCredentials(accessToken)).flatMap(accessToken -> serviceEndpoint.getUser()).flatMap(user -> serviceEndpoint.getUserContact(user.getId()))




sábado, 11 de septiembre de 2021

Primeros pasos con Apache Kafka parte 11

Seguimos con Kafka. 

Como se vio en ejemplos anteriores, la configuración del productor incluye serializadores y hemos visto cómo utilizar el serializador de cadenas predeterminado. Kafka también incluye serializadores para enteros y ByteArrays, pero algunas veces necesitamos serializar de una forma especial.

Cuando el objeto que necesita enviar a Kafka no es una simple cadena o un entero, tiene la opción de usar una biblioteca de serialización genérica como Avro, Thrift o Protobuf para crear registros, o crear una serialización personalizada para los objetos que ya está usando . 

Suponga que en lugar de registrar solo el nombre del cliente, crea una clase simple para representar a los clientes:

public class Customer {

      private int customerID;

      private String customerName;

      public Customer(int ID, String name) {

            this.customerID = ID;

            this.customerName = name;

      }

      public int getID() {

            return customerID;

      }

      public String getName() {

            return customerName;

      }

}

Ahora suponga que queremos crear un serializador personalizado para esta clase.:

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {

      @Override
      public void configure(Map configs, boolean isKey) {
            // nothing to configure
      }

      @Override
      /**
      We are serializing Customer as:
      4 byte int representing customerId
      4 byte int representing length of customerName in UTF-8 bytes (0 if name is Null)
      N bytes representing customerName in UTF-8
      */
      public byte[] serialize(String topic, Customer data) {
      try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                  return null;
            else {
                  if (data.getName() != null) {
                        serializeName = data.getName().getBytes("UTF-8");
                        stringSize = serializedName.length;
                  } else {
                        serializedName = new byte[0];
                        stringSize = 0;
                  }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
      } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
      }
   }

  @Override
  public void close() {
         // nothing to close
  }
}

La configuración de un productor con este CustomerSerializer le permitirá definir ProducerRecord <String, Customer> y enviar datos del cliente y pasar los objetos del cliente directamente al productor. 

Este ejemplo es bastante simple, pero puede ver lo frágil que es el código. Si alguna vez tenemos demasiados clientes, por ejemplo, y necesitamos cambiar customerID a Long, o si alguna vez decidimos agregar un campo startDate a Customer, tendremos un problema serio para mantener la compatibilidad entre los mensajes antiguos y nuevos. La depuración de problemas de compatibilidad entre diferentes versiones de serializadores y deserializadores es bastante desafiante; es necesario comparar matrices de bytes sin procesar. Para empeorar las cosas, si varios equipos de la misma empresa terminan escribiendo datos del Cliente en Kafka, todos deberán usar los mismos serializadores y modificar el código al mismo tiempo.

Por estos motivos, es buena idea utilizar serializadores y deserializadores existentes, como JSON, Apache Avro, Thrift o Protobuf. 

viernes, 10 de septiembre de 2021

Libro Gratuito sobre microservicios

EBOOK

Image

Microservices architectures offer faster change speeds, better scalability, and cleaner, evolvable system designs. But implementing your first microservices architecture is difficult. How do you make myriad choices, educate your team on all the technical details, and lead the organization to a successful execution to maximize your chance of success? With this book, complements of NGINX, authors Ronnie Mitra and Irakli Nadareishvili provide step-by-step guidance for building an effective microservices architecture.

They guide you on an implementation journey based on techniques and architectures that have proven to work for microservices systems – building an operating model, a microservices design, an infrastructure foundation, and two working microservices, then putting those pieces together as a single implementation. For anyone tasked with building microservices or a microservices architecture, this guide is invaluable.

In this eBook you will learn:

  • How to design an effective and explicit microservices system end-to-end
  • About forming teams, assigning responsibilities, and working together
  • How to slice a big application into a collection of microservices
  • About building a simple yet powerful CI/CD pipeline for infrastructure changes


El patrón observador en la programación reactiva


El patrón Observer es un patrón de diseño en el que hay dos tipos de objetos: observadores y sujetos observables. Un observador es un objeto que observa los cambios de uno o más sujetos; un sujeto es un objeto que mantiene una lista de sus observadores y les notifica automáticamente cuando cambia de estado.

La definición del patrón Observer del libro "Gang of Four" (Design Patterns: Elements of Reusable Object-Oriented Software por Erich Gamma, Richard Helm, Ralph Johnson y John Vlissides, ISBN 0-201-63361-2) es "Defina una dependencia de uno a varios entre objetos para que cuando un objeto cambie de estado, todos sus dependientes sean notificados y actualizados automáticamente".

Este patrón es el núcleo de la programación reactiva. Se ajusta perfectamente al concepto de programación reactiva al proporcionar las estructuras para implementar el mecanismo produce/react

Java SDK implementa el patrón Observer con la clase java.util.Observable y la interfaz java.util.Observer.

class Subject extends java.util.Observable {
    public void doWorkAndNotify() {
        Object result = doWork();
        notifyObservers(result);
    }
}

class MyObserver implements Observer {
    @Override
    public void update(Observable obs, Object item) {
        doSomethingWith(item)
    }
}

La clase Subject extiende java.utils.Observable y es responsable de producir un objeto y notificar a los observadores tan pronto como se haya producido el elemento.

MyObserver implementa Observer y es responsable de observar al sujeto y consumir todos los elementos que produce el sujeto.

Veamos como interactuan estos objetos: 

MyObserver myObserver = new MyObserver();
Subject subject = new Subject();
subject.addObserver(myObserver);
subject.doWorkAndNotify();

Desafortunadamente, esta implementación se revela demasiado simple cuando comienza a escribir una lógica más compleja. No se utilizan estas implementaciones.

miércoles, 8 de septiembre de 2021

Programación reactiva


La programación reactiva lleva la programación funcional un poco más allá, agregando el concepto de flujo de datos y propagación de cambios de datos.

En programación imperativa, se puede asignar un valor a una variable de la siguiente manera:

x = y + z

Aquí, la suma de y+z se asignará a la variable x , si llamamos esta función, los cambios que ocurran a y y a z no modificaran a x.

En la programación reactiva, el valor de x debe actualizarse siempre que cambien los valores de y o z.

Entonces, si los valores iniciales son y = 1 y z = 1, tendrás

x = y + z = 2.

Si y (o z) cambia su valor, esto no significa que x cambia automáticamente, pero debe implementar un mecanismo para actualizar los valores de x cuando se cambian los valores de y y z.

La programación reactiva funcional es un nuevo paradigma de programación; Erik Meijer lo hizo popular (quien creó la biblioteca Rx para .NET cuando trabajaba en Microsoft) y se basa en dos conceptos:

  • El código "reacciona" a los eventos.
  • El código maneja los valores a medida que varían en el tiempo, propagando los cambios a cada parte del código que usa esos valores.

La clave para entender la programación reactiva es pensar en ella como si operara en un flujo de datos.

Pero, ¿qué quiero decir con "flujo de datos"? Se refiere a una secuencia de eventos, donde un evento podría ser una entrada del usuario (como un toque en un botón), una respuesta de una solicitud de API (como un feed de Facebook), datos contenidos en una colección o incluso una sola variable.

En la programación reactiva, a menudo hay un componente que actúa como fuente, emitiendo una secuencia de elementos (o un flujo de datos) y algunos otros componentes que observan este flujo de elementos y reaccionan a cada elemento emitido ("reaccionan" al elemento emisión).

martes, 7 de septiembre de 2021

Quarkus para desarrolladores Spring



Quiero recomendarles este libro gratuito Quarkus for Spring Developers de la gente de Red hat. 

Quarkus for Spring Developers presenta Quarkus a los desarrolladores de Java, con un ojo especial en ayudar a quienes están familiarizados con las convenciones de Spring a realizar una transición rápida y sencilla.

Y sin más dejo link: 

https://developers.redhat.com/books/quarkus-for-spring-developers?sc_cid=7013a000002ph3pAAA

lunes, 6 de septiembre de 2021

Creando la primera aplicación con Quarkus parte 3

Seguimos con quarkus

quarkus:dev ejecuta Quarkus en modo desarrollo. Esto permite deployment en caliente con compilación en segundo plano, lo que significa que cuando modifica sus archivos Java y / o sus archivos de recursos y actualiza su navegador, estos cambios se aplicarán automáticamente. Esto también funciona para archivos de recursos como el archivo de propiedades de configuración. La actualización del navegador desencadena un análisis del espacio de trabajo y, si se detecta algún cambio, los archivos Java se vuelven a compilar y la aplicación se vuelve a implementar; su solicitud luego es atendida por la aplicación redistribuida. Si hay algún problema con la compilación o la implementación, una página de error se lo informará.

Esto también escuchará un depurador en el puerto 5005. Si desea esperar a que el depurador se conecte antes de ejecutarse, puede pasar -Dsuspend en la línea de comandos. Si no desea el depurador en absoluto, puede usar -Ddebug = false.

En mi caso quiero hacer debug con intellij pero no funciona, si el deploy en caliente pero no me funciona el debug llamando a gradle quarkus:dev en debug.  Lo que hay que hacer es muy fácil, ir a run -> attach to process y en ese menú se va a listar el puerto 5005 que es el puerto de debug y luego el debug anda joya. 

Ya tenemos el debug y el deployment en caliente. Ahora vamos a ver los test, los test unitarios tienen que estar y escapa a este post y a la tecnología que usemos. Ahora bien, esta bueno hacer test de integración para poder revisar toda mi aplicación. desde el servicio REST a la base. Para esto, quarkus nos crea un test por defecto : 

@QuarkusTest

public class GreetingResourceTest {


    @Test

    public void testHelloEndpoint() {

        given()

          .when().get("/hello")

          .then()

             .statusCode(200)

             .body(is("Hello RESTEasy"));

    }


}

Con esto estamos probando que el servicio /hello retorne 200 y su body sea "Hello RESTEasy"

Si corremos el test, quarqus nos levanta el server y le pega. 

Vamos a hacer un test de nuestro servicio /hello/mundo 

    @Test

    public void testHelloWithParameterEndpoint() {

        var name = "Mundo";

        given()

                .when().get("/hello/" + name)

                .then()

                .statusCode(200)

                .body(is("hola " + name));

    }


Y listo!! 


 


jueves, 2 de septiembre de 2021

Creando la primera aplicación con Quarkus parte 2

 Seguimos con quarkus


Ahora vamos a tomar la app del post pasado y vamos a hacer un servicio que salude y lo vamos a inyectar y luego creamos un web services REST que permita utilizarlo. 


import javax.enterprise.context.ApplicationScoped;


@ApplicationScoped

public class GreetingServices {


    public String greeting(String name) {

        return "holas " + name;

    }

}

Y ahora inyectamos : 

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
public class GreetingResource {

    private GreetingServices service;

    @Inject
    public GreetingResource(GreetingServices service) {
        this.service = service;
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "Hello RESTEasy";
    }

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/{name}")
    public String hello(@PathParam String name) {
        return this.service.greeting(name);
    }
}

Siempre que podamos debemos inyectar a nivel de constructor, de esta manera queda más explicita la dependencia. 

Y listo!! si vamos a http://localhost:8080/hello/mundo vamos a obtener : 

holas mundo