Description
1) When the list of destinations is known ahead of time and is static, you can add an element
to the route that consumes messages from a source endpoint and then sends each message
out to a list of destinations (for example, the accounting and production departments).
2) Using this multicasting technique, a copy of each order is sent to both accounting and
production, so the two departments can work on it at the same time, which speeds up operations.
Example :
The incoming orders are in the folder file:/venkatjavasource/camel/from.
First, every order whose file name ends in .xml is submitted to the jms:xmlOrders JMS queue.
From there, each of these orders is multicast to the jms:accounting (accounting) queue and the
jms:production (production) queue.

A multicast sends a message to a number of specified recipients.
1) The code that performs the multicast is:
from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production");
The destination endpoints are passed to to() as a comma-separated list.
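To make the idea concrete, here is a minimal plain-Java sketch of what a multicast does conceptually: every destination receives the same message. It does not use Camel and is not how Camel implements it; the class name MulticastSketch, the multicast helper, and the in-memory lists standing in for the accounting and production queues are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class MulticastSketch {

    // Hypothetical helper: hands the same message to every destination, one after another.
    static <T> void multicast(T message, List<Consumer<T>> destinations) {
        for (Consumer<T> destination : destinations) {
            destination.accept(message);
        }
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the jms:accounting and jms:production queues.
        List<String> accounting = new ArrayList<>();
        List<String> production = new ArrayList<>();

        List<Consumer<String>> destinations = new ArrayList<>();
        destinations.add(accounting::add);
        destinations.add(production::add);

        multicast("Order_xml.xml", destinations);

        System.out.println("Accounting received order: " + accounting);
        System.out.println("Production received order: " + production);
    }
}
```

The destinations are visited in the order they were listed, one after the other, which is the sequential behaviour; the parallel variant is a separate option.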
UsingMulticastingInCamelRouters.java
package com.venkatjavasource.camel;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class UsingMulticastingInCamelRouters {

    public static void main(String[] args) throws Exception {
        // Create CamelContext
        CamelContext context = new DefaultCamelContext();

        // Connect to embedded ActiveMQ JMS broker
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        context.addComponent("jms",
                JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        // add our route to the CamelContext
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // Content-based router
                from("file:/venkatjavasource/camel/from?noop=false")
                    .choice()
                        .when(header("CamelFileName").endsWith(".xml"))
                            .to("jms:xmlOrders")
                        .when(header("CamelFileName").regex("^.*(csv|csl)$"))
                            .to("jms:csvOrders")
                        .otherwise()
                            .to("jms:badOrders");

                from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production");

                // test that our route is working
                from("jms:accounting").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Accounting received order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });
                from("jms:production").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Production received order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });
            }
        });

        // start the route and let it do its work
        context.start();
        Thread.sleep(2000);

        // stop the CamelContext
        context.stop();
    }
}
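The content-based router at the top of the route decides which queue a file goes to from its name alone. As a rough illustration, the same decision can be written in plain Java as below. This is only a sketch of the routing rule, not Camel code; the class name OrderRoutingSketch and the helper queueFor are hypothetical, while the queue names and the regular expression are taken from the listing.

```java
import java.util.regex.Pattern;

public class OrderRoutingSketch {

    // Same pattern as in the route: file names ending in "csv" or "csl".
    private static final Pattern CSV_PATTERN = Pattern.compile("^.*(csv|csl)$");

    // Hypothetical helper: returns the queue URI matching the rules of the choice() block.
    static String queueFor(String fileName) {
        if (fileName.endsWith(".xml")) {
            return "jms:xmlOrders";
        } else if (CSV_PATTERN.matcher(fileName).matches()) {
            return "jms:csvOrders";
        }
        return "jms:badOrders";
    }

    public static void main(String[] args) {
        for (String name : new String[] {"Order_xml.xml", "orders.csv", "orders.txt"}) {
            System.out.println(name + " -> " + queueFor(name));
        }
    }
}
```

Only the files that land in jms:xmlOrders go on to be multicast. Note that the pattern does not require a dot before csv or csl, so any name that merely ends in those letters also counts as a CSV order.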
Demo :
We have one order with file name Order_xml.xml in file:/venkatjavasource/camel/from

Select “Run As” -> “Java Application”. The Order_xml.xml order is submitted to both the accounting and production JMS queues.
Output :
Accounting received order: Order_xml.xml
Production received order: Order_xml.xml
After processing, the order file is moved out of the source directory itself into a backup subfolder of that same directory (.camel by default).

PARALLEL MULTICASTING :
1) Calling parallelProcessing() sets up the multicast to distribute messages to the destinations in parallel.
from("jms:xmlOrders").multicast().parallelProcessing()
    .to("jms:accounting", "jms:production");
2) We can also supply our own thread pool, and so control its size, through executorService():
ExecutorService executor = Executors.newFixedThreadPool(16);
from("jms:xmlOrders").multicast()
    .parallelProcessing().executorService(executor)
    .to("jms:accounting", "jms:production");
3) By default, the multicast will continue sending messages to destinations even if one
fails.
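The three points above can be sketched in plain Java with nothing but java.util.concurrent. This is an illustration of the idea (one delivery task per destination, a caller-supplied thread pool, and a failed delivery that does not cancel the others), not Camel's actual implementation. The class name ParallelMulticastSketch, the helper multicastInParallel, and the failing "accounting" destination are assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ParallelMulticastSketch {

    // Hypothetical helper: submits one delivery task per destination and waits for all of them.
    // Returns how many deliveries failed; a failure does not cancel the other deliveries.
    static <T> int multicastInParallel(T message, List<Consumer<T>> destinations,
                                       ExecutorService executor) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (Consumer<T> destination : destinations) {
            tasks.add(() -> {
                destination.accept(message);
                return null;
            });
        }
        int failures = 0;
        // invokeAll blocks until every task has completed, normally or with an exception.
        for (Future<Void> result : executor.invokeAll(tasks)) {
            try {
                result.get();
            } catch (ExecutionException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Consumer<String>> destinations = new ArrayList<>();
            // Simulated failure of the accounting destination.
            destinations.add(order -> { throw new IllegalStateException("accounting is down"); });
            destinations.add(order -> System.out.println("Production received order: " + order));

            int failures = multicastInParallel("Order_xml.xml", destinations, executor);
            System.out.println("Failed deliveries: " + failures);
        } finally {
            // A pool we create ourselves must also be shut down by us.
            executor.shutdown();
        }
    }
}
```

In this sketch production still receives the order even though accounting throws, which mirrors the default continue-on-failure behaviour described in point 3.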
STOPPING THE MULTICAST ON EXCEPTION :
1) We can use the stopOnException feature of the multicast to stop sending the message to the remaining destinations as soon as one delivery fails.
from("jms:xmlOrders")
    .multicast().stopOnException()
    .parallelProcessing().executorService(executor)
    .to("jms:accounting", "jms:production");
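The difference between the default behaviour and stopOnException can be shown with a small plain-Java sketch. To keep it short it delivers sequentially rather than in parallel, and it is only a model of the semantics, not Camel's implementation. The class name StopOnExceptionSketch, the multicast helper with its boolean flag, and the simulated failing destination are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StopOnExceptionSketch {

    // Hypothetical helper: delivers to each destination in order.
    // When stopOnException is true, the first failure is rethrown and later destinations are skipped;
    // when it is false, failures are counted and delivery continues.
    static <T> int multicast(T message, List<Consumer<T>> destinations, boolean stopOnException) {
        int failures = 0;
        for (Consumer<T> destination : destinations) {
            try {
                destination.accept(message);
            } catch (RuntimeException e) {
                if (stopOnException) {
                    throw e;
                }
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> production = new ArrayList<>();
        List<Consumer<String>> destinations = new ArrayList<>();
        // Simulated failure of the first (accounting) destination.
        destinations.add(order -> { throw new IllegalStateException("accounting is down"); });
        destinations.add(production::add);

        // Default behaviour: the failure is counted and production still gets the order.
        int failures = multicast("Order_xml.xml", destinations, false);
        System.out.println("Failures: " + failures + ", production has: " + production);

        // With stopOnException: the first failure ends the multicast.
        production.clear();
        try {
            multicast("Order_xml.xml", destinations, true);
        } catch (IllegalStateException e) {
            System.out.println("Stopped on: " + e.getMessage() + ", production has: " + production);
        }
    }
}
```

In the second call the production list stays empty because the failing destination comes first and the multicast stops there.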
UsingMulticastingInCamelRouters.java
package com.venkatjavasource.camel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class UsingMulticastingInCamelRouters {

    public static void main(String[] args) throws Exception {
        // Create CamelContext
        CamelContext context = new DefaultCamelContext();

        // Connect to embedded ActiveMQ JMS broker
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        context.addComponent("jms",
                JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        // Thread pool used by the parallel multicast; created here so it can be shut down at the end
        final ExecutorService executor = Executors.newFixedThreadPool(14);

        // add our route to the CamelContext
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // Content-based router
                from("file:/venkatjavasource/camel/from?noop=false")
                    .choice()
                        .when(header("CamelFileName").endsWith(".xml"))
                            .to("jms:xmlOrders")
                        .when(header("CamelFileName").regex("^.*(csv|csl)$"))
                            .to("jms:csvOrders")
                        .otherwise()
                            .to("jms:badOrders");

                // Parallel multicast that stops as soon as one delivery fails
                from("jms:xmlOrders").multicast().stopOnException()
                    .parallelProcessing().executorService(executor)
                    .to("jms:accounting", "jms:production");

                // test that our route is working
                from("jms:accounting").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Accounting received order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });
                from("jms:production").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Production received order: "
                                + exchange.getIn().getHeader("CamelFileName"));
                    }
                });
            }
        });

        // start the route and let it do its work
        context.start();
        Thread.sleep(2000);

        // stop the CamelContext
        context.stop();

        // shut down the pool we created so its threads do not keep the JVM alive
        executor.shutdown();
    }
}
*** Venkat – Happy learning ****