A comprehensive guide to Vert.x Core Eventbus

Hi! The Vert.x Core library has a component called EventBus that lets different parts of your application communicate with each other, irrespective of the language they are written in (Java, Kotlin, Scala, Groovy, etc.) and of whether they run in the same Vert.x instance or in different ones. Moreover, it can be extended with bridges, such as the TCP bridge or the SockJS bridge, to communicate with programs outside of the Vert.x ecosystem.

In this post I would like to dive deep into this topic. We will look at the event bus pattern itself, check its components and then move on to the concrete Vert.x implementation. We will explore the various messaging options and the anatomy of the Message object.

Concept of event bus

Before we start exploring the Vert.x EventBus, we should note that the event bus is not unique to Vert.x. It is a design pattern that is often used in reactive applications to implement messaging between components. In this section we will look at this pattern and see how Vert.x deals with it.

Definition

From a technical point of view, an event bus is a design pattern that enables messages to be delivered between components without requiring the components to register themselves with one another. We can say that an event bus is a special case of the publish-subscribe design pattern. Take a look at the graph below, which represents the structure of a typical event bus:

Graph 1. An Eventbus pattern

We can observe the following elements: publisher, bus, message and subscriber. All of them exist in Vert.x to varying degrees. In the framework, EventBus messaging is implemented inside the Vert.x Core library, so you don’t need any other dependencies to enable communication. EventBus requires access to a Vertx instance, so you need to pass its reference to the component, but Vert.x does not force you to use it only with Vert.x elements such as verticles. Let’s see which components we can use.
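Before going through the Vert.x components one by one, it may help to see the bare pattern in code. The sketch below is plain Java with no Vert.x dependency; SimpleEventBus and its methods are hypothetical names invented for this illustration, and the real Vert.x EventBus is considerably more capable (asynchronous delivery, clustering, codecs).

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified bus for illustration only: not the Vert.x implementation.
final class SimpleEventBus {
    // Address -> handlers registered on that address.
    private final Map<String, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // Subscriber side: register a handler on an address.
    void subscribe(String address, Consumer<Object> handler) {
        subscribers.computeIfAbsent(address, a -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Publisher side: deliver the message to every handler on the address.
    // Returns the number of handlers that received the message.
    int publish(String address, Object message) {
        List<Consumer<Object>> handlers =
                subscribers.getOrDefault(address, Collections.emptyList());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }
}
```

Note that the publisher only knows the address, never the subscribers themselves, which is the loose coupling the pattern is about.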

Publisher

The first element is the publisher. It is responsible for publishing a message to the message queue. The Vert.x EventBus offers us 3 ways to send messages: publish(), send() and request(), which we will explore in detail later in the post. Vert.x publishes either with a MessageProducer object or directly with the event bus. Take a look at the code snippet:

// 1. Use EventBus directly
JsonObject payload = new JsonObject().put("id", id).put("name", name);
eventBus.publish("address", payload);

// 2. Obtain a message producer bound to the address
MessageProducer<JsonObject> messageProducer = eventBus.publisher("address");
messageProducer.write(payload);
messageProducer.close();

It is a developer’s choice which approach to use, as Vert.x offers several ways to send messages.

Bus

In the framework, the bus is a lightweight distributed messaging system that allows different parts of an application, or different applications and services, to communicate with each other in a loosely coupled way. It is represented by an EventBus instance. There is a single event bus instance for every Vert.x instance, and you can obtain it in the following way:

EventBus eventBus = vertx.eventBus();

An interesting thing about the Vert.x EventBus is that it allows you to use bridges that extend its communication possibilities. For example, the TCP EventBus bridge is built on top of TCP, meaning that any application that can create TCP sockets can interact with a remote Vert.x instance using its event bus. Another way to communicate with the EventBus from outside a Java application is SockJS. In this approach you register a SockJS handler that enables your application to communicate with JS applications.

Technically, in order to use EventBus messaging you need to perform the following steps:

  1. Register a handler for the receiver (and a reply handler for the sender, if a reply is expected)
  2. Send a message from the sender
  3. Receive the message in the receiver
  4. Unregister handlers (optional)

Take a look at the code snippet below, which represents these steps:

final String address = "my.address";

// Register receiver
MessageConsumer<JsonObject> consumer = eventBus.consumer(address);
consumer.handler(res -> {
	// do something with message
	JsonObject payload = res.body();
	System.out.println(payload.encode());
});

// Send a message from the sender
eventBus.send(address, new JsonObject().put("id", id).put("name", name));

NB: by default, the EventBus treats the message body as an Object. To get a typed body, specify the type parameter of MessageConsumer (as in MessageConsumer<JsonObject> above), and to transport your own POJOs on the EventBus, register a custom codec.

Message

From a technical point of view, messages represent the data that we transfer over the EventBus. In Vert.x, a message is what a handler receives from the event bus. It is represented by the Message interface and is more than just data: view it as a container. We will describe the anatomy of a message in detail later.

Subscriber

A subscriber (or consumer) is the element responsible for processing the message; it knows how to act on it. In Vert.x subscribers are called consumers and, as we learned before, they are represented by the MessageConsumer interface. Each consumer has an address, which can be any string (although it is highly recommended to establish some kind of convention in a project).

Sending messages

The Vert.x EventBus has three ways to send messages. In this section we will explore them in detail.

publish()

The first way to send messages is publish(). This method implements the publish-subscribe pattern. Take a look at the graph below:

Graph 2. Publish-subscribe messaging

The publish-subscribe pattern is also called broadcasting, because the sender broadcasts the message to each of multiple receivers. In a publish-subscribe channel, we deliver the message to all receivers registered on the specific broadcast address. To publish messages with the Vert.x EventBus, use the following code:

String address = "address";
JsonObject payload = new JsonObject();
// add data

eventBus.publish(address, payload);

This delivers the message to all consumers registered on this address. The other model is sending a message to a single consumer.

send()

This approach represents the point-to-point messaging pattern. In other words, this method ensures that only one consumer receives the message. If we have multiple consumers on the same address, only one of them receives the message. Check the graph below, which represents this pattern:

Graph 3. Point-to-point messaging

In practice, how does Vert.x ensure that only one consumer receives the message? If more than one handler is registered for the address, one is chosen using a non-strict round-robin algorithm. To use this model, we call send():

eventBus.send(address, payload);

It also has an overloaded version, that permits us to pass DeliveryOptions to specify delivery timeout, put headers or define a custom codec:

DeliveryOptions options = new DeliveryOptions();
options.setSendTimeout(50000);
eventBus.send(address, payload, options);

However, this method only sends a message. If you want to process a reply, you need to use the request() method described next. NB: in the past, send() had an overload with a reply handler, but it is now deprecated. Use request() instead if you need to process an answer from the consumer.
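To make the point-to-point selection in this section more tangible, here is a minimal plain-Java sketch of choosing one handler per message in round-robin order. RoundRobinAddress is a hypothetical class written for this post, not part of Vert.x, and it implements a strict rotation, whereas Vert.x describes its own algorithm as non-strict.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper for illustration only: not a Vert.x class.
final class RoundRobinAddress {
    private final List<Consumer<Object>> handlers = new CopyOnWriteArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    // Register one more consumer on this address.
    void register(Consumer<Object> handler) {
        handlers.add(handler);
    }

    // Deliver the message to exactly one handler, rotating through them.
    // Returns false when no handler is registered.
    boolean send(Object message) {
        int size = handlers.size();
        if (size == 0) {
            return false;
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), size);
        handlers.get(index).accept(message);
        return true;
    }
}
```

With two registered handlers and four messages, each handler in this sketch receives two of them, while a publish-style broadcast would have delivered all four to both.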

request()

Graph 4. Request-reply messaging

This method implements the request-reply pattern, so it is used not only to send a message but also to process a reply from a consumer. It takes a reply handler that is called if the recipient subsequently replies to the message. Take a look at the code snippet below:

eventBus.request(address, payload, reply -> {
	if (reply.succeeded()) {
		// reply is an AsyncResult; the reply Message is in result()
		JsonObject data = JsonObject.mapFrom(reply.result().body());
		System.out.println(data.encode());
	}
});

The reply handler receives an AsyncResult that wraps the reply Message, so in order to access the payload, call result() and then body(). Also note that, as with the other sending methods, request() has an overloaded version that permits you to specify delivery options:

DeliveryOptions options = new DeliveryOptions();
// set options

eventBus.request(address, payload, options, reply->{
	// ...
});
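The request-reply flow can also be illustrated without Vert.x. The following plain-Java sketch uses CompletableFuture to carry the reply back to the sender; RequestReplyBus, consumer() and request() here are hypothetical names that only mirror the idea, and the sketch answers synchronously, unlike the real event bus.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical request-reply bus for illustration only: not the Vert.x API.
final class RequestReplyBus {
    // Address -> function that turns a request body into a reply body.
    private final Map<String, Function<Object, Object>> responders = new ConcurrentHashMap<>();

    // Consumer side: register the responder for an address.
    void consumer(String address, Function<Object, Object> responder) {
        responders.put(address, responder);
    }

    // Sender side: the returned future completes with the reply,
    // or fails when nobody is registered on the address.
    CompletableFuture<Object> request(String address, Object message) {
        Function<Object, Object> responder = responders.get(address);
        if (responder == null) {
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("No handlers for address " + address));
            return failed;
        }
        return CompletableFuture.completedFuture(responder.apply(message));
    }
}
```

The failed future plays the role of the failed AsyncResult that a reply handler has to check before reading the body.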

To sum up, these three methods are used to send messages with the EventBus. In comparison, there is only one way to receive messages, and it works for all three sending methods.

Consuming messages

Consuming messages in Vert.x requires only one method, consumer(), which registers a consumer handler on the specified address. Take a look at the code snippet below:

final String address = "my.address";

//..

eventBus.consumer(address, res->{
	JsonObject payload = JsonObject.mapFrom(res.body());
	System.out.println(payload.encode());
});

From a technical point of view, MessageConsumer is responsible for receiving messages. You register a consumer with the event bus on a specific address and set a handler that provides you with a Message object. NB: the Message object in Vert.x is more like a container that holds more than the actual payload, as we will see in the next section. You can also provide a handler when replying, using replyAndRequest(), which sends the reply and is notified once the original sender answers it (or the exchange fails):

eventBus.consumer(address, res -> {
	// receive a message
	JsonObject payload = JsonObject.mapFrom(res.body());
	JsonObject reply = new JsonObject().put("result", "ok");
	res.replyAndRequest(reply, res2 -> {
		if (res2.succeeded()) {
			// the sender received our reply and answered it
		} else {
			// the reply failed or was not answered in time
		}
	});
});

An anatomy of message

The Message<T> interface represents a message that is received from the event bus in a handler. It is more than a payload: the object is a container that is transported via the event bus and carries other properties as well. In this section we will explore them in detail.

Address

The first property in our walkthrough is exposed by the address() method. It returns a String with the address the message was sent to. As an example, take a look at the code snippet below:

// Register the consumer first, so the message has a receiver
eventBus.consumer("my.address", res -> {
	String address = res.address();
	System.out.println(address);
	// prints "my.address"
});

eventBus.send("my.address", payload);

Headers

You can specify headers for a message. Headers are stored in a MultiMap as key-value pairs in which both the key and the value are Strings. Headers can be empty.

We add headers to a message through the DeliveryOptions object that we pass to the sending methods, using its addHeader() method:

DeliveryOptions options = new DeliveryOptions();
options.addHeader("key", "value");
eventbus.send("address", payload, options);

Body

This is the actual body of the message, in other words the payload that we send. By default, Vert.x treats the payload as an Object, although it is possible to specify a custom message codec to send your POJOs via the EventBus. From my own experience, I suggest avoiding custom codecs: plain JSON objects as payload make your architecture less coupled and eliminate unnecessary dependencies.

However, if you are required to use custom message objects, you can do it as follows:

  1. Implement a MessageCodec interface for your custom objects
  2. Register a new message codec with eventbus using eventbus.registerCodec(codec);
  3. Specify a codec in delivery options: deliveryOptions.setCodecName(myCodec.name());

After these steps you can send your objects without conversion to JsonObject:

MyClassMessageCodec myClassMessageCodec = new MyClassMessageCodec();

eventBus.registerCodec(myClassMessageCodec);

DeliveryOptions options = new DeliveryOptions();
options.setCodecName(myClassMessageCodec.name());

MyClass myClass = new MyClass();
eventBus.send("address", myClass, options);

NB: out of the box, the event bus only allows a certain set of message types to be sent, including primitive types, boxed primitive types, String, byte[], JsonObject, JsonArray and Buffer.
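To give an idea of what a codec has to do for a custom object, here is a plain-Java sketch of the wire encoding and decoding part. User and UserWireCodec are hypothetical names for this illustration and do not implement the Vert.x MessageCodec interface; in a real codec the same logic would live in its encodeToWire() and decodeFromWire() methods, working on a Vert.x Buffer instead of a byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical POJO used only for this illustration.
final class User {
    final int id;
    final String name;

    User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical codec for illustration only: not the Vert.x MessageCodec interface.
final class UserWireCodec {
    // Serialize the object to bytes: id first, then the name.
    byte[] encode(User user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(user.id);
            out.writeUTF(user.name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuild the object by reading the fields in the same order.
    User decode(byte[] wire) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            return new User(in.readInt(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Every custom type needs this kind of code on both sides of the bus, which is one reason why plain JSON payloads tend to be simpler to maintain.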

Reply

The final component of a message is the reply. The reply() method sets an object to send back to the sender. If the message was sent specifying a reply handler, that handler is called when it receives the reply. If the message was sent without a reply handler, this method does nothing. You can also specify DeliveryOptions in its overloaded version:

message.reply(payload);
message.reply(payload, options);
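The four parts described in this section (address, headers, body and reply) can be summarized in a small plain-Java container. SimpleMessage is a hypothetical class written for this post, not the Vert.x Message interface, and it simplifies headers to a plain Map, whereas a MultiMap can hold several values per key.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical container for illustration only: not the Vert.x Message interface.
final class SimpleMessage<T> {
    private final String address;
    private final Map<String, String> headers;
    private final T body;
    private final Consumer<Object> replyHandler; // null when the sender expects no reply

    SimpleMessage(String address, Map<String, String> headers, T body,
                  Consumer<Object> replyHandler) {
        this.address = address;
        // Defensive copy so later changes by the caller do not leak in.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.body = body;
        this.replyHandler = replyHandler;
    }

    String address() { return address; }

    Map<String, String> headers() { return headers; }

    T body() { return body; }

    // Pass the payload back to the sender's reply handler, if there is one.
    // Returns true when a reply handler was present and has been called.
    boolean reply(Object payload) {
        if (replyHandler == null) {
            return false;
        }
        replyHandler.accept(payload);
        return true;
    }
}
```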

Conclusion

In this post we discussed the Vert.x EventBus. It is a part of the Vert.x Core library that allows different parts of your application to communicate with each other, irrespective of the language they are written in. We explored the pattern itself, checked its components, talked about the different messaging options and finally discussed the anatomy of Message, which is more than the actual payload.

Hope that this post was useful for you. As usual, if you have any questions, don’t hesitate to contact me or leave a comment below. Have a nice day!

