‘Hello, World’ for Spring-AMQP and RabbitMQ

RabbitMQ provides messaging capabilities for applications. It supports several operating systems and programming languages which makes it adaptable to a wide band of applications.

For Java applications, Spring already provides support for RabbitMQ via the Spring-AMQP package.

RabbitMQ Configuraiton

To start using RabbitMQ, the following needs to be installed:

After installation, RabbitMQ’s server must be started and configured. To start RabbitMQ server,

  1. Go to {RabbitMQ home directory}/sbin directory
  2. Run rabbitmq-server

RabbitMQ already provides a default username/password and a default virtual host. But you might want to organize your message queues by creating new users and virtual hosts. To do these, the following can be done:

  • To create a new user, run
    rabbitmqctl add_user {username} {password}
  • To create a virtual host, run
    rabbitmqctl add_vhost {vhost}
  • To give the new user permission to use the virtual host, run
    rabbitmqctl set_permissions -p {vhost} {username} ".*" ".*" ".*"

In addition, RabbitMQ provides a web-based management console. This has to be first enabled by running

rabbitmq-plugins enable rabbitmq_management

. The management web console can then be accessed via http://{server}:15672/ after restarting the RabbitMQ server.

“Hello, World!” Java Application

For our simple application, we will be producing messages of “Hello, {ordinal number}” and this should be received and echoed to the console by the consumer. The producer will be putting this message every second.

Spring AMQP configuration

To start of, Spring AMQP must be downloaded. Here’s the maven configuration for Spring AMQP for rabbit:

<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>1.1.3.RELEASE</version>
</dependency>

Spring AMQP gives us abstractions via configuration (either XML configuration or @Bean configuration). For our simple application, we will be using the XML configuration.

The first step is to configure the connection factory, exchange and queue that will be used by the application.

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

  <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="sample" username="admin" password="qwerty" />

  <rabbit:admin connection-factory="connectionFactory" />

  <rabbit:queue name="testqueue" />

  <rabbit:direct-exchange name="test exchange"
    <rabbit:bindings>
      <rabbit:binding queue="testqueue"></rabbit:binding>
    </rabbit:bindings>
  </rabbit:direct-exchange>

   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="testexchange" queue="testqueue" /> 
</beans>
  1. The connection factory is configured in line 7. The virtual-host, username and password can be specified here. If they are not supplied the default values of “/”, “guest” and “guest”, respectively, will be used.
  2. Line 9 gives this application admin rights for creating the queues and exchanges (if they’re not yet created).
  3. Line 11 declares a queue with the queue name “testqueue”. If a queue with the name “testqueue” does not exist, it will be created.
  4. Lines 13-17 declare the exchange that will be used by the producer. The bindings tell it to send the message to “testqueue”.
  5. Line 19 declares the template that will be used for sending and consuming messages. This template already has several convenience methods for sending and getting messages. The default connection factory, exchange and queue that will be used by the template can be configured here.

Producer Code

The producer can simply use the AMQP template to send messages.

package com.rabbit;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;

public class Producer {

	@Autowired
	private AmqpTemplate messageQueue;
	
	private final AtomicInteger counter = new AtomicInteger();

	public static void main(String[] args) {
		new ClassPathXmlApplicationContext(
				"classpath:META-INF/spring/mq-producer-context.xml");
	}

	@Scheduled(fixedRate = 1000)
	public void execute() {
		System.out.println("execute...");
		messageQueue.convertAndSend("hello " + counter.incrementAndGet());
	}
}
  1. In line 25, the template’s convertAndSend is used to send messages to the queue.
  2. Spring’s scheduled task is used in line 22 to allow the execution of the execute() method every 1 second.

Here’s the complete XML configuration of the producer:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-3.0.xsd">

	<import resource="mq-context.xml" />

	<task:scheduler id="myScheduler" pool-size="10" />
	<task:annotation-driven scheduler="myScheduler" />

	<bean id="producer" class="com.rabbit.Producer"></bean>
</beans>

Consumer code

For the consumer, we will be using asynchronous messaging to avoid polling for messages from the queue. To use this, MessageListener must be implemented and a concrete implementation of onMessage(Message) method must be supplied.

package com.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer implements MessageListener {

	public static void main(String[] args) {
		new ClassPathXmlApplicationContext(
				"classpath:META-INF/spring/mq-consumer-context.xml");
	}

	public void onMessage(Message message) {
		System.out.println(message);
		try {
			Thread.sleep(1500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

The listener must be registered to Spring AMQP via the XML configuration.

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-3.0.xsd">
		
	<import resource="mq-context.xml"/>
	
	<rabbit:listener-container
		connection-factory="connectionFactory">
		<rabbit:listener ref="consumer" queue-names="testqueue" />
	</rabbit:listener-container>
	
	<context:annotation-config />
	<context:component-scan base-package="com.rabbit" />
	<aop:aspectj-autoproxy />

	<bean id="consumer" class="com.rabbit.Consumer"></bean>
</beans>

Lines 17-20 sets up the consumer class as the listener who will receive messages fromt “testqueue”.

Running both the consumer and producer above will send and receive several “hello…” messages.