Publique un mensaje usando Spring AMQP
En este tutorial rápido, veremos cómo podemos publicar y consumir mensajes de un bróker como RabbitMQ con Spring Boot y a su vez implementar ExecutorService que nos ayude a simplificar está tarea de manera asíncrona.
1. ¿Qué es RabbitMQ?
RabbitMQ es uno de los intermediarios de mensajes de código abierto más populares. Desde T-Mobile hasta Runtastic , RabbitMQ se utiliza en todo el mundo en pequeñas empresas emergentes y grandes empresas.
RabbitMQ es liviano y fácil de implementar en las instalaciones y en la nube. Admite múltiples protocolos de mensajería. RabbitMQ se puede implementar en configuraciones distribuidas y asociadas para cumplir con los requisitos de alta disponibilidad y gran escala.
RabbitMQ se ejecuta en muchos sistemas operativos y entornos de nube, y proporciona una amplia gama de herramientas de desarrollo para los lenguajes más populares.
2. ¿Cómo instalar RabbitMQ?
Para esto necesitamos ir al siguiente enlace: https://www.rabbitmq.com/download.html
Aquí elegiremos el instalador dependiendo del S.O que estemos utilizando. En nuestro caso será descargaremos el instalador para Windows.
Nota: Es posible que al momento de la instalación nos pida instalar Erlang, en caso de que así sea solo es descargar el instalador del siguiente enlace: https://www.erlang.org/downloads
Teniendo el instalador, solo es ejecutarlo y seguir los siguientes pasos.
Paso 1.
Paso 2.
Paso 3.
Paso 4.
Es necesario configurar la ruta donde se encuentran nuestros archivos .bat y agregarla a nuestras variables de entorno.
Una vez configurada nuestra variable de entorno, abriremos una terminal para ejecutar el siguiente comando:
“rabbitmq-plugins enable rabbitmq_management”
Si abrimos la siguiente ruta http://localhost:15672/ en nuestro navegador deberías de poder ver la siguiente pantalla.
User: guest
Pass: guest
Si todo va bien ya tenemos nuestro servidor de RabbitMQ instalado.
Necesitamos agregar una nueva Queue para poder usarla en nuestra aplicación, para iremos a la pestaña de nombre Queues y agregaremos el nombre que queramos. En este caso usare “test.queue” y solo es dar click en botón Add queue.
Teniendo nuestra Queue creada ahora si podemos seguir con la creación de nuestra aplicación.
El punto hasta ahora es: que con esto estaríamos enviando algo directamente a la cola, pero con RabbitMQ es recomendable enviarlo a un exchange y este exchange lo envía a la cola.
Tipos de intercambio:
Direct: entrega el mensaje a las colas que están conectadas a él, permitiendo el uso de claves de enrutamiento.
Fanout: este tipo se usa principalmente como transmisión, todas las partes interesadas recibirán los mensajes sin ningún filtro. La clave de enrutamiento no es compatible.
Topic: Parece una mezcla entre los dos primeros, permite enviar a varias colas y también permite el uso de claves de enrutamiento, pudiendo vincular mensajes según el valor enviado en la clave de enrutamiento.
Para crear este ejemplo, usaremos Spring Framework junto con la biblioteca AMQP
¿Qué es la clave de enrutamiento?
Clave de enrutamiento (Routing key): Es una clave enviada con el mensaje que utiliza el intercambio para decidir a dónde enrutar el mensaje.
Para crear nuestro Exchange solo es ir a la pestaña Exchanges, aquí podremos ver algunos por defecto que ya tiene Rabbit, para este ejemplo usaremos el de nombre amq.direct daremos click y nos llevara a la siguiente venta:
Ingresaremos los datos como se muestra a continuación:
En este caso para el campo To queue el nombre de nuestra Queue que creamos, para el campo Routing key pondremos el valor que nosotros creamos correspondiente. Una vez echo esto solo es dar click en el botón Bind.
Ahora ya contamos con los siguientes datos para poder empezar a crear nuestro proyecto.
- Host: localhost
- User: guest
- Pass: guest
- Queue: test.queue
- Exchange: amq.direct
- Routing key: routing.key.test.queue
3. Dependencias de Maven
4. Estructura del Proyecto
4. RabbitMqConfig
Indicaremos la configuración para establecer la conexión a la queue. En este caso, la queue es única y todos los mensajes publicados irán directamente a está por lo que usamos un routing key.
5. Producer y Consumer
Primeramente, tendremos un servicio productor el cual se encargara por medio de un ExecutorService para que de manera asíncrona se encargue de mandar los mensajes a la Queue.
Por consiguiente, tendremos otro servicio que se encargara de escuchar cuando haya un mensaje en la Queue. Este también implementara un ExecutorService para que sea de manera asíncrona.
¿Qué es Executor Service?
En términos generales, ExecutorService proporciona automáticamente un conjunto de subprocesos y un API para asignarles tareas.
¿Cómo crear un Executor Service?
La forma más fácil de crear ExecutorService es usar uno de los métodos de la clase Executors .
Por ejemplo, la siguiente línea de código creará un grupo de subprocesos con 10 subprocesos:
ExecutorService executor = Executors.newFixedThreadPool(10);
Este caso sería cuando sabemos de antemano cuántos procesos tenemos disponibles para ser usados por la máquina virtual de Java.
Pero sería mejor poder determinar el número de procesos disponibles para la máquina virtual Java mediante el método en tiempo de ejecución estático, availableProcessors . Con esto una vez que haya determinado la cantidad de procesadores disponibles, creara esa cantidad de subprocesos y por consecuencia divide su trabajo.
Ahora nuestro ejemplo quedaría de la siguiente manera:
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
6. Clase principal
En este caso para mandar a llamar el servicio productor implementaremos un CommanLinnerRunner con un ciclo for que se ejecute 15 veces.
Ejecutaremos nuestra aplicación y deberíamos ver los mensajes producidos y consumidos en consola.
Se pude observar que se mendaron15 mensajes, que posteriormente fueron consumidos. Solo que estos fueron consumidos de manera asíncrona lo cual se puede ver en el número thread utilizado en la parte izquierda. Podemos realizar una prueba con un número mayor a 15 para ver una perspectiva mejor del rendimiento y el uso de los threads.
7. RabbitMQ
Por último en la consola de RabbitMQ podemos ver el comportamiento de nuestra Queue.
En está de primera instancia podremos ver que tenemos 1 Consumidor que esta actualmente escuchando por si llega un nuevo mensaje a la Queue.
Si gustas ver el código fuente, este está disponible en Gitlab
Referencias
baeldung. (22 de 12 de 2021). Baeldung. Obtenido de https://www.baeldung.com/: https://www.baeldung.com/java-executor-service-tutorial
JasCav. (s.f.). QA Stack. Obtenido de https://qastack.mx/: https://qastack.mx/programming/1980832/how-to-scale-threads-according-to-cpu-cores
RabbitMQ. (19 de 01 de 2022). RabbitMQ. Obtenido de https://www.rabbitmq.com/: https://www.rabbitmq.com/