Advanced Message Queuing Protocol (AMQP)

Since AMQP is gaining more and more popularity these days, I decided to dive into this protocol and share what I have learned. The first questions that came to mind when I started learning AMQP were: what is the difference between AMQP and JMS, and why would I use AMQP if we already have JMS?

First of all, the main difference is that JMS is an API and AMQP is a protocol. JMS defines how a Java application sends and receives messages, whereas AMQP defines how a message is constructed and transmitted on the wire. In other words, with JMS you can replace your broker with another one with few or no changes to your code. But because JMS is a Java API, it does not help when you need to communicate with clients written in Objective-C, PHP or Python. This is where AMQP comes in handy, because it was designed for interoperability between different vendors and platforms.

Messaging concept

At a high level, the AMQP world is composed of producers, consumers, brokers, exchanges, bindings and queues.

Producers publish messages to exchanges. Exchanges then distribute the messages among queues using bindings (rules), and finally the broker delivers each message to the corresponding consumer(s).

Producer/Consumer

Producers and consumers are essentially just clients that can be written in many popular languages such as Python, C#, Java, etc.

Consumers may receive messages automatically (pushed by the broker) or on demand (pulled by the consumer).

Broker

I have used and personally prefer RabbitMQ, which is well documented and has a large number of clients and developer tools covering a variety of platforms and languages. However, I found a few minor imperfections; for example, some classes do not implement the AutoCloseable interface (version 3.6.2).

You can also have a look at other implementations such as SwiftMQ or StormMQ.

Exchange

Exchanges receive messages from producers and then route the messages to the queues that are bound to exchanges using one of these routing algorithms:

  • Direct – the routing algorithm behind a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message.
  • Fan-out – no routing key is needed; each queue or exchange bound to this type of exchange gets a copy of the message.
  • Header – each message carries a set of key-value pairs, called headers, that is transmitted along with the message, and the exchange uses these headers to filter messages. Header exchanges ignore the routing key.
  • Topic – combines the direct and fan-out algorithms. Messages are routed to queues by matching the routing key against a binding pattern. For instance, messages with the keys ‘news.sport’ and ‘news.auto’ will be routed to a queue bound with the pattern ‘news.*’.
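To make the topic rule concrete, here is a minimal sketch of the matching logic in plain Java. It is not RabbitMQ's implementation; the class name `TopicMatcher` and its `matches` method are made up for illustration. It assumes the AMQP 0-9-1 topic conventions: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words.

```java
// Illustrative sketch only: a hypothetical helper, not part of any AMQP client library.
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("news.*", "news.sport"));          // true
        System.out.println(matches("news.*", "news.sport.football")); // false
        System.out.println(matches("news.#", "news.sport.football")); // true
    }
}
```

Note that ‘news.*’ does not match a three-word key such as ‘news.sport.football’; a binding that should catch everything under ‘news’ would use ‘news.#’ instead.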

Binding

Bindings define the routing rules and the relationship between an exchange and message queues. For routing to work, a queue has to be bound to an exchange. Every binding may have an optional key that the exchange uses to filter messages.

If a message cannot be routed to any queue, it is either dropped or returned to the producer, depending on the attributes it was published with (for example, the mandatory flag).
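The relationship between bindings and routing can be sketched with a toy in-memory model of a direct exchange. This is not the RabbitMQ API: the class `ToyDirectExchange` and its methods are hypothetical, and the boolean returned by `publish` only stands in for what a real broker does with an unroutable message (drop it, or return it to a publisher that set the mandatory flag).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a toy model, not a real broker or client class.
final class ToyDirectExchange {

    // Binding key -> names of the queues bound with that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> messages currently stored in that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    void bind(String queue, String bindingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Routes a message; returns false if no binding matched the routing key. */
    boolean publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return !targets.isEmpty();
    }

    /** Returns the messages stored in a queue (empty if the queue is unknown). */
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("orders", "order.created");

        System.out.println(exchange.publish("order.created", "order 1")); // true
        System.out.println(exchange.publish("order.deleted", "order 2")); // false
        System.out.println(exchange.messagesIn("orders"));                // [order 1]
    }
}
```

The second publish finds no binding for its routing key, which is exactly the situation in which a real broker has to decide between dropping and returning the message.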

Queue

The purpose of queues is to store messages until they are consumed by applications. A queue can be durable (it survives a broker restart), exclusive (meaning it will be deleted when the connection is closed), auto-deleted (the queue will be deleted when all consumers have unsubscribed) and can have a name.

Before a queue can be used, it has to be declared. You will get an exception if you declare a queue that already exists with different properties.
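The declaration rule can be modelled in a few lines of plain Java. The sketch below is a toy registry, not broker or client code; `ToyQueueRegistry`, `QueueProps` and the use of `IllegalStateException` are assumptions made for illustration. Declaring the same queue twice with identical properties is harmless, while a mismatch is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a toy model of how a broker treats queue declarations.
final class ToyQueueRegistry {

    /** The properties a queue was declared with. */
    private static final class QueueProps {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        QueueProps(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        boolean sameAs(QueueProps other) {
            return durable == other.durable
                    && exclusive == other.exclusive
                    && autoDelete == other.autoDelete;
        }
    }

    private final Map<String, QueueProps> declared = new HashMap<>();

    /** Declares a queue; redeclaring it with different properties fails. */
    void declare(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        QueueProps requested = new QueueProps(durable, exclusive, autoDelete);
        QueueProps existing = declared.putIfAbsent(name, requested);
        if (existing != null && !existing.sameAs(requested)) {
            throw new IllegalStateException(
                    "Queue '" + name + "' already exists with different properties");
        }
    }

    public static void main(String[] args) {
        ToyQueueRegistry registry = new ToyQueueRegistry();
        registry.declare("com.alexvolov.amqp.queue", false, false, false);
        registry.declare("com.alexvolov.amqp.queue", false, false, false); // fine: same properties
        try {
            registry.declare("com.alexvolov.amqp.queue", true, false, false);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

This is why the producer and the consumer in the example that follows can both declare the queue: they use exactly the same arguments.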

Simple producer/consumer example

Producer:

package com.alexvolov.amqp;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AmqpProducer {

    private final static String QUEUE_NAME = "com.alexvolov.amqp.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // The host has to be configured before the connection is opened.
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

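        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello!";
        // The empty exchange name selects the default exchange, which routes
        // the message to the queue whose name equals the routing key.
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));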

        System.out.println("Sent: '" + message + "'");

        channel.close();
        connection.close();
    }

}

Consumer:

package com.alexvolov.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AmqpConsumer {

    private final static String QUEUE_NAME = "com.alexvolov.amqp.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received: '" + message + "'");
            }
        };
        // The second argument (true) enables automatic acknowledgement of deliveries.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
