
Overview
- This plugin provides access to the core functionality of the
com.rabbitmq:amqp-client and
dev.kourier:amqp-client libraries.
Features
- Integrates with coroutines and uses a separate dispatcher.
- Includes a built-in connection/channel management system.
- Allows direct interaction with the underlying Java library.
- Seamlessly integrates with the Kotlin DSL, making it readable, maintainable, and easy to use.
Table of Contents
Choose a distribution
This library is available in multiple distributions. Choose the one that best fits your needs:
Default Distribution
This distribution is an alias for the Java Client distribution on the JVM platform, and for the Kourier Client
distribution on Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq:<version>")
}
Java Client Distribution
This distribution uses the official RabbitMQ Java client library
(com.rabbitmq:amqp-client) under the hood, and is available only for the JVM platform.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-java:<version>")
}
Kourier (Pure Kotlin & Kotlin Multiplatform) Client Distribution
This distribution uses the pure Kotlin Kourier client library
(dev.kourier:amqp-client) under the hood, and is available for both JVM and Kotlin Native platforms.
dependencies {
implementation("io.github.damirdenis-tudor:ktor-server-rabbitmq-kourier:<version>")
}
Usage
Installation
install(RabbitMQ) {
uri = "amqp://<user>:<password>@<address>:<port>"
defaultConnectionName = "<default_connection>"
connectionAttempts = 20
attemptDelay = 10
dispatcherThreadPollSize = 4
tlsEnabled = false
}
Queue binding example
rabbitmq {
queueBind {
queue = "demo-queue"
exchange = "demo-exchange"
routingKey = "demo-routing-key"
queueDeclare {
queue = "demo-queue"
durable = true
}
exchangeDeclare {
exchange = "demo-exchange"
type = "direct"
}
}
}
Producer example
rabbitmq {
repeat(10) {
basicPublish {
exchange = "demo-exchange"
routingKey = "demo-routing-key"
message { "Hello World!" }
}
}
}
Consumer Example
rabbitmq {
basicConsume {
autoAck = true
queue = "demo-queue"
deliverCallback<String> { message ->
logger.info("Received message: $message")
}
}
}
Consumer Example with coroutinePollSize
rabbitmq {
connection(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispacher = Dispacher.IO
coroutinePollSize = 1_000
deliverCallback<String> { message ->
logger.info("Received message: $message")
delay(30)
}
}
}
}
Consumer Example with coroutinePollSize
rabbitmq {
connection(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispacher = Dispacher.IO
coroutinePollSize = 1_000
deliverCallback<String> { message ->
logger.info("Received message: $message")
delay(30)
}
}
}
}
Library Calls Example
rabbitmq {
libChannel(id = 2) {
basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())
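val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String?, envelope: Envelope?, properties: AMQP.BasicProperties?, body: ByteArray?) {
}
}
basicConsume("demo-queue", true, consumer)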
}
}
rabbitmq {
libConnection(id = "lib-connection") {
val channel = createChannel()
channel.basicPublish("demo-queue", "demo-routing-key", null, "Hello!".toByteArray())
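val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(consumerTag: String?, envelope: Envelope?, properties: AMQP.BasicProperties?, body: ByteArray?) {
}
}
channel.basicConsume("demo-queue", true, consumer)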
}
}
Multiple Instances Example
Multiple Connections Example
Custom Coroutine Scope Example
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
println("ExceptionHandler got $throwable")
}
val rabbitMQScope = CoroutineScope(SupervisorJob() + exceptionHandler)
install(RabbitMQ) {
connectionAttempts = 3
attemptDelay = 10
uri = rabbitMQContainer.amqpUrl
scope = rabbitMQScope
}
rabbitmq {
connection(id = "consume") {
basicConsume {
autoAck = true
queue = "demo-queue"
dispacher = Dispacher.IO
coroutinePollSize = 1_000
deliverCallback<String> { message ->
throw Exception("Simulated failure while processing: $message")
}
}
}
}
Serialization Fallback Example
Dead Letter Queue Example
Logging
- To set the logging level for this library, add this line to your
logback.xml file:
<logger name="io.github.damir.denis.tudor.ktor.server.rabbitmq" level="DEBUG"/>