nodejs3 Min Read

Message Queues with RabbitMQ and SQS in Node.js

Gorav Singal

April 02, 2026

TL;DR

Use RabbitMQ for complex routing (topic/fanout exchanges, DLQs, priority queues) and SQS for simple, managed queuing. Always implement idempotent consumers, dead letter queues for failed messages, and exponential backoff for retries.

Message Queues with RabbitMQ and SQS in Node.js

Why Message Queues

Message queues decouple producers from consumers, enabling:

  • Async processing — Return 202 Accepted immediately, process later
  • Load leveling — Buffer traffic spikes instead of overwhelming downstream services
  • Reliability — Messages persist even if consumers crash
  • Fan-out — One event triggers multiple independent actions

RabbitMQ with amqplib

Exchange Types

RabbitMQ routes messages through exchanges to queues using different strategies.

RabbitMQ Exchange Types

Connection Setup

const amqp = require('amqplib');

class RabbitMQ {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    this.connection = await amqp.connect(process.env.RABBITMQ_URL);
    this.channel = await this.connection.createChannel();

    // Prefetch: process one message at a time per consumer
    await this.channel.prefetch(1);

    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
    });

    this.connection.on('close', () => {
      console.warn('RabbitMQ connection closed, reconnecting...');
      setTimeout(() => this.connect(), 5000);
    });
  }

  async close() {
    await this.channel?.close();
    await this.connection?.close();
  }
}

Publishing Messages

async function setupExchangeAndQueues(channel) {
  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });

  // Declare queues with DLX (Dead Letter Exchange)
  await channel.assertExchange('orders.dlx', 'direct', { durable: true });
  await channel.assertQueue('orders.dead-letter', { durable: true });
  await channel.bindQueue('orders.dead-letter', 'orders.dlx', '');

  // Main queue with dead-letter routing
  await channel.assertQueue('orders.process', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders.dlx',
      'x-message-ttl': 30000, // 30s timeout
    },
  });

  await channel.bindQueue('orders.process', 'orders', 'order.created');

  // Notification queue
  await channel.assertQueue('orders.notify', { durable: true });
  await channel.bindQueue('orders.notify', 'orders', 'order.*');
}

// Publish a message
async function publishOrder(channel, order) {
  const message = Buffer.from(JSON.stringify({
    id: order.id,
    userId: order.userId,
    total: order.total,
    timestamp: Date.now(),
  }));

  channel.publish('orders', 'order.created', message, {
    persistent: true,         // Survives broker restart
    contentType: 'application/json',
    messageId: `order-${order.id}`,
    headers: {
      'x-retry-count': 0,
    },
  });
}

Consuming with Retry Logic

async function consumeOrders(channel) {
  channel.consume('orders.process', async (msg) => {
    if (!msg) return;

    const retryCount = (msg.properties.headers['x-retry-count'] || 0);
    const order = JSON.parse(msg.content.toString());

    try {
      await processOrder(order);
      channel.ack(msg);
      console.log(`Order ${order.id} processed successfully`);
    } catch (err) {
      console.error(`Order ${order.id} failed (attempt ${retryCount + 1}):`, err.message);

      if (retryCount < 3) {
        // Retry with exponential backoff
        const delay = Math.pow(2, retryCount) * 5000; // 5s, 10s, 20s

        // Republish with incremented retry count
        channel.publish('orders', 'order.created', msg.content, {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            'x-retry-count': retryCount + 1,
            'x-delay': delay,
          },
        });
        channel.ack(msg); // Ack original to remove from queue
      } else {
        // Max retries exceeded → dead letter queue
        channel.nack(msg, false, false);
        console.error(`Order ${order.id} sent to DLQ after ${retryCount} retries`);
      }
    }
  });
}

Dead Letter Queue + Retry

AWS SQS

SQS is a fully managed queue — no infrastructure to maintain.

Setup with AWS SDK v3

const {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} = require('@aws-sdk/client-sqs');

const sqs = new SQSClient({ region: 'us-east-1' });
const QUEUE_URL = process.env.SQS_QUEUE_URL;

// Send message
async function sendToSQS(data) {
  await sqs.send(new SendMessageCommand({
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(data),
    MessageAttributes: {
      EventType: {
        DataType: 'String',
        StringValue: 'order.created',
      },
    },
    // For FIFO queues:
    // MessageGroupId: 'orders',
    // MessageDeduplicationId: `order-${data.id}`,
  }));
}

