amqp-suite is a simple and efficient AMQP (Advanced Message Queuing Protocol) client wrapper for Node.js that handles connection management, message publishing, and consuming messages from queues with a topic exchange. This package abstracts complex connection handling and simplifies AMQP usage in applications by providing easy-to-use methods for connecting, publishing, consuming, and gracefully shutting down the connection.
Important
Contributions, feedback and suggestions are welcome.
If you have ideas for improvements, feature requests, or find any issues, please open an issue or submit a pull request.
Your input helps make this library better for everyone — get started here.
- Automatic Reconnection: Built-in retry logic for connection failures and drops.
- Simplified Pub/Sub: Designed for 'topic' exchanges to allow flexible routing.
- Structured Messaging: Automatic JSON serialization and deserialization.
- Error Handling: Graceful handling of malformed messages and channel crashes.
- Flow Control: Integrated prefetch support to prevent consumer saturation.
Using npm:
npm install amqp-suiteUsing yarn:
yarn add amqp-suiteUsing pnpm:
pnpm add amqp-suiteUsing bun:
bun add amqp-suiteOnce the package is installed, you can import the library using ES Modules:
import { AmqpClient } from "amqp-suite";Create an instance of AmqpClient and establish a connection to your RabbitMQ broker. This prepares the client to publish and consume messages.
import { AmqpClient } from "amqp-suite";
const amqpClient = new AmqpClient("amqp://localhost", "example-exchange");
await amqpClient.connect();The publish method automatically stringifies your message and sends it as a persistent buffer, ensuring it won’t be lost if the broker restarts.
await amqpClient.publish(
"example.events.hello_world", // Routing Key
{
message: "Hello World!",
},
{} // Options
);The consume method automatically creates queues, binds them to the exchange, and handles acknowledgments (ack/nack). You only need to provide the queue name and the function that will process incoming messages.
await amqpClient.consume(
"example-queue", // Queue
(msg) => {
console.log("Received message:", msg);
},
{}, // Options
"example.events.hello_world" // Binding Key
);This diagram illustrates how a message is sent from the publisher, routed through the topic exchange, enqueued in the queue, and finally consumed by the consumer.
Here’s a full example that connects, publishes, consumes messages, and finally closes the connection.
import { AmqpClient } from "amqp-suite";
const amqpClient = new AmqpClient("amqp://localhost", "example-exchange");
await amqpClient.connect();
await amqpClient.publish(
"example.events.hello_world", // Routing Key
{
message: "Hello World!",
},
{} // Options
);
await amqpClient.consume(
"example-queue", // Queue
(msg) => {
console.log("Received message:", msg);
},
{}, // Options
"example.events.hello_world" // Binding Key
);
await amqpClient.close();Note: You can check the full example in examples/hello-world.
Creates a new instance of the AMQP client.
The client uses a durable topic exchange to enable flexible message routing using routing patterns.
-
amqpUrl(string) The AMQP connection URL. Example:amqp://user:pass@localhost:5672 -
exchange(string) The name of the topic exchange used for publishing and consuming messages. The exchange is asserted asdurable.
Establishes a connection to the AMQP broker and creates a channel. If the connection is lost unexpectedly, the client will automatically attempt to reconnect.
-
retries(number, optional) Maximum number of reconnection attempts during the initial connection. Default:5 -
delay(number, optional) Delay in milliseconds between reconnection attempts. Default:5000
- Prevents multiple simultaneous connection attempts.
- Automatically reconnects if the connection is closed by the broker.
- Reconnection attempts triggered after a connection drop do not reuse the original retry counter.
Promise<void>
Publishes a message to the configured topic exchange using the specified routing key.
Messages are automatically serialized to JSON and published as persistent by default.
-
routingKey(string) The routing key used to route the message. Example:user.events.create -
message(object) The message payload. It will be automatically serialized to JSON. -
options(object, optional) Additional publish options supported byamqplib. These options are merged with{ persistent: true }.
- If the channel is not initialized, the client will attempt to connect automatically.
- If the broker’s write buffer is full, the message may be temporarily buffered locally.
Promise<void>
Consumes messages from the specified queue and binds it to the exchange using the provided routing pattern.
The onMessage callback is executed for each received message.
-
queue(string) The name of the queue to consume messages from. The queue is asserted asdurable. -
onMessage(function) An asynchronous callback executed when a message is received.async (content, rawMessage) => { // message handling logic };
content: Parsed JSON message payload.rawMessage: The originalConsumeMessagefromamqplib.
-
options(object, optional) Consumer configuration options.prefetch(number): Limits the number of unacknowledged messages. Default:10
-
bindingKey(string, optional) The routing pattern used to bind the queue to the exchange. Default:#(matches all routing keys).
-
Messages are acknowledged (
ack) automatically after successful processing. -
If an error is thrown while processing a message:
- The message is negatively acknowledged (
nack) - The message is not requeued, preventing infinite retry loops for malformed messages.
- The message is negatively acknowledged (
Promise<void>
Gracefully closes the AMQP channel and connection.
- Prevents automatic reconnection during shutdown.
- Ensures resources are released cleanly.
Promise<void>
This project is licensed under the terms of the MIT License.
