software-design4 Min Read

How to Implement Exponential Backoff in Rabbitmq Using AMQP in Node.js

Gorav Singal

August 23, 2022

TL;DR

Use per-retry dead-letter exchanges with increasing TTLs to implement exponential backoff in RabbitMQ. Each retry publishes to a delay queue that routes back to the main queue after the TTL expires.

How to Implement Exponential Backoff in Rabbitmq Using AMQP in Node.js

Exponential Backoff in Rabbitmq

Please make sure to read first, why we need the Exponential Backoff in event driven systems.

Nodejs Code using AMQP

Message Class

Lets first wrap our message,

'use strict';

class Message {
    constructor(channel, message) {
        this._channel = channel;
        this._message = message;
    }

    async ack() {
        await this._channel.ack(this._message);
    }

    async nack() {
        await this._channel.nack(this._message);
    }

    get properties() {
        return this._message.properties;
    }

    get content() {
        return this._message.content;
    }
}

module.exports = Message;

Rabbitmq Controller Class

Lets define our controller class, which will create connections and manage queues.

'use strict';

const amqp = require('amqp-connection-manager');
const Message = require('./message');

const QUEUE_DELAY = 'x-gyanbyte-delay';
const QUEUE_TOTAL_DELAY = 'x-gyanbyte-total-delay';
const QUEUE_TOTAL_RETRIES = 'x-gyanbyte-total-retires';

function randomIntFromInterval(min, max) {
    return Math.floor(Math.random() * (max - min + 1) + min);
}

class Controller {
    init(config) {
        this.config = config;
        
        this._queueName = config.queueName;
        this._retrySettings = config.retry;
        this._prefetch = config.prefetch;
        this._retryOnException = config.retryOnException;

        this._connection_string = config.connection_string;

        return this.connect();
    }

