Quiero compartirles el informe que hizo los amigos de jetbrains, sobre el estado del ecosistema del desarrollador 2021.
Dejo link: https://www.jetbrains.com/es-es/lp/devecosystem-2021/
Dejo link: https://www.jetbrains.com/es-es/lp/devecosystem-2021/
Los consumidores de Kafka suelen formar parte de un grupo de consumidores. Cuando varios consumidores están suscritos a un tema y pertenecen al mismo grupo de consumidores, cada consumidor del grupo recibirá mensajes de un subconjunto diferente de las particiones del tema.
Tomemos el tema T1 con cuatro particiones. Ahora suponga que creamos un nuevo consumidor, C1, que es el único consumidor del grupo G1, y lo usamos para suscribirse al tema T1. El consumidor C1 recibirá todos los mensajes de las cuatro particiones t1.
Si agregamos otro consumidor, C2, al grupo G1, cada consumidor solo recibirá mensajes de dos particiones. Quizás los mensajes de la partición 0 y 2 van a C1 y los mensajes de las particiones 1 y 3 van al consumidor C2.
Si G1 tiene cuatro consumidores, cada uno leerá los mensajes de una sola partición.
Si agregamos más consumidores a un solo grupo con un solo tema que las particiones que tenemos, algunos de los consumidores estarán inactivos y no recibirán ningún mensaje.
La principal forma en que escalamos el consumo de datos de un tema de Kafka es agregando más consumidores a un grupo de consumidores. Es común que los consumidores de Kafka realicen operaciones de alta latencia, como escribir en una base de datos o un cálculo lento de los datos. En estos casos, es posible que un solo consumidor no pueda mantenerse al día con los flujos de datos de velocidad en un tema, y agregar más consumidores que compartan la carga al hacer que cada consumidor posea solo un subconjunto de las particiones y los mensajes es nuestro método principal de escalado. Esta es una buena razón para crear temas con una gran cantidad de particiones: permite agregar más consumidores cuando aumenta la carga. Tenga en cuenta que no tiene sentido agregar más consumidores de los que tiene particiones en un tema; algunos de los consumidores simplemente estarán inactivos.
Además de agregar consumidores para escalar una sola aplicación, es muy común tener múltiples aplicaciones que necesitan leer datos del mismo tema. De hecho, uno de los principales objetivos de diseño en Kafka era hacer que los datos producidos para los temas de Kafka estuvieran disponibles para muchos casos de uso en toda la organización. En esos casos, queremos que cada aplicación obtenga todos los mensajes, en lugar de solo un subconjunto. Para asegurarse de que una aplicación reciba todos los mensajes de un tema, asegúrese de que la aplicación tenga su propio grupo de consumidores. A diferencia de muchos sistemas de mensajería tradicionales, Kafka se adapta a una gran cantidad de consumidores y grupos de consumidores sin reducir el rendimiento. En el ejemplo anterior, si agregamos un nuevo grupo de consumidores G2 con un solo consumidor, este consumidor obtendrá todos los mensajes del tema T1 independientemente de lo que esté haciendo G1. G2 puede tener más de un consumidor, en cuyo caso cada uno obtendrá un subconjunto de particiones, tal como mostramos para G1, pero G2 en su conjunto seguirá recibiendo todos los mensajes independientemente de otros grupos de consumidores.
Para resumir, crea un nuevo grupo de consumidores para cada aplicación que necesita todos los mensajes de uno o más temas. Agrega consumidores a un grupo de consumidores existente para escalar la lectura y el procesamiento de mensajes de los temas, por lo que cada consumidor adicional en un grupo solo obtendrá un subconjunto de los mensajes.
Me llego un mail con este libro gratuito y quiero compartirlo con ustedes:
|
A continuación, se muestra un ejemplo de un particionador personalizado:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana siempre va estar en la ultima partición
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
La forma más sencilla de crear un observable es utilizar los métodos de fabricación que se implementan en la biblioteca RxJava. Ya usamos el método Observable.from(), en un post anterior.
Observable.just() crea un Observable que emite el objeto o los objetos que se pasan como parámetros:
Observable.just(1, 2, 3, 4, 5)
Por otro lado, un observable que comienza a emitir elementos antes de conectarse a un observador se denomina observable caliente. Con los observables calientes, un observador puede suscribirse y comenzar a recibir elementos en cualquier momento durante la emisión. Con observables calientes, el observador puede recibir la secuencia completa de elementos comenzando desde el principio o no.
Veamos un ejemplo más concreto, pero simple.
Creemos un Observable que emita todos los números enteros del 1 al 5 y suscríbase a él:
Observable<Integer> observable = Observable.from(new Integer[]{1, 2, 3, 4, 5});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence completed!");
}
@Override
public void onError(Throwable e) {
System.err.println("Exception: " + e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("next item is: " + integer);
}
});
Los objetos ProducerRecord incluyen un nombre de tema, una clave y un valor. Los mensajes de Kafka son pares clave-valor y, si bien es posible crear un ProducerRecord con solo un tema y un valor, con la clave establecida en nula de forma predeterminada, la mayoría de las aplicaciones producen registros con claves. Las claves sirven para dos objetivos: son información adicional que se almacena con el mensaje y también se utilizan para decidir en cuál de las particiones de tema se escribirá el mensaje. Todos los mensajes con la misma clave irán a la misma partición. Esto significa que si un proceso está leyendo solo un subconjunto de las particiones en un tema, todos los registros para una sola clave serán leídos por el mismo proceso. Para crear un registro de valor-clave, simplemente cree un ProducerRecord de la siguiente manera:
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
Al crear mensajes con una clave nula, simplemente puede omitir la clave:
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
Como vimos anteriormente un flujo de datos lanza diferentes tipos de señales, next si hay un próximo dato y complete si se finalizo el flujo de datos.
La interfaz rx.Observer <T> define los métodos, onNext, onCompleted, onError(Throwable). onError nos indica que sucedió un error y el flujo de datos no puede trasmitir más.
Veamos un ejemplo:
public void subscribeToObservable(Observable<T> observable) {
observable.subscribe(new Subscriber<>() {
@Override
public void onCompleted() {
// invoked when Observable stops emitting items
}
@Override
public void onError(Throwable e) {
// invoked when Observable throws an exception
// while emitting items
}
@Override
public void onNext(T nextItem) {
// invoked when Observable emits an item
// usually you will consume the nextItem here
}
});
}
Usamos Subscriber<T> dado que es un objeto que implementa la interfaz rx.Observer <T>. La razón por la que utiliza Subscriber en lugar de cualquier otra implementación de la interfaz de Observer es que el Subscriber también implementa la interfaz de Suscripción, que le permite verificar si el suscriptor está cancelado (con el método isUnsubscriptions ()) y cancelar su suscripción.
En el ejemplo anterior, observe que un observador reacciona a tres tipos de eventos:
Un observable no puede notificar los métodos onCompleted y onError, solo uno de ellos. Siempre será el último método invocado.
Con el método Observable.subscribe () (una operación llamada suscripción), puede conectar un Observable a un Observer, pero ¿qué sucede si desea desconectarlos? Esta la operación llamada unsubscribe:
Un observable emite una secuencia que puede ser vacía, finita o infinita. Cuando la secuencia es finita, se emite un evento completo después del final de la secuencia. En cualquier momento durante la emisión (pero no después de su finalización) se puede emitir un evento de error, deteniendo la emisión y cancelando la emisión del evento completo.
Cuando la secuencia está vacía, solo se emite el evento completo, sin emitir ningún ítem. Con una secuencia infinita, el evento completo nunca se emite.
La emisión se puede transformar, filtrar o combinar con otras emisiones, etc.
Un observador es un objeto que se suscribe a un observable. Escucha y reacciona a cualquier secuencia de elementos emitida por el Observable.
El Observer no está bloqueado mientras espera nuevos elementos emitidos, por lo que en operaciones simultáneas, no se produce ningún bloqueo. Simplemente se activa cuando se emite un nuevo elemento.
Este es uno de los principios fundamentales de la programación reactiva: en lugar de ejecutar instrucciones una a la vez (siempre esperando a que se complete la instrucción anterior), el observable proporciona un mecanismo para recuperar y transformar datos, y el Observer activa este mecanismo, todos de forma concurrente.
El siguiente pseudocódigo es un ejemplo del método que implementa el Observer que reacciona a los elementos del Observable:
onNext = { it -> doSomething }
Vamos a guardar unos saludos para luego poder recuperarlos y usarlos para saludar.
Para esto vamos a crear una clase con el saludo :
package com.hexacta.model;
import javax.persistence.*;
import java.util.Objects;
@Entity
public class Greeting {
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(nullable = false)
@Id
private Integer id;
@Column
private String value;
public void setId(Integer id) {
this.id = id;
}
public Integer getId() {
return id;
}
public Greeting() {}
public Greeting(String value) {
this.value = value;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Greeting greeting = (Greeting) o;
return Objects.equals(value, greeting.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}
Luego vamos a hacer una clase DAO para acceso a la base de datos :
package com.hexacta.dao;
import com.hexacta.model.Greeting;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;
@ApplicationScoped
public class GreetingDAO {
@Inject
private EntityManager em;
public int save(Greeting aGreeting) {
this.em.persist(aGreeting);
return aGreeting.getId();
}
public Greeting get(int id) {
var qr = em.createQuery("from com.hexacta.model.Greeting g " +
"where g.id = ?1");
qr.setParameter(1, id);
return (Greeting) qr.getSingleResult();
}
}
Modificamos el servicio para que permita guardar los saludos :
package com.hexacta;
import com.hexacta.dao.GreetingDAO;
import com.hexacta.model.Greeting;
import javax.transaction.Transactional;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
@ApplicationScoped
public class GreetingServices {
@Inject
private GreetingDAO dao;
public String greeting(String name) {
return "hola " + name;
}
public String greeting(int id,String name) {
Greeting aGreeting = dao.get(id);
if (aGreeting == null) {
return name;
}
return aGreeting.getValue() + " " + name;
}
@Transactional
public int saveGreeting(String greeting) {
Greeting aGreeting = new Greeting(greeting);
return dao.save(aGreeting);
}
}
Y por último vamos a modificar los servicios REST :
package com.hexacta;
import org.jboss.resteasy.annotations.jaxrs.PathParam;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
public class GreetingResource {
private GreetingServices service;
@Inject
public GreetingResource(GreetingServices service) {
this.service = service;
}
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "Hello RESTEasy";
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{name}")
public String hello(@PathParam String name) {
return this.service.greeting(name);
}
@POST
@Path("/save/{greeting}")
public int saveGreeting(@PathParam String greeting) {
return this.service.saveGreeting(greeting);
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/{id}/{name}")
public String hello(@PathParam int id,@PathParam String name) {
return this.service.greeting(id, name);
}
}
Y tengo que configurar h2 que es la base que estamos usando, en el application.properties :
# datasource configuration
quarkus.datasource.db-kind = h2
quarkus.datasource.username = sa
quarkus.datasource.password = a
quarkus.datasource.jdbc.url = jdbc:h2:~/quarkus.db
quarkus.hibernate-orm.database.generation=update
Y listo, podemos guardar diferentes saludos y usarlos.
Dejo el link del repo :
https://github.com/emanuelpeg/quarkusExample/
java -jar build/quarkus-app/quarkus-run.jar
y listo tenemos nuestra app andando en http://localhost:8080/hello
El directorio quarkus-app que contiene el archivo jar quarkus-run.jar, que es un jar ejecutable. Pero no tiene todas las dependencias estas se copian en subdirectorios de quarkus-app/lib /. Por lo tanto debemos deployar todo el directorio quarkus-app la primera vez y cada vez que hay cambio de librerías.
Bueno, ya estamos! con esto tenemos nuestra app andando pero vamos a hacer un paso más vamos a hacer nuestra applicación nativa. Para eso debemos utilizar GraalVM 11.
La construcción de un ejecutable nativo requiere el uso de una distribución de GraalVM. Hay tres distribuciones: Oracle GraalVM Community Edition (CE), Oracle GraalVM Enterprise Edition (EE) y Mandrel. Las diferencias entre las distribuciones de Oracle y Mandrel son las siguientes:
Mandrel es una distribución descendente de Oracle GraalVM CE. El objetivo principal de Mandrel es proporcionar una forma de crear ejecutables nativos diseñados específicamente para admitir Quarkus.
Las versiones de Mandrel se crean a partir de una base de código derivada de la base de código anterior de Oracle GraalVM CE, con solo cambios menores pero algunas exclusiones importantes que no son necesarias para las aplicaciones nativas de Quarkus. Admiten las mismas capacidades para crear ejecutables nativos que Oracle GraalVM CE, sin cambios significativos en la funcionalidad. En particular, no incluyen soporte para programación políglota. El motivo de estas exclusiones es proporcionar un mejor nivel de soporte para la mayoría de los usuarios de Quarkus. Estas exclusiones también significan que Mandrel ofrece una reducción considerable en su tamaño de distribución en comparación con Oracle GraalVM CE/EE.
Mandrel está construido de forma ligeramente diferente a Oracle GraalVM CE, utilizando el proyecto estándar OpenJDK. Esto significa que no se beneficia de algunas pequeñas mejoras que Oracle ha agregado a la versión de OpenJDK utilizada para crear sus propias descargas de GraalVM. Estas mejoras se omiten porque OpenJDK no las gestiona y no puede responder por ellas. Esto es particularmente importante cuando se trata de conformidad y seguridad.
Actualmente, Mandrel solo se recomienda para compilar ejecutables nativos destinados a entornos Linux en contenedores. Esto significa que los usuarios de Mandrel deben usar contenedores para construir sus ejecutables nativos. Si está creando ejecutables nativos para plataformas de destino macOS o Windows, debería considerar usar Oracle GraalVM en su lugar, porque Mandrel no se dirige actualmente a estas plataformas. Es posible compilar ejecutables nativos directamente en Linux.
Los requisitos previos varían ligeramente dependiendo de si está utilizando Oracle GraalVM CE/EE o Mandrel.
Primero, tenemos que configurar las variables de entorno JAVA_HOME y GRAALVM_HOME (y tener instalado docker).
Y luego tenemos que hacer :
gradel build -Dquarkus.package.type=native
docker build -f src/main/docker/Dockerfile.native -t quarkus/demo .
docker run -i --rm -p 8080:8080 quarkus/demo
Cuando lo corrí la primera vez, me tiro error porque estoy usando Windows y me dijo que instale el generador de imágenes windows haciendo esto :
gu install native-image
Los archivos Avro deben almacenar el esquema completo en el archivo de datos que están asociados, almacenar el esquema completo en cada registro generalmente será más del doble del tamaño del registro. Para resolver este problema podemos utilizar un registro de esquemas. Los Schema Registry no forma parte de Apache Kafka, pero hay varias opciones de código abierto para elegir. Usaremos Confluent Schema Registry para este ejemplo.
La idea es almacenar todos los esquemas utilizados luego, simplemente almacenamos el identificador del esquema en el registro que producimos en Kafka. Los consumidores pueden usar el identificador para extraer el registro del registro de esquema y deserializar los datos.
A continuación, se muestra un ejemplo de cómo producir objetos Avro generados en Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String,Customer>(props);
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}
Apache Avro es un formato de serialización de datos independiente del lenguaje. El proyecto fue creado por Doug Cutting para proporcionar una forma de compartir archivos de datos con una gran audiencia.
Los datos de Avro se describen en un esquema independiente del lenguaje. El esquema generalmente se describe en JSON y la serialización suele ser en archivos binarios, aunque también se admite la serialización en JSON. Avro asume que el esquema está presente al leer y escribir archivos, generalmente incrustando el esquema en los propios archivos.
Supongamos que el esquema original fuera:
{
"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string""},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}
Puff como pasan las versiones de java, imparable. Como características nuevas tenemos :
306: Restore Always-Strict Floating-Point Semantics
356: Enhanced Pseudo-Random Number Generators
382: New macOS Rendering Pipeline
391: macOS/AArch64 Port
398: Deprecate the Applet API for Removal
403: Strongly Encapsulate JDK Internals
406: Pattern Matching for switch (Preview)
407: Remove RMI Activation
409: Sealed Classes
410: Remove the Experimental AOT and JIT Compiler
411: Deprecate the Security Manager for Removal
412: Foreign Function & Memory API (Incubator)
414: Vector API (Second Incubator)
415: Context-Specific Deserialization Filters
Bueno, este post es para hacerme eco de la noticia, vamos a ir probando más adelante...
Dejo link : https://jdk.java.net/17/
RxJava es un framwork que implementa los conceptos de ReactiveX en Java. Veamos un ejemplo RxJava:
List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
Observable.from(input).filter(new Func1() {
@Override
public Boolean call(Integer x) {
return x % 2 == 0;
}
})
o usando lambda :
Observable.from(input).filter(x -> x % 2 == 0);
La biblioteca RxJava se creó en Netflix como una alternativa más inteligente a Java Futures y devoluciones de llamada. Tanto los futuros como las devoluciones de llamada son fáciles de usar cuando solo hay un nivel de ejecución asincrónica, pero son difíciles de administrar cuando están anidados.
El siguiente ejemplo muestra cómo se maneja el problema de las devoluciones de llamada anidadas en RxJava.
Suponga que necesita llamar a una API remota para autenticar a un usuario, luego a otra para obtener los datos del usuario y a otra API para obtener los contactos de un usuario. Normalmente, tendría que escribir llamadas a API anidadas y hacer complejos callbacks. Pero con RxJava se puede hacer así :
serviceEndpoint.login().doOnNext(accessToken -> storeCredentials(accessToken)).flatMap(accessToken -> serviceEndpoint.getUser()).flatMap(user -> serviceEndpoint.getUserContact(user.getId()))