nodejs|April 02, 2026|3 min read

Message Queues with RabbitMQ and SQS in Node.js

TL;DR

Use RabbitMQ for complex routing (topic/fanout exchanges, DLQs, priority queues) and SQS for simple, managed queuing. Always implement idempotent consumers, dead letter queues for failed messages, and exponential backoff for retries.

Why Message Queues

Message queues decouple producers from consumers, enabling:

  • Async processing — Return 202 Accepted immediately, process later
  • Load leveling — Buffer traffic spikes instead of overwhelming downstream services
  • Reliability — Messages persist even if consumers crash
  • Fan-out — One event triggers multiple independent actions

RabbitMQ with amqplib

Exchange Types

RabbitMQ routes messages through exchanges to queues using different strategies.

RabbitMQ Exchange Types

Connection Setup

const amqp = require('amqplib');

class RabbitMQ {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    this.connection = await amqp.connect(process.env.RABBITMQ_URL);
    this.channel = await this.connection.createChannel();

    // Prefetch: process one message at a time per consumer
    await this.channel.prefetch(1);

    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
    });

    this.connection.on('close', () => {
      console.warn('RabbitMQ connection closed, reconnecting...');
      setTimeout(() => this.connect(), 5000);
    });
  }

  async close() {
    await this.channel?.close();
    await this.connection?.close();
  }
}

Publishing Messages

async function setupExchangeAndQueues(channel) {
  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });

  // Declare queues with DLX (Dead Letter Exchange)
  await channel.assertExchange('orders.dlx', 'direct', { durable: true });
  await channel.assertQueue('orders.dead-letter', { durable: true });
  await channel.bindQueue('orders.dead-letter', 'orders.dlx', '');

  // Main queue with dead-letter routing
  await channel.assertQueue('orders.process', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders.dlx',
      'x-message-ttl': 30000, // 30s timeout
    },
  });

  await channel.bindQueue('orders.process', 'orders', 'order.created');

  // Notification queue
  await channel.assertQueue('orders.notify', { durable: true });
  await channel.bindQueue('orders.notify', 'orders', 'order.*');
}

// Publish a message
async function publishOrder(channel, order) {
  const message = Buffer.from(JSON.stringify({
    id: order.id,
    userId: order.userId,
    total: order.total,
    timestamp: Date.now(),
  }));

  channel.publish('orders', 'order.created', message, {
    persistent: true,         // Survives broker restart
    contentType: 'application/json',
    messageId: `order-${order.id}`,
    headers: {
      'x-retry-count': 0,
    },
  });
}

Consuming with Retry Logic

async function consumeOrders(channel) {
  channel.consume('orders.process', async (msg) => {
    if (!msg) return;

    const retryCount = (msg.properties.headers['x-retry-count'] || 0);
    const order = JSON.parse(msg.content.toString());

    try {
      await processOrder(order);
      channel.ack(msg);
      console.log(`Order ${order.id} processed successfully`);
    } catch (err) {
      console.error(`Order ${order.id} failed (attempt ${retryCount + 1}):`, err.message);

      if (retryCount < 3) {
        // Retry with exponential backoff
        const delay = Math.pow(2, retryCount) * 5000; // 5s, 10s, 20s

        // Republish with incremented retry count
        channel.publish('orders', 'order.created', msg.content, {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            'x-retry-count': retryCount + 1,
            'x-delay': delay,
          },
        });
        channel.ack(msg); // Ack original to remove from queue
      } else {
        // Max retries exceeded → dead letter queue
        channel.nack(msg, false, false);
        console.error(`Order ${order.id} sent to DLQ after ${retryCount} retries`);
      }
    }
  });
}

Dead Letter Queue + Retry

AWS SQS

SQS is a fully managed queue — no infrastructure to maintain.

Setup with AWS SDK v3

const {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} = require('@aws-sdk/client-sqs');

const sqs = new SQSClient({ region: 'us-east-1' });
const QUEUE_URL = process.env.SQS_QUEUE_URL;

// Send message
async function sendToSQS(data) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(data),
    MessageAttributes: {
      EventType: {
        DataType: 'String',
        StringValue: 'order.created',
      },
    },
    // For FIFO queues:
    // MessageGroupId: 'orders',
    // MessageDeduplicationId: `order-${data.id}`,
  }));
}

// Poll for messages
async function pollMessages() {
  while (true) {
    const { Messages } = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,       // Long polling (reduces empty responses)
      VisibilityTimeout: 60,     // 60s to process before reappearing
      MessageAttributeNames: ['All'],
    }));

    if (!Messages?.length) continue;

    await Promise.allSettled(
      Messages.map(async (msg) => {
        try {
          const data = JSON.parse(msg.Body);
          await processOrder(data);

          // Delete after successful processing
          await sqs.send(new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle: msg.ReceiptHandle,
          }));
        } catch (err) {
          console.error('Failed to process message:', err);
          // Message will reappear after VisibilityTimeout
        }
      })
    );
  }
}

SQS Standard vs FIFO

Feature Standard FIFO
Throughput Unlimited 3,000 msg/s (with batching)
Ordering Best-effort Strict FIFO
Delivery At-least-once Exactly-once
Deduplication None 5-minute window
Cost Lower Higher

Idempotent Consumers

Since messages can be delivered more than once, consumers must be idempotent.

async function processOrder(order) {
  // Check if already processed (using a unique idempotency key)
  const existing = await db.processedEvents.findOne({
    eventId: `order-${order.id}`,
  });

  if (existing) {
    console.log(`Order ${order.id} already processed, skipping`);
    return;
  }

  // Process the order
  await db.transaction(async (tx) => {
    await tx.orders.updateStatus(order.id, 'processing');
    await tx.inventory.reserve(order.items);
    await tx.processedEvents.create({
      eventId: `order-${order.id}`,
      processedAt: new Date(),
    });
  });
}

RabbitMQ vs SQS

Factor RabbitMQ SQS
Management Self-hosted Fully managed
Routing Exchanges, bindings, topics Simple queue
Protocol AMQP HTTP/REST
Latency ~1ms ~20-50ms
Cost Server costs Pay per request
Best for Complex routing, low latency Simple queuing on AWS

Choose RabbitMQ when you need complex routing patterns, low latency, or are not on AWS. Choose SQS when you want zero infrastructure management and are already on AWS.

Related Posts

Latest Posts