I've been using Akka Camel and ActiveMQ recently, as part of a delayed worker-queue system. Given the lack of good googleable information about combining the two, I thought it would be useful if I explained briefly how to get Akka Camel and ActiveMQ to work together in the form of a quick example.
Producers and Consumers
Akka Camel uses the concept of producers and consumers, and makes it very easy to link them to ActiveMQ. Let's try publishing a message to an ActiveMQ queue, and then using a consumer to read the messages back.
First, let's implement Producer and Consumer actors. For an actor to produce messages, extend akka.camel.Producer
and implement endpointUri
. Likewise, to implement an actor to consume messages, extend akka.camel.Consumer
and implement the same endpointUri
. As the consumer will be receiving messages, you will also need to implement the standard actor receive
method.
import akka.actor._
import akka.camel._
class SimpleProducer() extends Actor with Producer with OneWay {
def endpointUri: String = "activemq:foo.bar"
}
class SimpleConsumer() extends Actor with Consumer {
def endpointUri: String = "activemq:foo.bar"
def receive = {
case msg: CamelMessage => println(msg)
}
}
These actors will communicate using the ActiveMQ queue named foo.bar
. The consumer will print any received messages to the console. As this is a publish-subscribe system, we also need to extend our SimpleProducer with the OneWay
trait. This tells Camel that our producer won't be participating in any request-reply messaging patterns.
Setting up Akka Camel
Now that we have a Producer and a Consumer, we need to wire a CamelExtension into an ActorSystem to tell it how to use ActiveMQ. In this example, ActiveMQ is running on localhost:61616
. The component name needs to match the protocol specified in the producer and consumer endpoints.
import akka.actor._
import akka.camel._
import org.apache.activemq.camel.component.ActiveMQComponent
import org.apache.activemq.ScheduledMessage._
val actorSystem = ActorSystem("CamelTesting")
val system = CamelExtension(actorSystem)
val amqUrl = s"nio://localhost:61616"
system.context.addComponent("activemq", ActiveMQComponent.activeMQComponent(amqUrl))
// create consumer and producer
val simpleConsumer = actorSystem.actorOf(Props[SimpleConsumer])
val simpleProducer = actorSystem.actorOf(Props[SimpleProducer])
Now we have linked Akka to ActiveMQ, let's send messages through it! As our producer is an akka-actor, we can send messages to it just like any other.
simpleProducer ! Message("first")
simpleProducer ! Message("second")
simpleProducer ! Message("third")
val delayedMessage = CamelMessage(Message("delayed fourth"), Map(AMQ_SCHEDULED_DELAY -> 3000))
simpleProducer ! delayedMessage
The fourth messages makes use of ActiveMQ's scheduled delay feature. To do this, we had to send a CamelMessage
with modified headers. All available CamelMessage options are available here. Unfortunately, this does leak knowledge of ActiveMQ outside of our SimpleProducer, but introducing a level of indirection would easily solve it.
Conclusion
We have sent messages through ActiveMQ using Akka-Camel, all in about 40 lines of code. A working example can be viewed on Github. As it is possible for our actors to hide their implementation from those around them, ActiveMQ can be worked into a system using Akka without much hassle at all.