Apache Kafka + String Boot + Avro + Schema Registry

Mario Aliaga
7 min readApr 2, 2021

--

Introducción

Escribo este articulo desde la necesidad de encontrar información para utilizar mensajería de Kafka y su validación con Avro como esquema.

Todo lo que encontré está en inglés y muchas veces no están los códigos funcionales.

El proyecto consiste en una API Rest que recibe un Usuario (nombre y apellido) y lo envía al cluster de Kafka mediante un Producer. Para luego desde el mismo proyecto obtenemos los mensajes por intermedio de un Consumer apuntando al mismo tópico.

Así que les dejo esta ayuda a las personas de habla hispana que necesiten un punto de partida con Spring Boot, Schema Registry y Avro.

Las partes del articulo serán:

  • Breves definiciones
  • Implementación de Cluster Confluent en Local
  • Implementación de Código
  • Pruebas
  • Conclusión

Breves Definiciones

Que es Kafka?

Desde la página misma página de los creadores podemos extraer:

“Apache Kafka is a community distributed event streaming platform capable of handling trillions of events a day. Initially conceived as a messaging queue, Kafka is based on an abstraction of a distributed commit log. Since being created and open sourced by LinkedIn in 2011, Kafka has quickly evolved from messaging queue to a full-fledged event streaming platform.”

confluent.io

Lo que nos dice es que Apache Kafka es una plataforma de transmisión de eventos distribuida, capaz de manejar billones de eventos al día. Creado inicialmente como una cola de mensajería. Y que Kafka se basa en una abstracción de un registro de confirmación distribuido.

Desde que fue creado y abierto por LinkedIn en 2011, Kafka ha evolucionado rápidamente de la cola de mensajería a una plataforma de transmisión de eventos completa.

Hoy tenemos un sistema integral y muy completo para manejar eventos y mensajería con el modelo editor/suscriptor.

Que es Apache Schema Registry?

Es un componente adicional para Apache Kafka, es open source y desarrollado por Confluent, su página acá.

Funciona como repositorio centralizado. El cual permite almacenar y registrar esquemas en formato Avro o Json.

Asegura que los datos se inserten con un esquema en concreto para cumplir con la especificación y estandarización entre productores y consumidores.

https://docs.confluent.io/platform/current/schema-registry/index.html

Que es Avro?

Avro es un marco de serialización de datos y llamadas de procedimiento remoto orientado a filas desarrollado dentro del proyecto Hadoop de Apache. Utiliza JSON para definir tipos de datos y protocolos, y serializa datos en un formato binario compacto.

Referencia completa Apache Avro.

Implementación de Cluster Confluent en Local

Primero nos descargamos los binarios del cluster de Confluent desde acá poniendo nuestro email y el formato TAR.

https://www.confluent.io/get-started

Este comprimido lo desempaquetamos en algún directorio. Yo lo tengo en “/Users/tu_user/tools/”.

Tenemos que dejarlo accesible desde el contexto del sistema para usarlo desde nuestro terminal (uso mac). Para esto editamos el archivo de configuración .zshrc en mi caso (puedes tener bash) y ponemos la siguiente configuración al final de este:

export CONFLUENT_HOME=/Users/tu_user/tools/confluent-6.1.0
export PATH=$PATH:$CONFLUENT_HOME/bin

Siguiente a esto editamos el archivo server.properties de kafka que se encuentra en “/Users/tu_user/tools/confluent-6.1.0/bin

Buscamos la linea del listener y la dejamos así:

listeners=PLAINTEXT://0.0.0.0:9092

Descomentamos la linea 36, del advertised y editamos el host name, dejandolo así:

advertised.listeners=PLAINTEXT://localhost:9092

Y por último descomentamos la seguridad del listener en la linea 39:

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

Para probar la configuración del cluster tapiamos en el terminal:

  • Para levantar el cluster → confluent local services start
  • Para detener el cluster → confluent local services stop
  • Y para limpiar y eliminar temporales → confluent local destroy

Si ejecutamos el start, nos mostrará en consola algo como esto:

“Si vez problemas para iniciar porque tienes varios JDKs instalados, solo define el JAVA_HOME en tu archivo de ambiente, en mi caso .zshrc”

Ahora lconprobamos el buen funcionamiento de los anterior ingresando a la interfaz web del cluster en http://localhost:9021

Implementación de Código

