Me llego un mail con este libro gratuito y quiero compartirlo con ustedes:
|
Me llego un mail con este libro gratuito y quiero compartirlo con ustedes:
|
A continuación, se muestra un ejemplo de un particionador personalizado:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana siempre va estar en la ultima partición
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
La forma más sencilla de crear un observable es utilizar los métodos de fabricación que se implementan en la biblioteca RxJava. Ya usamos el método Observable.from(), en un post anterior.
Observable.just() crea un Observable que emite el objeto o los objetos que se pasan como parámetros:
Observable.just(1, 2, 3, 4, 5)
Por otro lado, un observable que comienza a emitir elementos antes de conectarse a un observador se denomina observable caliente. Con los observables calientes, un observador puede suscribirse y comenzar a recibir elementos en cualquier momento durante la emisión. Con observables calientes, el observador puede recibir la secuencia completa de elementos comenzando desde el principio o no.
Veamos un ejemplo más concreto, pero simple.
Creemos un Observable que emita todos los números enteros del 1 al 5 y suscríbase a él:
Observable<Integer> observable = Observable.from(new Integer[]{1, 2, 3, 4, 5});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence completed!");
}
@Override
public void onError(Throwable e) {
System.err.println("Exception: " + e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("next item is: " + integer);
}
});
Los objetos ProducerRecord incluyen un nombre de tema, una clave y un valor. Los mensajes de Kafka son pares clave-valor y, si bien es posible crear un ProducerRecord con solo un tema y un valor, con la clave establecida en nula de forma predeterminada, la mayoría de las aplicaciones producen registros con claves. Las claves sirven para dos objetivos: son información adicional que se almacena con el mensaje y también se utilizan para decidir en cuál de las particiones de tema se escribirá el mensaje. Todos los mensajes con la misma clave irán a la misma partición. Esto significa que si un proceso está leyendo solo un subconjunto de las particiones en un tema, todos los registros para una sola clave serán leídos por el mismo proceso. Para crear un registro de valor-clave, simplemente cree un ProducerRecord de la siguiente manera:
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
Al crear mensajes con una clave nula, simplemente puede omitir la clave:
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos.
La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más.
Veamos un ejemplo:
public void subscribeToObservable(Observable<T> observable) {
observable.subscribe(new Subscriber<>() {
@Override
public void onCompleted() {
// invoked when Observable stops emitting items
}
@Override
public void onError(Throwable e) {
// invoked when Observable throws an exception
// while emitting items
}
@Override
public void onNext(T nextItem) {
// invoked when Observable emits an item
// usually you will consume the nextItem here
}
});
}
Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.
En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:
Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.
Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:
Un observable emite una secuencia que puede ser vacía, finita o infinita. Cuando la secuencia es finita, se emite un evento completo después del final de la secuencia. En cualquier momento durante la emisión (pero no después de su finalización) se puede emitir un evento de error, deteniendo la emisión y cancelando la emisión del evento completo.
Cuando la secuencia está vacía, solo se emite el evento completo, sin emitir ningún ítem. Con una secuencia infinita, el evento completo nunca se emite.
La emisión se puede transformar, filtrar o combinar con otras emisiones, etc.
Un observador es un objeto que se suscribe a un observable. Escucha y reacciona a cualquier secuencia de elementos emitida por el Observable.
El Observer no está bloqueado mientras espera nuevos elementos emitidos, por lo que en operaciones simultáneas, no se produce ningún bloqueo. Simplemente se activa cuando se emite un nuevo elemento.
Este es uno de los principios fundamentales de la programación reactiva: en lugar de ejecutar instrucciones una a la vez (siempre esperando a que se complete la instrucción anterior), el observable proporciona un mecanismo para recuperar y transformar datos, y el Observer activa este mecanismo, todos de forma concurrente.
El siguiente pseudocódigo es un ejemplo del método que implementa el Observer que reacciona a los elementos del Observable:
onNext = { it -> doSomething }
Vamos a guardar unos saludos para luego poder recuperarlos y usarlos para saludar.
Para esto vamos a crear una clase con el saludo :
package com.hexacta.model;
import javax.persistence.*;
import java.util.Objects;
@Entity
public class Greeting {
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(nullable = false)
@Id
private Integer id;
@Column
private String value;
public void setId(Integer id) {
this.id = id;
}
public Integer getId() {
return id;
}
public Greeting() {}
public Greeting(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Greeting greeting = (Greeting) o;
return Objects.equals(value, greeting.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}
Luego vamos a hacer una clase DAO para acceso a la base de datos :
package com.hexacta.dao;
import com.hexacta.model.Greeting;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;
@ApplicationScoped
public class GreetingDAO {
@Inject
private EntityManager em;
public int save(Greeting aGreeting) {
this.em.persist(aGreeting);
return aGreeting.getId();
}
public Greeting get(int id) {
var qr = em.createQuery("from com.hexacta.model.Greeting g " +
"where g.id = ?1");
qr.setParameter(1, id);
return (Greeting) qr.getSingleResult();
}
}
Modificamos el servicio para que permita guardar los saludos :
package com.hexacta;
import com.hexacta.dao.GreetingDAO;
import com.hexacta.model.Greeting;
import javax.transaction.Transactional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class GreetingServices {
@Inject
private GreetingDAO dao;
public String greeting(String name) {
return "hola " + name;
}
public String greeting(int id,String name) {
Greeting aGreeting = dao.get(id);
if (aGreeting == null) {
return name;
}
return aGreeting.getValue() + " " + name;
}
@Transactional
public int saveGreeting(String greeting) {
Greeting aGreeting = new Greeting(greeting);
return dao.save(aGreeting);
}
}
Y por último vamos a modificar los servicios REST :
package com.hexacta;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
public class GreetingResource {
private GreetingServices service;
@Inject
public GreetingResource(GreetingServices service) {
this.service = service;
}
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "Hello RESTEasy";
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{name}")
public String hello(@PathParam String name) {
return this.service.greeting(name);
}
@POST
@Path("/save/{greeting}")
public int saveGreeting(@PathParam String greeting) {
return this.service.saveGreeting(greeting);
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{id}/{name}")
public String hello(@PathParam int id,@PathParam String name) {
return this.service.greeting(id, name);
}
}
Y tengo que configurar h2 que es la base que estamos usando, en el application.properties :
# datasource configuration
quarkus.datasource.db-kind = h2
quarkus.datasource.username = sa
quarkus.datasource.password = a
quarkus.datasource.jdbc.url = jdbc:h2:~/quarkus.db
quarkus.hibernate-orm.database.generation=update
Y listo, podemos guardar diferentes saludos y usarlos.
Dejo el link del repo :
https://github.com/emanuelpeg/quarkusExample/
java -jar build/quarkus-app/quarkus-run.jar
y listo tenemos nuestra app andando en http://localhost:8080/hello
El directorio quarkus-app que contiene el archivo jar quarkus-run.jar, que es un jar ejecutable. Pero no tiene todas las dependencias estas se copian en subdirectorios de quarkus-app/lib /. Por lo tanto debemos deployar todo el directorio quarkus-app la primera vez y cada vez que hay cambio de librerías.
Bueno, ya estamos! con esto tenemos nuestra app andando pero vamos a hacer un paso más vamos a hacer nuestra applicación nativa. Para eso debemos utilizar GraalVM 11.
La construcción de un ejecutable nativo requiere el uso de una distribución de GraalVM. Hay tres distribuciones: Oracle GraalVM Community Edition (CE), Oracle GraalVM Enterprise Edition (EE) y Mandrel. Las diferencias entre las distribuciones de Oracle y Mandrel son las siguientes:
Mandrel es una distribución descendente de Oracle GraalVM CE. El objetivo principal de Mandrel es proporcionar una forma de crear ejecutables nativos diseñados específicamente para admitir Quarkus.
Las versiones de Mandrel se crean a partir de una base de código derivada de la base de código anterior de Oracle GraalVM CE, con solo cambios menores pero algunas exclusiones importantes que no son necesarias para las aplicaciones nativas de Quarkus. Admiten las mismas capacidades para crear ejecutables nativos que Oracle GraalVM CE, sin cambios significativos en la funcionalidad. En particular, no incluyen soporte para programación políglota. El motivo de estas exclusiones es proporcionar un mejor nivel de soporte para la mayoría de los usuarios de Quarkus. Estas exclusiones también significan que Mandrel ofrece una reducción considerable en su tamaño de distribución en comparación con Oracle GraalVM CE/EE.
Mandrel está construido de forma ligeramente diferente a Oracle GraalVM CE, utilizando el proyecto estándar OpenJDK. Esto significa que no se beneficia de algunas pequeñas mejoras que Oracle ha agregado a la versión de OpenJDK utilizada para crear sus propias descargas de GraalVM. Estas mejoras se omiten porque OpenJDK no las gestiona y no puede responder por ellas. Esto es particularmente importante cuando se trata de conformidad y seguridad.
Actualmente, Mandrel solo se recomienda para compilar ejecutables nativos destinados a entornos Linux en contenedores. Esto significa que los usuarios de Mandrel deben usar contenedores para construir sus ejecutables nativos. Si está creando ejecutables nativos para plataformas de destino macOS o Windows, debería considerar usar Oracle GraalVM en su lugar, porque Mandrel no se dirige actualmente a estas plataformas. Es posible compilar ejecutables nativos directamente en Linux.
Los requisitos previos varían ligeramente dependiendo de si está utilizando Oracle GraalVM CE/EE o Mandrel.
Primero, tenemos que configurar las variables de entorno JAVA_HOME y GRAALVM_HOME (y tener instalado docker).
Y luego tenemos que hacer :
gradel build -Dquarkus.package.type=native
docker build -f src/main/docker/Dockerfile.native -t quarkus/demo .
docker run -i --rm -p 8080:8080 quarkus/demo
Cuando lo corrí la primera vez, me tiro error porque estoy usando Windows y me dijo que instale el generador de imágenes windows haciendo esto :
gu install native-image
Los archivos Avro deben almacenar el esquema completo en el archivo de datos que están asociados, almacenar el esquema completo en cada registro generalmente será más del doble del tamaño del registro. Para resolver este problema podemos utilizar un registro de esquemas. Los Schema Registry no forma parte de Apache Kafka, pero hay varias opciones de código abierto para elegir. Usaremos Confluent Schema Registry para este ejemplo.
La idea es almacenar todos los esquemas utilizados luego, simplemente almacenamos el identificador del esquema en el registro que producimos en Kafka. Los consumidores pueden usar el identificador para extraer el registro del registro de esquema y deserializar los datos.
A continuación, se muestra un ejemplo de cómo producir objetos Avro generados en Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String,Customer>(props);
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}
Apache Avro es un formato de serialización de datos independiente del lenguaje. El proyecto fue creado por Doug Cutting para proporcionar una forma de compartir archivos de datos con una gran audiencia.
Los datos de Avro se describen en un esquema independiente del lenguaje. El esquema generalmente se describe en JSON y la serialización suele ser en archivos binarios, aunque también se admite la serialización en JSON. Avro asume que el esquema está presente al leer y escribir archivos, generalmente incrustando el esquema en los propios archivos.
Supongamos que el esquema original fuera:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string""},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}
Puff como pasan las versiones de java, imparable. Como características nuevas tenemos :
306: Restore Always-Strict Floating-Point Semantics
356: Enhanced Pseudo-Random Number Generators
382: New macOS Rendering Pipeline
391: macOS/AArch64 Port
398: Deprecate the Applet API for Removal
403: Strongly Encapsulate JDK Internals
406: Pattern Matching for switch (Preview)
407: Remove RMI Activation
409: Sealed Classes
410: Remove the Experimental AOT and JIT Compiler
411: Deprecate the Security Manager for Removal
412: Foreign Function & Memory API (Incubator)
414: Vector API (Second Incubator)
415: Context-Specific Deserialization Filters
Bueno, este post es para hacerme eco de la noticia, vamos a ir probando más adelante...
Dejo link : https://jdk.java.net/17/
RxJava es un framwork que implementa los conceptos de ReactiveX en Java. Veamos un ejemplo RxJava:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
Observable.from(input).filter(new Func1() {
@Override
public Boolean call(Integer x) {
return x % 2 == 0;
}
})
o usando lambda :
Observable.from(input).filter(x -> x % 2 == 0);
La biblioteca RxJava se creó en Netflix como una alternativa más inteligente a Java Futures y devoluciones de llamada. Tanto los futuros como las devoluciones de llamada son fáciles de usar cuando solo hay un nivel de ejecución asincrónica, pero son difíciles de administrar cuando están anidados.
El siguiente ejemplo muestra cómo se maneja el problema de las devoluciones de llamada anidadas en RxJava.
Suponga que necesita llamar a una API remota para autenticar a un usuario, luego a otra para obtener los datos del usuario y a otra API para obtener los contactos de un usuario. Normalmente, tendría que escribir llamadas a API anidadas y hacer complejos callbacks. Pero con RxJava se puede hacer así :
serviceEndpoint.login().doOnNext(accessToken -> storeCredentials(accessToken)).flatMap(accessToken -> serviceEndpoint.getUser()).flatMap(user -> serviceEndpoint.getUserContact(user.getId()))
Cuando el objeto que necesita enviar a Kafka no es una simple cadena o un entero, tiene la opción de usar una biblioteca de serialización genérica como Avro, Thrift o Protobuf para crear registros, o crear una serialización personalizada para los objetos que ya está usando .
Suponga que en lugar de registrar solo el nombre del cliente, crea una clase simple para representar a los clientes:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
|
La definición del patrón Observer del libro "Gang of Four" (Design Patterns: Elements of Reusable Object-Oriented Software por Erich Gamma, Richard Helm, Ralph Johnson y John Vlissides, ISBN 0-201-63361-2) es "Defina una dependencia de uno a varios entre objetos para que cuando un objeto cambie de estado, todos sus dependientes sean notificados y actualizados automáticamente".
Este patrón es el núcleo de la programación reactiva. Se ajusta perfectamente al concepto de programación reactiva al proporcionar las estructuras para implementar el mecanismo produce/react
En programación imperativa, se puede asignar un valor a una variable de la siguiente manera:
x = y + z
Aquí, la suma de y+z se asignará a la variable x , si llamamos esta función, los cambios que ocurran a y y a z no modificaran a x.
En la programación reactiva, el valor de x debe actualizarse siempre que cambien los valores de y o z.
Entonces, si los valores iniciales son y = 1 y z = 1, tendrás
x = y + z = 2.
Si y (o z) cambia su valor, esto no significa que x cambia automáticamente, pero debe implementar un mecanismo para actualizar los valores de x cuando se cambian los valores de y y z.
La programación reactiva funcional es un nuevo paradigma de programación; Erik Meijer lo hizo popular (quien creó la biblioteca Rx para .NET cuando trabajaba en Microsoft) y se basa en dos conceptos:
Quarkus for Spring Developers presenta Quarkus a los desarrolladores de Java, con un ojo especial en ayudar a quienes están familiarizados con las convenciones de Spring a realizar una transición rápida y sencilla.
Y sin más dejo link:
https://developers.redhat.com/books/quarkus-for-spring-developers?sc_cid=7013a000002ph3pAAA