Monday, May 15, 2023

Asynchronous Messaging In Java? JMS, Kafka, and RabbitMQ Examples Tutorials

Hello fellow Java developers? Are you tired of those pesky synchronous messages clogging up your code like old grease in a drainpipe? Fear not, because today we're talking about asynchronous messaging in Java – the ultimate solution for decoupling your processes and improving the performance of your applications.  But before we dive into the juicy details, let's start with the basics. What is Asynchronous Messaging? Asynchronous messaging is a method of communication between two or more processes like Microservices, where the sender sends a message and doesn't wait for a response before continuing with its own tasks. This is in contrast to synchronous messaging, where the sender must wait for a response before continuing.

Asynchronous messaging allows for better scalability and performance, as it allows processes to operate independently without being blocked by each other. It's especially useful in applications with high levels of concurrency, where multiple processes may need to communicate with each other simultaneously.


This article is in continuation of my series on essential Microservices Design patterns where I have explained SAGACQRS, and Event Sourcing Pattern for Java developers. If you have not read them you can also check check them out to learn more about Microservices architecture. 

Asynchronous Messaging In Java? JMS, Kafka, and RabbitMQ Examples Tutorials



Asynchronous Messaging In Java with Examples 

There are several ways to implement asynchronous messaging in Java, including using the Java Message Service (JMS), messaging libraries like Apache Kafka or RabbitMQ, and the new CompletableFuture API introduced in Java 8.


1. Java Message Service (JMS)

The Java Message Service (JMS) is a Java API that allows applications to create, send, receive, and read messages. It supports both synchronous and asynchronous messaging, but we're interested in the latter, of course.

To use JMS for asynchronous messaging in Java, you'll need to set up a JMS provider (e.g. Apache ActiveMQ) and create a connection factory and destination (e.g. queue or topic). Then, you can use the JMS API to send and receive messages asynchronously.

Here's an example of how to send a message asynchronously using JMS:

ConnectionFactory connectionFactory 
= new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Hello, world!");

producer.send(message);

connection.close();


And here's an example of how to receive a message asynchronously using JMS:

ConnectionFactory connectionFactory 
 = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("myQueue");

MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {

    @Override

    public void onMessage(Message message) {

        if (message instanceof TextMessage) {

            TextMessage textMessage = (TextMessage) message;

            try {

                System.out.println("Received message: " + textMessage.getText());

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

});


connection.close


2. Apache Kafka

Apache Kafka is a distributed streaming platform that is used for building real-time data pipelines and streaming applications. It is designed for high throughput and low latency, making it well-suited for asynchronous messaging.

To use Kafka for asynchronous messaging in Java, you'll need to set up a Kafka cluster and create a producer and consumer. The producer can then send messages to a topic, and the consumer can asynchronously receive messages from the topic.

Here's an example of how to send a message asynchronously using Kafka:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<>("myTopic", "Hello, world!"));

producer.close();


And here's an example of how to receive a message asynchronously using Kafka:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "myGroup");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", 
 "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("myTopic"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", 
            record.offset(), record.key(), record.value());
  }
}

consumer.close();

Here is a nice diagram from Uber's system design to showcase how to achieve aysnc messaging using Kafka

e how to achieve aysnc messaging using Kafka



3. RabbitMQ

RabbitMQ is an open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It is designed to be lightweight and easy to use, making it a popular choice for asynchronous messaging.

To use RabbitMQ for asynchronous messaging in Java, you'll need to set up a RabbitMQ server and create a connection, channel, and queue. The producer can then send messages to the queue, and the consumer can asynchronously receive messages from the queue.

Here's an example of how to send a message asynchronously using RabbitMQ:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare("myQueue", false, false, false, null);
channel.basicPublish

Here's an example of how to receive a message asynchronously using RabbitMQ:

onnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare("myQueue", false, false, false, null);

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, 
     AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println("Received message: " + message);
  }
};

channel.basicConsume("myQueue", true, consumer);

connection.close();

Here is a diagram which shows how you can implement Asynchronous messaging using RabbitMQ




4. CompletableFuture API

The CompletableFuture API was introduced in Java 8 as a way to represent the result of an asynchronous computation. It allows you to perform operations asynchronously and combine them in a flexible way.

To use the CompletableFuture API for asynchronous messaging in Java, you can use the supplyAsync method to create a CompletableFuture that executes a task asynchronously, and the thenAcceptAsync method to execute a task when the CompletableFuture completes.

Here's an example of how to send and receive a message asynchronously using the CompletableFuture API:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  // send message asynchronously
  return "Hello, world!";
});

future.thenAcceptAsync(message -> {
  // receive message asynchronously
  System.out.println("Received message: " + message);
});


Conclusion

That's all about what is Asynchronous Messaging and how to use JMS, Kafka,  RabbitMQ and CompletableFuture API to implement in Java. Asynchronous messaging is a powerful tool for improving the performance and scalability of your Java applications. Whether you use the Java Message Service, Apache Kafka, RabbitMQ, or the CompletableFuture API, there are many options for implementing asynchronous messaging in Java.

So go ahead and give it a try – your code (and your users) will thank you!

I hope this article on asynchronous messaging in Java was helpful and entertaining! Let me know if you have any questions or need further clarification on any of the concepts covered.

Other Java Microservices articles and tutorials you may like:


Thanks for reading this article so far. If you like this article about Asynchronous messaging in Java and Microservice architecture and how to use it then please share them with your friends and colleagues. If you have any questions, feedback, or other fee courses to add to this list, please feel free to suggest.

P. S. - If you are new to Microservice architecture and want to learn more about Microservice Architecture and solutions from scratch and looking for free resources then I highly recommend you to check out my post about 7 free Microservice courses. It contains free Udemy and Coursera and courses to learn Microservice architecture from scratch.  

No comments:

Post a Comment

Feel free to comment, ask questions if you have any doubt.