    connect() {
        if (!this._queueName) {
            return Promise.reject(new Error('No queue name was specified'));
        }
        this._queueName = `gyanbyte_${this._queueName}`;
        this.retryQueueName = `gyanbyte_${this._queueName}_retry`;
        this.inputExchangeName = `snitch_exchange_${this._queueName}`;
        this.retryExchangeName = `snitch_exchange_${this.queueName}_retry`;
        return Promise.resolve()
            .then(() => {
                return amqp.connect(this._connection_string, {
                    connectionOptions: {frameMax: 0x10000}, heartbeatIntervalInSeconds: 60
                });
            })
            .then((conn) => {
                console.log('info', 'Connected to rabbitmq');
                //Try and open a channel
                return conn.createChannel({json: true,
                    setup: ((channel) => {
                        console.log('info', 'input channel created, creating exchange', this.inputExchangeName);
                        return channel.assertExchange(this.inputExchangeName, 'direct', {durable: true})
                            .then(async () => {
                                console.log('info', 'Lets create the input queue', this._queueName);
                                await channel.assertQueue(this._queueName, {
                                    durable: true,
                                    persistent: true,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind queue and exchange', this._queueName, this.inputExchangeName);
                                //Bind our queue to the earlier created exchange, the exchange will receive message and put them in the queue for us
                                await channel.bindQueue(this._queueName, this.inputExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', 'Create retry exchange', this.retryExchangeName);
                                //Lets create a dead letter and exponential backoff exchange
                                await channel.assertExchange(this.retryExchangeName, 'direct', {
                                    durable: true
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Lets create the retry queue with dead letter delivery to', this.retryQueueName, this.inputExchangeName);
                                //Create a another queue with an dead-letter delivery to our main exchange. We will expect messages to be put here with an expiry
                                // and will expire and then retired later from main queue
                                await channel.assertQueue(this.retryQueueName, {
                                    durable: true,
                                    persistent: true,
                                    deadLetterExchange: this.inputExchangeName,
                                    maxLength: 1000000
                                });
                            })
                            .then(async () => {
                                console.log('info', 'Bind', this.retryQueueName, this.retryExchangeName, '');
                                await channel.bindQueue(this.retryQueueName, this.retryExchangeName, '');
                            })
                            .then(async () => {
                                console.log('info', `Starting queue consumer for queue: ${this._queueName}`);
                                channel.prefetch(this._prefetch);
                                
                                console.log('info', 'wait for the input to arrive on', this._queueName);
                                await channel.consume(this._queueName, this._consumeMessage.bind(this), {noAck: false});
                            });
                    })
                });
            })
            .then((channel) => {
                this._channel = channel;
                channel.on('disconnect', (err) => {
                    console.log('warn', err);
                });
            });
    }

    async _publishToExchange(message) {
        await this._channel.publish(this.inputExchangeName, '', message, {persistent: true})
            .then((response) => {
                if (!response) {
                    console.log('warn', 'cannot publish message; channel\'s buffer is full');
                    //Lets wait for drain event and send this message back into the queue
                    return new Promise((resolve) => {
                        this._channel._channel.once('drain', () => {
                            console.log('debug', 'Channel was successfully drained, so we can accept more data.');
                            resolve();
                        });
                    });
                }
                return response;
            });
    }

    async publishMessage(message) {
        await this._publishToExchange(message);
    }

    _consumeMessage(message) {
        let _message = new Message(this._channel, message);
        
        console.log('Message received');
        return new Promise((resolve, reject) => {
            try {
                const messageContent = JSON.parse(message.content);
                if (messageContent.op == 'error') {
                    return reject(new Error('My error'));
                }
                console.log(messageContent);
            } catch(error) {
                reject(error);
            }
            resolve();
        })
        .then(() => {
            _message.ack()
        })
        .catch((err) => {
            console.log('error', err);
            if (this._retryOnException) {
                return this._pushToDelayedQueue(message);
            }
            return _message.nack();
        });
    }

    _pushToDelayedQueue(message) {
        if (this._channel === null) {
            return;
        }
        //We will not nack this but instead it ack and put it in the retry later queue
        let expiration = 0;
        let totalDelay = 0;
        let totalRetries = 0;

        let config = this._retrySettings;

        //Sometimes these headers are missing and needs to be checked if they exists or not.
        if (message.properties.headers !== undefined) {
            expiration = message.properties.headers[QUEUE_DELAY] || expiration;
            totalDelay = message.properties.headers[QUEUE_TOTAL_DELAY] || totalDelay;
            totalRetries = message.properties.headers[QUEUE_TOTAL_RETRIES] || totalRetries;
        }
        if (expiration === 0) {
            expiration = config.initialWait;
        } else {
            if ((config.maximumWait !== -1 && totalDelay > config.maximumWait) ||
                (config.maxRetries !== -1 && totalRetries >= config.maxRetries)) {
                let object = JSON.parse(message.content.toString());
                console.log('warn', 'Rejecting task as it has between retried too beyond maximum attempts', object.id);
                return this._channel.ack(message);
            }
            expiration *= config.factor;
            expiration = Math.floor(expiration);
        }

        let randomizedValue = 0;
        if (config.randomizeBy > 0) {
            randomizedValue = randomIntFromInterval(0, config.randomizeBy);
        }

        let nextTotalDelay = totalDelay + expiration;
        if (config.maximumWaitCeil !== -1 && expiration > config.maximumWaitCeil) {
            expiration = config.maximumWaitCeil;
            nextTotalDelay = config.maximumWaitCeil;
        }

        console.log('expiration', expiration);
        //Lets send it to the retry exchange
        return this._channel.publish(this.retryExchangeName, '', JSON.parse(message.content), {
            expiration: expiration + randomizedValue,
            headers: {
                QUEUE_DELAY: expiration,
                QUEUE_TOTAL_DELAY: nextTotalDelay,
                QUEUE_TOTAL_RETRIES: totalRetries + 1
            }
        })
            .then(() => this._channel.ack(message));
    }
}

module.exports = Controller;

Main Runner Test Code

const RabbiqmqController = require('./rabbitmq_controller');

const config = {
    connection_string: 'amqp://guest:guest@localhost:5672',
    queueName: "test",
    retry: {
        factor: 1.2,
        initialWait: 5000,
        maximumWait: -1,
        randomizeBy: 2000,
        maxRetries: -1,
        maximumWaitCeil: -1
    },
    prefetch: 1,
    retryOnException: true
};

rabbitmqController = new RabbiqmqController();
rabbitmqController.init(config)
    .then(() => {

    })
    .catch((error) => {
        console.error(error);
    });

The code is self explanatory. We are just creating exchange/queues and retry queues. And, starting the consumer by default in this code. You might want to do something else.

Bonus

Docker-compose file for starting rabbitmq container:

version: '3.3'
services:
  rabbitmq:
    image: rabbitmq:3-management
    hostname: "mynode"
    ports:
       - "15672:15672"
       - "5672:5672"
    environment:
       RABBITMQ_DEFAULT_USER: "guest"
       RABBITMQ_DEFAULT_PASS: "guest"
       RABBITMQ_NODENAME: "mynode"
    volumes:
      - ./data:/var/lib/rabbitmq

Thanks for reading.

Share

Related Posts

Why Exponential Backoff in Rabbitmq or In Event-Driven Systems

Why Exponential Backoff in Rabbitmq or In Event-Driven Systems

Understanding Simple Message Workflow First, lets understand a simple workflow…

Message Queues with RabbitMQ and SQS in Node.js

Message Queues with RabbitMQ and SQS in Node.js

Why Message Queues Message queues decouple producers from consumers, enabling…

Node.js Architecture — Event Loop Deep Dive

Node.js Architecture — Event Loop Deep Dive

Why the Event Loop Matters Node.js runs JavaScript on a single thread, yet…

WebSockets with Socket.io in Node.js

WebSockets with Socket.io in Node.js

WebSocket vs HTTP Traditional HTTP follows a request/response model — the client…

Testing Node.js — Unit, Integration, and E2E

Testing Node.js — Unit, Integration, and E2E

Testing Strategy A solid testing strategy follows the testing pyramid — many…

Redis — Caching, Sessions, Pub/Sub in Node.js

Redis — Caching, Sessions, Pub/Sub in Node.js

Why Redis for Node.js Redis is an in-memory data store that serves as a cache…

Latest Posts

AI Video Generation in 2025 — Models, Costs, and How to Build a Cost-Effective Pipeline

AI Video Generation in 2025 — Models, Costs, and How to Build a Cost-Effective Pipeline

AI video generation went from “cool demo” to “usable in production” in 2024-202…

AI Models in 2025 — Cost, Capabilities, and Which One to Use

AI Models in 2025 — Cost, Capabilities, and Which One to Use

Choosing the right AI model is one of the most impactful decisions you’ll make…

AI Image Generation in 2025 — Models, Costs, and How to Optimize Spend

AI Image Generation in 2025 — Models, Costs, and How to Optimize Spend

Generating one image with AI costs between $0.002 and $0.12. That might sound…

AI Coding Assistants in 2025 — Every Tool Compared, and Which One to Actually Use

AI Coding Assistants in 2025 — Every Tool Compared, and Which One to Actually Use

Two years ago, AI coding meant one thing: GitHub Copilot autocompleting your…

AI Agents Demystified — It's Just Automation With a Better Brain

AI Agents Demystified — It's Just Automation With a Better Brain

Let’s cut through the noise. If you read Twitter or LinkedIn, you’d think “AI…

Supply Chain Security — Protecting Your Software Pipeline

Supply Chain Security — Protecting Your Software Pipeline

In 2024, a single malicious contributor nearly compromised every Linux system on…