Para poder usar las validaciones de Avro tenemos que tener una clase Java con la implementación de la especificación de Avro por cada modelo que vayamos a utilizar. Esto para Serializar y Deserealizar la data.

Como este es un proyecto de ejemplo haré todo junto, Producer y Consumer.

Primer paso, creamos el proyecto con el Spring Initialzr desde el IDE Intellij Idea. O también puede ser desde la web de Spring acá y descargar el comprimido para luego importarlo desde el IDE que profieran.

Las tres librerías que debemos seleccionar son:

  • Spring Web: para la creación del servicio rest
  • Spring for Apache Kafka: Para la interacción con el cluster
  • Lombok: para facilitarnos la vida con logs y POJOs.

También agregamos las librerías de Avro al pom.xml

Para que las librerias de Avro puedan hacer su magia (creación de las clases) debemos configurar el plugin de maven así:

Tomar en cuenta el lugar de donde se obtendrá el archivo del schema que usará en la construcción y la ruta relativa del propio schema (user.avsc).

Creamos el esquema de ejemplo en la carpeta de resources que definimos en el plugin en el paso anterior. Que en mi caso es user.avsc:

Este schema es muy simple, solo tiene la definición de dos fields: firstName y lastName. Donde los dos son String.

Siguiente, ejecutamos el plugin de maven para crear la clase equivalente al schema. Dando como resultado la nueva implementación de Avro:

ó mvn clean complile

Una vez que tenemos nuestra clase de Users generada con el pluging de maven. Vamos a contruir nuestra funte de mensajes, el Producer.

Kafka Producer

Para que el Producer funcione en nuestro ejemplo le pasamos el nombre del topico donde injectará los mensajes. Este se lo entregaré desde un archivo de propiedades (yaml).

El proyecto se crea con el archivo application.properties. A este le cambio la extensión a YAML. Que lo encuantro más legible.

Ahora que tenemos la capacidad de poder enviar mensajes al cluster de Kafka agregamos un consumidor de estos mensajes, el Consumer.

En el consumidor tenemos que indicar el mismo topico del Producer y además indicar el grupo al que pertenecerá el Consumer.

En este instante tenemos la entrada y salida de los mensajes. Vamos a agregar un endpoint para recibir a los Usuarios e injectarlos al cluster de Kafka mediante el Producer. Para luego rescatarlos desde el Consumer construido y verlos por la consula del IDE o bien por la página web del cluster de Kafka.

Para que todo esto funcione hay que agregar la configuración de la serialización y deserialización con el modelo creado.

Primero agregamos la configuración para Spring utilize un tipo de serialización custom en base a nuestro User (avro).

Segundo, agregamos la librería de serialización de avro que utilizaremos para manipular nuestra data. Esta librería la proporciona el artifactory de confluent.

Librería de Avro para serializar:

Repositorio de Confluent:

Pruebas

Ahora que tenemos un enpoint de tipo post. Haremos un envio de nuestro primer Usuario al cluster.

Primero es levantar el cluster de Kafka con el comando en nuestra consola:

confluent local services start

Segundo, levantamos nuestro proyecto Spring.

Y si tenemos las configuraciones de nuestro consumer y producer en la traza de inicio. Esto nos indicará que se pudo comunicar bien con el cluster de kafka.

Consumer config
Producer config

Para verificar que todo está bien vamos a la interfaz web del cluster y verificarmos si el clueste se configuro con el broker por default.

Y si está creado el topico que definimos en nuestra aplicación Spring. Y dentro de este el schema de user que proveerá el Schema Registry.

http://localhost:9021/clusters/KLu6bCD3RO2a1isjij0Q3A/management/topics/avro-topic/schema/value

Para la consulta Http utlizaremos Postman:

Y el resultado lo podemos ver por la consola del IDE:

También podemos ver el mensaje en la interfaz web del cluster.

http://localhost:9021/clusters/KLu6bCD3RO2a1isjij0Q3A/management/topics/avro-topic/message-viewer

Conclusión

Con esto tenemos un flujo sencillo de como enviar mensajes a un cluster de Kafka y rescatarlos.

Por ser un proyecto para demostrar las funcionalidades y conceptos básicos no lo quice complicar de más. Les dejo la URL del proyecto en GitHub acá.

Espero les sirva para tener un inicio más amigable con Kafka que es una muy buena herramienta.

--

--