Saturday, June 29, 2013

Easy Messaging with STOMP over WebSockets using ActiveMQ and HornetQ

Messaging is an extremely powerful tool for building distributed software systems at different levels. Typically, at least in the Java ecosystem, the client (front-end) never interacts with the message broker (or exchange) directly but does so by invoking server-side (back-end) services. The client may not even be aware that there is a messaging solution in place.

Websockets are gaining more and more adoption, and wide support for text-oriented protocols like STOMP (used to communicate with a message broker or exchange) is going to make a difference. Today's post will try to explain how simple it is to expose two very popular JMS implementations, Apache ActiveMQ and JBoss HornetQ, to a web front-end (JavaScript) using STOMP over Websockets.

Before digging into the code, one might argue that it's not a good idea to do that. So what's the purpose? The answer really depends:

  • you are developing a prototype / proof of concept and need an easy way to integrate publish/subscribe or peer-to-peer messaging
  • you don't want / need to build a sophisticated architecture, and the simplest solution that works is just enough

Scalability, fail-over and a lot of other very important decisions are not taken into consideration here, but they definitely should be if you are developing a robust and resilient architecture.

So let's get started. As always, it's better to start with the problem we're trying to solve: we would like to develop a simple publish/subscribe solution where a web client written in JavaScript is able to send messages and listen on a specific topic. Whenever a message is received, the client just shows a simple alert window. Please note that we need a modern browser that supports Websockets, such as Google Chrome or Mozilla Firefox.

The client code remains the same for both of our examples, so let's start with that. A great starting point is the STOMP Over WebSocket article, which introduces the stomp.js module, and here is our index.html:
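The script part of such a page can be sketched roughly as follows. This is a minimal sketch based on the description in this post, and it assumes stomp.js has already been loaded with a script tag and exposes a global Stomp object. The function name startDemo, the empty login and passcode, the destination jms.topic.test (the topic created on the broker side) and the message text are assumptions made for illustration.

```javascript
// Sketch only: assumes stomp.js is loaded and exposes a global `Stomp` object.
const ENDPOINT = "ws://localhost:61614/stomp";
const DESTINATION = "jms.topic.test"; // assumed: the topic created by the broker configuration

// `stomp` is the Stomp object from stomp.js; `notify` displays a received message.
function startDemo(stomp, notify) {
  const client = stomp.client(ENDPOINT);
  // Empty login/passcode: an assumption that the broker does not enforce security.
  client.connect("", "", function () {
    // Subscribe first, passing the priority header mentioned in the post.
    client.subscribe(DESTINATION, function (message) {
      notify(message.body);
    }, { priority: 9 });
    // Publish to the same topic right after subscribing (hypothetical message text).
    client.send(DESTINATION, { priority: 9 }, "Pub/Sub over STOMP!");
  });
  return client;
}

// In a browser page that has loaded stomp.js, wire the demo to alert().
if (typeof window !== "undefined" && window.Stomp) {
  startDemo(window.Stomp, function (text) { window.alert(text); });
}
```

Passing stomp and notify in as parameters keeps the sketch runnable outside a browser as well, for example with a stub in place of stomp.js.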




Extremely simple code, but a few details are worth explaining. First, we are looking for the Websockets endpoint at ws://localhost:61614/stomp. That is sufficient for a local deployment, but it is better to replace localhost with a real IP address or host name. Second, once connected, the client subscribes to the topic (passing a priority: 9 header) and publishes a message to this topic immediately afterwards. From the client's perspective, we are done.
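One way to avoid hard-coding localhost is to derive the endpoint from the host that served the page. The sketch below is an illustration and not part of the original example: the helper name stompEndpoint is hypothetical, and it assumes the broker listens on port 61614 on the same host as the page. It falls back to localhost when the page is opened straight from disk, where the host name is empty.

```javascript
// Hypothetical helper: build the STOMP-over-WebSocket URL for a given host and port.
function stompEndpoint(hostname, port) {
  // Pages opened via file:// report an empty host name, so fall back to localhost.
  const host = hostname || "localhost";
  return "ws://" + host + ":" + port + "/stomp";
}

// In a browser, use the page's own host; elsewhere (e.g. Node.js) use the fallback.
const pageHost = typeof window !== "undefined" ? window.location.hostname : "";
console.log(stompEndpoint(pageHost, 61614));
```

The resulting string can be passed to the stomp.js client in place of the fixed URL.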

Let's move on to the message broker; the first one on our list is Apache ActiveMQ. To keep the example simple, we will embed the Apache ActiveMQ broker into a simple Spring application without using XML configuration files. As the source code is available on GitHub, I will skip the POM file snippet and just show the code:

package com.example.messaging;

import java.util.Collections;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.hooks.SpringContextHook;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    // Spring calls start() once the bean is created and stop() when the context closes
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        // Accept WebSocket connections on port 61614
        broker.addConnector( "ws://localhost:61614" );
        // Keep messages in memory only
        broker.setPersistent( false );
        // Close the Spring context when the broker shuts down
        broker.setShutdownHooks( Collections.< Runnable >singletonList( new SpringContextHook() ) );

        // Create the topic at broker startup
        final ActiveMQTopic topic = new ActiveMQTopic( "jms.topic.test" );
        broker.setDestinations( new ActiveMQDestination[] { topic } );

        // Enable JMX management and create a JMX connector
        final ManagementContext managementContext = new ManagementContext();
        managementContext.setCreateConnector( true );
        broker.setManagementContext( managementContext );

        return broker;
    }
}

As we can see, the ActiveMQ broker is configured with a ws://localhost:61614 connector, which implies the STOMP protocol. We are also creating a JMS topic named jms.topic.test and enabling JMX management instrumentation. To run it, here is a simple Starter class:

package com.example.messaging;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class Starter  {
    public static void main( String[] args ) {
        ApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class );
    }
}

Now, with the broker up and running, let's open the index.html file in a browser. We should see an alert window showing the received message.

Simple! For curious readers, ActiveMQ uses Jetty 7.6.7.v20120910 for Websockets support and won't work with the latest Jetty distributions.
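For equally curious readers, what travels over the WebSocket is plain text. As a rough sketch of the frame layout defined by the STOMP specification (a command line, header lines, a blank line, the body and a terminating NUL character), the hypothetical stompFrame helper below serializes a SEND frame. It is a simplification: it skips header escaping and the content-length header, and the message text is made up for illustration. In the example itself stomp.js takes care of all of this for us.

```javascript
// Serialize a STOMP frame: command line, header lines, blank line, body, NUL terminator.
// Simplified sketch: no header escaping and no content-length header.
function stompFrame(command, headers, body) {
  const headerLines = Object.keys(headers).map(function (name) {
    return name + ":" + headers[name];
  });
  return [command].concat(headerLines).join("\n") + "\n\n" + (body || "") + "\u0000";
}

// A SEND frame similar to what the client would publish to the topic.
const frame = stompFrame(
  "SEND",
  { destination: "jms.topic.test", priority: 9 },
  "Pub/Sub over STOMP!" // hypothetical message text
);
// JSON.stringify makes the newlines and the trailing NUL visible when printed.
console.log(JSON.stringify(frame));
```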

Moving on to HornetQ, the implementation looks a bit different, though it is not very complicated either. As the Starter class remains the same, the only change is the configuration:

package com.example.hornetq;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {
    @Bean( initMethod = "start", destroyMethod = "stop" )
    public EmbeddedJMS broker() throws Exception {
        // In-memory broker: no persistence, no security, JMX management enabled
        final ConfigurationImpl configuration = new ConfigurationImpl();
        configuration.setPersistenceEnabled( false );
        configuration.setJournalType( JournalType.NIO );
        configuration.setJMXManagementEnabled( true );
        configuration.setSecurityEnabled( false );

        // Netty acceptor speaking STOMP over Websockets on localhost:61614
        final Map< String, Object > params = new HashMap<>();
        params.put( TransportConstants.HOST_PROP_NAME, "localhost" );
        params.put( TransportConstants.PROTOCOL_PROP_NAME, "stomp_ws" );
        params.put( TransportConstants.PORT_PROP_NAME, "61614" );

        final TransportConfiguration stomp = new TransportConfiguration( NettyAcceptorFactory.class.getName(), params );
        configuration.getAcceptorConfigurations().add( stomp );
        configuration.getConnectorConfigurations().put( "stomp_ws", stomp );

        // Connection factory "cf" bound to /cf, using the connector registered above
        final ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl( "cf", true, "/cf" );
        cfConfig.setConnectorNames( Collections.singletonList( "stomp_ws" ) );

        final JMSConfiguration jmsConfig = new JMSConfigurationImpl();
        jmsConfig.getConnectionFactoryConfigurations().add( cfConfig );

        // JMS topic "test" bound to /topic/test (its core address is jms.topic.test)
        final TopicConfiguration topicConfig = new TopicConfigurationImpl( "test", "/topic/test" );
        jmsConfig.getTopicConfigurations().add( topicConfig );

        // Embedded JMS server combining the core and JMS configurations
        final EmbeddedJMS jmsServer = new EmbeddedJMS();
        jmsServer.setConfiguration( configuration );
        jmsServer.setJmsConfiguration( jmsConfig );

        return jmsServer;
    }
}

The complete source code is on GitHub. After running the Starter class and opening index.html in a browser, we should see very similar results.

The HornetQ configuration looks a bit more verbose; however, there are no additional dependencies involved except the brilliant Netty framework.

Out of curiosity, I also replaced the ActiveMQ broker with the Apollo implementation. Though I succeeded in making it work as expected, I found the API to be very cumbersome, at least in the current version 1.6, so I haven't covered it in this post.

All sources are available on GitHub: Apache ActiveMQ example and JBoss HornetQ example

8 comments:

christian posta said...

Great write up.

Can you tell a little more about your config with Apollo? Let me know what you found to be cumbersome and I can help you out. Or change it to be more friendly :)

Cheers!

Andriy Redko said...

Hi Christian,

Thank you very much for the feedback (as a follower of your blog, I really appreciate that). Yes, I found the Apollo API to be cumbersome and verbose. I don't mean it's bad, but I think it's not expressive and intuitive enough. The example I've taken as a foundation is this one: https://github.com/apache/activemq-apollo/blob/trunk/apollo-distro/src/main/release/examples/java-embedded-broker/src/main/java/example/EmbeddedBroker.java

Though everything worked out fine, I think the Apollo API could be better. I understand there are reasons for that and I would love to make my contributions.

Thank you very much.

Best Regards,
Andriy Redko

jmesnil said...

Hi,

Great introduction article.

There is only a minor issue in it.

When you call client.subscribe(destination, callback, { priority: 9}); this does *not* mean that the callback will be called only for messages with priority = 9.
The last argument is the headers sent by the SUBSCRIBE frame and will not have any effect.

What you want is to pass a selector[1] header like this:

client.subscribe(destination, callback, { selector: "priority=9" });

Please note that this is not specified in the STOMP protocol and may or may not be supported by the messaging brokers (both HornetQ and ActiveMQ do support it).

Andriy Redko said...

Hi Jeff,

Thank you very much for such a great clarification. That's definitely something I should have paid more attention to.

Thank you!

Best Regards,
Andriy Redko

Zaw Min Tun said...

Thanks for the great post.
Is it possible to get the topic starting from the very beginning every time I connect to the topic?

Andriy Redko said...

Hi Zaw,

Thanks a lot for the comments. As far as I know it is possible to do that in STOMP with durable subscriptions. I haven't done it personally but you may look at https://activemq.apache.org/how-do-durable-queues-and-topics-work.html and http://activemq.apache.org/apollo/documentation/stomp-manual.html#Topic_Durable_Subscriptions for more details.

Thanks.

Best Regards,
Andriy Redko

Francis Farmer said...

Hi Andriy,
Great post. I'm trying to create something similar (stomp.js as the client, angularjs, activemq as broker), but with wss (or stomp over ssl, because I'm using the messages via https). I didn't use Spring to configure wss, but activemq has pretty straightforward directions in the conf of their xml files. However, I don't think stomp.js is set up to handle wss. Have you had any experience using a secure websocket with stomp.js?

Andriy Redko said...

Hi Francis,

Thank you very much for your comment. Yes, you are right, it seems like stomp.js does not support secured websockets (wss). One of the options I know about is using Spring + SockJS but because you do not use Spring, it might not work for you. Would be nice to see wss:// protocol support but it seems like the stomp.js project is not actively developed anymore.

Thank you.

Best Regards,
Andriy Redko