// Poll for messages
async function pollMessages() {
  while (true) {
    const { Messages } = await sqs.send(new ReceiveMessageCommand({
      QueueUrl: QUEUE_URL,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20,       // Long polling (reduces empty responses)
      VisibilityTimeout: 60,     // 60s to process before reappearing
      MessageAttributeNames: ['All'],
    }));

    if (!Messages?.length) continue;

    await Promise.allSettled(
      Messages.map(async (msg) => {
        try {
          const data = JSON.parse(msg.Body);
          await processOrder(data);

          // Delete after successful processing
          await sqs.send(new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle: msg.ReceiptHandle,
          }));
        } catch (err) {
          console.error('Failed to process message:', err);
          // Message will reappear after VisibilityTimeout
        }
      })
    );
  }
}

SQS Standard vs FIFO

Feature Standard FIFO
Throughput Unlimited 3,000 msg/s (with batching)
Ordering Best-effort Strict FIFO
Delivery At-least-once Exactly-once
Deduplication None 5-minute window
Cost Lower Higher

Idempotent Consumers

Since messages can be delivered more than once, consumers must be idempotent.

async function processOrder(order) {
  // Check if already processed (using a unique idempotency key)
  const existing = await db.processedEvents.findOne({
    eventId: `order-${order.id}`,
  });

  if (existing) {
    console.log(`Order ${order.id} already processed, skipping`);
    return;
  }

  // Process the order
  await db.transaction(async (tx) => {
    await tx.orders.updateStatus(order.id, 'processing');
    await tx.inventory.reserve(order.items);
    await tx.processedEvents.create({
      eventId: `order-${order.id}`,
      processedAt: new Date(),
    });
  });
}

RabbitMQ vs SQS

Factor RabbitMQ SQS
Management Self-hosted Fully managed
Routing Exchanges, bindings, topics Simple queue
Protocol AMQP HTTP/REST
Latency ~1ms ~20-50ms
Cost Server costs Pay per request
Best for Complex routing, low latency Simple queuing on AWS

Choose RabbitMQ when you need complex routing patterns, low latency, or are not on AWS. Choose SQS when you want zero infrastructure management and are already on AWS.

Share

Related Posts

Node.js Architecture — Event Loop Deep Dive

Node.js Architecture — Event Loop Deep Dive

Why the Event Loop Matters Node.js runs JavaScript on a single thread, yet…

WebSockets with Socket.io in Node.js

WebSockets with Socket.io in Node.js

WebSocket vs HTTP Traditional HTTP follows a request/response model — the client…

Testing Node.js — Unit, Integration, and E2E

Testing Node.js — Unit, Integration, and E2E

Testing Strategy A solid testing strategy follows the testing pyramid — many…

Redis — Caching, Sessions, Pub/Sub in Node.js

Redis — Caching, Sessions, Pub/Sub in Node.js

Why Redis for Node.js Redis is an in-memory data store that serves as a cache…

Database Integration — PostgreSQL with Node.js

Database Integration — PostgreSQL with Node.js

Choosing Your PostgreSQL Client Node.js has three main approaches to working…

Performance Optimization and Profiling in Node.js

Performance Optimization and Profiling in Node.js

Profiling First, Optimize Second Never optimize blindly. Always profile to find…

Latest Posts

AI Video Generation in 2025 — Models, Costs, and How to Build a Cost-Effective Pipeline

AI Video Generation in 2025 — Models, Costs, and How to Build a Cost-Effective Pipeline

AI video generation went from “cool demo” to “usable in production” in 2024-202…

AI Models in 2025 — Cost, Capabilities, and Which One to Use

AI Models in 2025 — Cost, Capabilities, and Which One to Use

Choosing the right AI model is one of the most impactful decisions you’ll make…

AI Image Generation in 2025 — Models, Costs, and How to Optimize Spend

AI Image Generation in 2025 — Models, Costs, and How to Optimize Spend

Generating one image with AI costs between $0.002 and $0.12. That might sound…

AI Coding Assistants in 2025 — Every Tool Compared, and Which One to Actually Use

AI Coding Assistants in 2025 — Every Tool Compared, and Which One to Actually Use

Two years ago, AI coding meant one thing: GitHub Copilot autocompleting your…

AI Agents Demystified — It's Just Automation With a Better Brain

AI Agents Demystified — It's Just Automation With a Better Brain

Let’s cut through the noise. If you read Twitter or LinkedIn, you’d think “AI…

Supply Chain Security — Protecting Your Software Pipeline

Supply Chain Security — Protecting Your Software Pipeline

In 2024, a single malicious contributor nearly compromised every Linux system on…