A Sample Usage of Java’s Future and Spring’s @Async

Yesterday, a colleague asked about Java threading. We have a requirement for sending files via FTP that is already working, and our users wanted us to add a way to do the sending asynchronously. The initial plan was to move the sending of the files into another thread manually. This means that we would have to create a Runnable implementation and manage the threads ourselves.

My initial thought when my colleague asked about this was that this is something JavaScript’s Promises can solve very easily. And then it hit me that I had read about something similar in Java. In Java, this is called a Future, and it has been part of java.util.concurrent since Java 5. Futures allow the result of an asynchronous operation to be retrieved at a later time. Vanilla Java has implementations, such as FutureTask, that allow the task to be run in a separate thread. Spring made this easier and cleaner by making it declarable via the @Async annotation.
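For comparison, here is a minimal sketch of the vanilla Java approach using FutureTask and a manually started thread. The class name and the sendFile method are hypothetical stand-ins; sendFile only simulates a slow FTP upload.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {

    // Hypothetical stand-in for the real FTP upload; it only simulates a slow call.
    static boolean sendFile(String fileName) throws InterruptedException {
        Thread.sleep(200);
        return true;
    }

    // Starts the upload on a separate thread and returns its result once it is done.
    static boolean sendInBackground(String fileName)
            throws InterruptedException, ExecutionException {
        FutureTask<Boolean> task = new FutureTask<Boolean>(() -> sendFile(fileName));

        // FutureTask is a Runnable, so a plain Thread can run it.
        new Thread(task).start();

        // The caller is free to do other work here while the upload runs.

        // get() blocks until the task has finished.
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Sent: " + sendInBackground("somefile"));
    }
}
```

This works, but the thread creation is still our responsibility, which is the part Spring’s @Async takes over.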

Here’s a very simple example that uses Java’s Future and Spring’s @Async feature.

After downloading the required Spring dependencies (it’s in spring-context for Maven users), we have to enable Spring’s asynchronous tasks by using the @EnableAsync annotation.

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class FtpAsyncConfig {
}

We can then use the @Async annotation to tell Spring to execute the method asynchronously when called.

import java.io.File;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component
public class DefaultFtpApp implements FtpApp {
	@Async
	public Future<Boolean> sendAsync(final File file, final String host)
			throws InterruptedException {
		boolean result = false;

		//call the long running task here
		// this is in place of the actual sending of the file via FTP
		Thread.sleep(1000);

		// say sending succeeds and returns true
		result = true;

		System.out.println("Done running...");

		// Wrap the result in an AsyncResult
		return new AsyncResult<>(result);
	}
}

Two other things to note above are the return type and the return value. Spring’s asynchronous tasks allow a void return type, but since we want users to be able to query the result of the operation, we’re returning a Future object. The type of the result must be specified inside the angle brackets (Boolean in this case). The actual return value is wrapped in AsyncResult, an implementation of the Future interface, by passing it into the constructor.

We can now test this.

import java.io.File;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = {
		FtpAsyncConfig.class, DefaultFtpApp.class })
public class FtpAppTest {

	@Autowired
	private FtpApp ftp;

	@Test
	public void test() throws InterruptedException, ExecutionException {

		System.out.println("About to run...");

		// Call to sendAsync
		// this will execute in another thread. The sysout below this line will
		// execute before the sysout within sendAsync
		Future<Boolean> ftpResult = ftp.sendAsync(new File("somefile"),
				"somehost");

		System.out.println("This will run immediately.");

		// Using get without a timeout will wait for the async task to finish.
		// If you want to wait for only a certain time for the result, you may
		// use the get(long timeout, TimeUnit unit) instead. This flavor of
		// get() will throw a TimeoutException if the result is not yet given
		// after the specified amount of time.
		Boolean result = ftpResult.get();

		System.out.println("And the result of get() is " + result);

		Assert.assertTrue(result);

	}
}

The get() method can be used on the Future interface to retrieve the result of the asynchronous operation. This method also has another flavor where a timeout can be specified.
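The timeout flavor can be sketched with plain java.util.concurrent, without Spring. The class and method names below are made up for illustration, and the sleep stands in for a slow transfer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {

    // Returns the task's result, or "timed out" if it takes longer than waitMillis.
    static String waitFor(long taskMillis, long waitMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(taskMillis); // simulated slow transfer
                return "done";
            });
            try {
                // Wait at most waitMillis for the result.
                return future.get(waitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The result did not arrive in time; interrupt the task.
                future.cancel(true);
                return "timed out";
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(waitFor(50, 2000)); // prints "done"
        System.out.println(waitFor(2000, 50)); // prints "timed out"
    }
}
```

Whether to cancel the task after a timeout is a design choice; the Future stays usable, so calling get() again later is also an option.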

‘Hello, World’ for Spring-AMQP and RabbitMQ

RabbitMQ provides messaging capabilities for applications. It supports several operating systems and programming languages, which makes it adaptable to a wide range of applications.

For Java applications, Spring already provides support for RabbitMQ via the Spring-AMQP package.

RabbitMQ Configuration

To start using RabbitMQ, the RabbitMQ server needs to be installed, along with the Erlang runtime that it runs on.

After installation, RabbitMQ’s server must be started and configured. To start the RabbitMQ server:

  1. Go to {RabbitMQ home directory}/sbin directory
  2. Run rabbitmq-server

RabbitMQ already provides a default username/password and a default virtual host, but you might want to organize your message queues by creating new users and virtual hosts:

  • To create a new user, run
    rabbitmqctl add_user {username} {password}
  • To create a virtual host, run
    rabbitmqctl add_vhost {vhost}
  • To give the new user permission to use the virtual host, run
    rabbitmqctl set_permissions -p {vhost} {username} ".*" ".*" ".*"

In addition, RabbitMQ provides a web-based management console. This must first be enabled by running

rabbitmq-plugins enable rabbitmq_management

The management web console can then be accessed via http://{server}:15672/ after restarting the RabbitMQ server.

“Hello, World!” Java Application

For our simple application, the producer will publish a “Hello, {ordinal number}” message every second, and the consumer should receive each message and echo it to the console.

Spring AMQP configuration

To start off, Spring AMQP must be downloaded. Here’s the Maven configuration for Spring AMQP for RabbitMQ:

<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>1.1.3.RELEASE</version>
</dependency>

Spring AMQP gives us abstractions via configuration (either XML configuration or @Bean configuration). For our simple application, we will be using the XML configuration.

The first step is to configure the connection factory, exchange and queue that will be used by the application.

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
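  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">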

  <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="sample" username="admin" password="qwerty" />

  <rabbit:admin connection-factory="connectionFactory" />

  <rabbit:queue name="testqueue" />

  <rabbit:direct-exchange name="testexchange">
    <rabbit:bindings>
      <rabbit:binding queue="testqueue"></rabbit:binding>
    </rabbit:bindings>
  </rabbit:direct-exchange>

  <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="testexchange" queue="testqueue" />
</beans>

  1. The connection factory is configured in line 7. The virtual-host, username and password can be specified here. If they are not supplied, the default values of “/”, “guest” and “guest”, respectively, will be used.
  2. Line 9 declares a RabbitAdmin, which lets this application create the queues and exchanges on the broker (if they’re not yet created).
  3. Line 11 declares a queue with the queue name “testqueue”. If a queue with the name “testqueue” does not exist, it will be created.
  4. Lines 13-17 declare the exchange that will be used by the producer. The binding tells the exchange to route messages to “testqueue”.
  5. Line 19 declares the template that will be used for sending and consuming messages. This template already has several convenience methods for sending and getting messages. The default connection factory, exchange and queue that will be used by the template can be configured here.
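
For readers who prefer the @Bean style mentioned earlier, the same setup could look roughly like the sketch below. The class name MqConfig and the bean method names are my own choices, and the explicit "testqueue" routing key on the binding and the template is an assumption made so that messages sent through the exchange reach the queue. The template's default receive queue is left unset here.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical Java-config counterpart of the XML above
@Configuration
public class MqConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setVirtualHost("sample");
        factory.setUsername("admin");
        factory.setPassword("qwerty");
        return factory;
    }

    // Declares the queue, exchange and binding beans on the broker if they are missing
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public Queue testQueue() {
        return new Queue("testqueue");
    }

    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("testexchange");
    }

    // Assumption: the queue name doubles as the routing key
    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with("testqueue");
    }

    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange("testexchange");
        template.setRoutingKey("testqueue");
        return template;
    }
}
```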

Producer Code

The producer can simply use the AMQP template to send messages.

package com.rabbit;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;

public class Producer {

	@Autowired
	private AmqpTemplate messageQueue;
	
	private final AtomicInteger counter = new AtomicInteger();

	public static void main(String[] args) {
		new ClassPathXmlApplicationContext(
				"classpath:META-INF/spring/mq-producer-context.xml");
	}

	@Scheduled(fixedRate = 1000)
	public void execute() {
		System.out.println("execute...");
		messageQueue.convertAndSend("hello " + counter.incrementAndGet());
	}
}

  1. In line 25, the template’s convertAndSend is used to send messages to the queue.
  2. Spring’s scheduled task support is used in line 22 to run the execute() method every second.

Here’s the complete XML configuration of the producer:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-3.0.xsd">

	<import resource="mq-context.xml" />

	<!-- Needed so that the @Autowired field in Producer is injected -->
	<context:annotation-config />

	<task:scheduler id="myScheduler" pool-size="10" />
	<task:annotation-driven scheduler="myScheduler" />

	<bean id="producer" class="com.rabbit.Producer"></bean>
</beans>

Consumer Code

For the consumer, we will be using asynchronous messaging to avoid polling for messages from the queue. To use this, MessageListener must be implemented and a concrete implementation of the onMessage(Message) method must be supplied.

package com.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer implements MessageListener {

	public static void main(String[] args) {
		new ClassPathXmlApplicationContext(
				"classpath:META-INF/spring/mq-consumer-context.xml");
	}

	public void onMessage(Message message) {
		System.out.println(message);
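		try {
			// simulate slow processing of the message
			Thread.sleep(1500);
		} catch (InterruptedException e) {
			// restore the interrupt flag so the listener container can shut down cleanly
			Thread.currentThread().interrupt();
		}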
	}

}

The listener must be registered to Spring AMQP via the XML configuration.

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
		http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/task
		http://www.springframework.org/schema/task/spring-task-3.0.xsd">
		
	<import resource="mq-context.xml"/>
	
	<rabbit:listener-container
		connection-factory="connectionFactory">
		<rabbit:listener ref="consumer" queue-names="testqueue" />
	</rabbit:listener-container>
	
	<context:annotation-config />
	<context:component-scan base-package="com.rabbit" />
	<aop:aspectj-autoproxy />

	<bean id="consumer" class="com.rabbit.Consumer"></bean>
</beans>

Lines 17-20 set up the consumer class as the listener that will receive messages from “testqueue”.

Running both the consumer and producer above will send and receive several “hello…” messages.

Annotation-based HTML to Object Mapper using JSoup Parser

I’ve recently worked on a project that required crawling a website and retrieving information from it. After looking for open source Java HTML parsers, we found JSoup. JSoup is a library that provides jQuery-like selectors for extracting data from an HTML source.

JSoup is awesome, but it also left us with a lot of boilerplate code for parsing different HTML pages. To avoid verbose code, I tried playing around with annotations. The idea is to use annotations to map an HTML source to a Java object (sort of like JAXB). The basic code of what I came up with is discussed in this blog post. (Please do note that I used Spring and there may be some Spring APIs in the code.)

For the implementation, the annotations’ targets are the setters of the Java object’s fields.

The first annotation is @Selector. This stores the CSS selector for retrieving the element that contains the value to be set by the annotated setter. The value parameter should contain the CSS selector of the HTML element.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface Selector {
    String value();
}

@Selector needs one of the following annotations to determine how the value will be extracted from the selected element:

  • @TextValue – retrieve the text within the element (remove all HTML tags within the element)
  • @HtmlValue – retrieve the HTML within the element
  • @AttributeValue – retrieve the value from an attribute in the element. The name of the attribute can be specified in the name parameter.

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface TextValue {
}

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface HtmlValue {
}

@Target({ ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface AttributeValue {
    String name();
}

The HTML parser just needs to read annotations from a Java bean’s methods and retrieve the different annotations above. When a @Selector is present in a method, the value of the @Selector will be used to retrieve the element. @TextValue, @HtmlValue or @AttributeValue will then be used to get the data from the element.

import java.io.InputStream;
import java.lang.reflect.Method;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import com.google.common.base.Preconditions;

public class JSoupHtmlParser<T> implements HtmlParser<T> {

    // host of the website that will be crawled
    private static final String HOST = "localhost:8080/sample";

    private final Class<T> classModel;

    // Pass in the class of the Java bean that will contain the mapped data from the HTML source
    public JSoupHtmlParser(final Class<T> classModel) {
        this.classModel = classModel;
    }

    // Main method that will translate HTML to object
    public T parse(final InputStream is) throws HtmlParserException {
        try {
            final Document doc = Jsoup.parse(is, "UTF-8", HOST);
            final T model = this.classModel.newInstance();

            for (Method m : this.classModel.getMethods()) {
                String value = null;
                // check if the Selector annotation is present on the method
                if (m.isAnnotationPresent(Selector.class)) {
                    value = parseValue(doc, m);
                }

                if (value != null) {
                    m.invoke(model, convertValue(value, m));
                }
            }

            return model;
        } catch (Exception e) {
            // Assumes HtmlParserException (not shown) has a constructor that accepts a cause
            throw new HtmlParserException(e);
        }
    }

    // Use Spring's ConversionService to convert the selected value from String to the type of the parameter in the setter method
    private static final ConversionService conversion = new DefaultConversionService();

    private Object convertValue(final String value, final Method m) {
        Preconditions.checkArgument(m.getParameterTypes().length > 0);

        // Only set the first parameter
        return conversion.convert(value, m.getParameterTypes()[0]);
    }

    private String parseValue(final Document doc, final Method m) {
        final String selector = m.getAnnotation(Selector.class).value();

        final Elements elems = doc.select(selector);

        if (elems.size() > 0) {
            // no support for multiple selected elements yet. Just get the first element.
            final Element elem = elems.get(0);

            // Check which value annotation is present and retrieve data depending on the type of annotation
            if (m.isAnnotationPresent(TextValue.class)) {
                return elem.text();
            } else if (m.isAnnotationPresent(HtmlValue.class)) {
                return elem.html();
            } else if (m.isAnnotationPresent(AttributeValue.class)) {
                return elem.attr(m.getAnnotation(AttributeValue.class).name());
            }
        }

        return null;
    }
}
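
To show the mapping idea in isolation, here is a minimal sketch that runs without JSoup or Spring. It redeclares a @Selector annotation locally, uses a hypothetical Product bean, and replaces doc.select(...) with a lookup in a plain Map keyed by selector, so only the reflection part of the parser is demonstrated.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class SelectorMappingSketch {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Selector {
        String value();
    }

    // Hypothetical bean; its setter is annotated the way the parser expects.
    public static class Product {
        private String title;

        public String getTitle() {
            return title;
        }

        @Selector("h1.title")
        public void setTitle(String title) {
            this.title = title;
        }
    }

    // Stand-in for the HTML lookup: values come from a map keyed by selector.
    static <T> T map(Class<T> type, Map<String, String> selected) throws Exception {
        T model = type.getDeclaredConstructor().newInstance();
        for (Method m : type.getMethods()) {
            Selector selector = m.getAnnotation(Selector.class);
            if (selector == null) {
                continue; // not a mapped setter
            }
            String value = selected.get(selector.value());
            if (value != null) {
                m.invoke(model, value);
            }
        }
        return model;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> selected = new HashMap<>();
        selected.put("h1.title", "Hello, JSoup");

        Product product = map(Product.class, selected);
        System.out.println(product.getTitle()); // prints "Hello, JSoup"
    }
}
```

In the real parser, the map lookup is where the JSoup selection and the @TextValue, @HtmlValue or @AttributeValue handling take place, and the String is converted to the setter's parameter type before the setter is invoked.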

Spring FactoryBean

While working on a Spring application, I noticed that there was a type discrepancy between the bean declaration in the application context and the class type of the to-be-injected property in Java.

The velocityEngine bean declared in the application context was of type VelocityEngineFactoryBean.

    <bean id="velocityEngine" 
        class="org.springframework.ui.velocity.VelocityEngineFactoryBean">
        <property name="resourceLoaderPath" value="/email_templates/"/>
    </bean>

Meanwhile, setVelocityEngine() of the Java class accepts an object of type VelocityEngine.

import org.apache.velocity.app.VelocityEngine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Emailer {

    private VelocityEngine velocityEngine;

    @Autowired
    public void setVelocityEngine(VelocityEngine velocityEngine) {
        this.velocityEngine = velocityEngine;
    }
}

As it turns out, the FactoryBean is Spring’s way of instantiating more complex objects. When a bean definition points to a FactoryBean, Spring injects the object that the factory produces rather than the factory itself, which explains the apparent type mismatch. To properly create a FactoryBean, a class must implement org.springframework.beans.factory.FactoryBean<T>. There are three methods of note in this interface:

  • T getObject() should return an instance of the object. For the case of the example above, this is an instance of VelocityEngine.
  • Class<?> getObjectType() should return the type of the object that will be created by the FactoryBean. This can be null if the type is not known in advance.
  • boolean isSingleton() should return true if there should only be one copy of the object. This allows Spring to cache the instance of the created object.
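
The three methods can be sketched as follows. To keep the example runnable without Spring on the classpath, it declares a local stand-in interface with the same three methods; in a real application the class would implement org.springframework.beans.factory.FactoryBean<T> instead. TemplateEngine and TemplateEngineFactoryBean are hypothetical names, not Spring or Velocity classes.

```java
import java.util.Properties;

class FactoryBeanSketch {

    // Local stand-in that mirrors the three methods of Spring's FactoryBean<T>.
    interface SimpleFactoryBean<T> {
        T getObject() throws Exception;

        Class<?> getObjectType();

        boolean isSingleton();
    }

    // Hypothetical product object that needs some setup before it is usable.
    static class TemplateEngine {
        private final Properties settings;

        TemplateEngine(Properties settings) {
            this.settings = settings;
        }

        String loaderPath() {
            return settings.getProperty("resource.loader.path");
        }
    }

    static class TemplateEngineFactoryBean implements SimpleFactoryBean<TemplateEngine> {
        private String resourceLoaderPath;
        private TemplateEngine instance;

        // Plays the role of the <property name="resourceLoaderPath" .../> element.
        void setResourceLoaderPath(String resourceLoaderPath) {
            this.resourceLoaderPath = resourceLoaderPath;
        }

        @Override
        public synchronized TemplateEngine getObject() {
            if (instance == null) {
                Properties settings = new Properties();
                settings.setProperty("resource.loader.path", resourceLoaderPath);
                instance = new TemplateEngine(settings);
            }
            return instance;
        }

        @Override
        public Class<?> getObjectType() {
            return TemplateEngine.class;
        }

        @Override
        public boolean isSingleton() {
            return true;
        }
    }

    public static void main(String[] args) {
        TemplateEngineFactoryBean factory = new TemplateEngineFactoryBean();
        factory.setResourceLoaderPath("/email_templates/");

        // The container hands out the product of getObject(), not the factory.
        TemplateEngine engine = factory.getObject();
        System.out.println(engine.loaderPath());           // prints "/email_templates/"
        System.out.println(engine == factory.getObject()); // prints "true"
    }
}
```

Caching the instance inside the factory is a choice made for this sketch; with Spring, returning true from isSingleton() is what lets the container do the caching.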