Hello readers of 2coffee.dev, it's been a long time since I last sat down to chat and ask how everyone is doing. Today the weather is a bit chilly; it feels as if winter is very close, just waiting for that invisible door to swing open and let the winter winds rush in. A few days ago, probably because of the sudden change in weather, one side of my nose got completely blocked and it was hard to breathe, but all it took was diligently drinking fresh ginger in warm water every morning and evening, and after just three days everything was back to normal.
For many months I haven't published any new articles because work has been extremely busy. I want to share that I just went through a comprehensive system upgrade for the project at my workplace. There was so much to do, a lot of new knowledge to pick up, and plenty of struggling with the tons of data accumulated over the years. By now everything seems to be settling down, and work is much less hectic than before.
Normally, if you had to process a large amount of user data, how would you do it? Let's take a specific example: migrating user data so it is compatible with the new version of the application, for instance adding, removing, or modifying some fields... I chose to write a script and run it at an appropriate time. That may not be the best way, but it certainly fits the current situation of our project. The logic in the script needs to run quickly and produce exactly the expected result. Of course, with millions of records there will inevitably be irregular cases that throw exceptions or even cause errors, so it is necessary to catch those errors and understand their causes in order to resolve them.
As usual, at the beginning of the week I received the task of processing data for over 4 million user records. With such a large volume of data, and to satisfy the criteria above, we chose to handle it sequentially, in other words following the publisher-consumer model, communicating through a message queue. Data is pushed in by the publisher and retrieved for processing by the consumer. This model is suitable because it gives us control over how the data is processed, for example controlling the input data and the processing logic at the consumer. If the system is strong enough, multiple consumers can be created to speed things up. If an error occurs, the record that caused it can be pushed to a different queue, which can later be read back for statistics and for reprocessing the failed message.
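This article focuses on the publisher side, but for completeness, here is a minimal sketch of what a consumer in this model could look like, including the error queue mentioned above. QUEUE, ERROR_QUEUE, and processUser are hypothetical names for illustration only and do not appear in the project's code; RABBITMQ_URL, EXCHANGE, and RABBITMQ_ROUTING_KEY are the same constants used in the publisher snippets below.

const amqp = require("amqplib");

// Hypothetical names, for illustration only.
const QUEUE = "users_migration";
const ERROR_QUEUE = "users_migration_errors";

async function myConsumer() {
  const connection = await amqp.connect(RABBITMQ_URL);
  const channel = await connection.createChannel();

  await channel.assertExchange(EXCHANGE, "direct", { durable: true });
  await channel.assertQueue(QUEUE, { durable: true });
  await channel.bindQueue(QUEUE, EXCHANGE, RABBITMQ_ROUTING_KEY);
  await channel.assertQueue(ERROR_QUEUE, { durable: true });

  // Limit the number of unacknowledged messages handled at once
  channel.prefetch(50);

  channel.consume(QUEUE, async (msg) => {
    if (!msg) return;
    const { userId } = JSON.parse(msg.content.toString());
    try {
      await processUser(userId); // the migration logic for a single record (hypothetical)
      channel.ack(msg);
    } catch (err) {
      // Push the failed record to a separate queue for later statistics and reprocessing
      const errorMsg = Buffer.from(JSON.stringify({ userId, error: err.message }));
      channel.sendToQueue(ERROR_QUEUE, errorMsg, { persistent: true });
      channel.ack(msg); // acknowledge so the main queue keeps moving
    }
  }, { noAck: false });
}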
Since tasks like this have come up many times before, I already have a framework in place for the publisher-consumer model. The part of the publisher's code responsible for pushing messages into the queue looks like this.
async function myPublisher() {
  await connectDatabases();
  await connectRabbit();

  const cursor = mongoose.connection
    .collection("users")
    .find()
    .sort({ _id: -1 })
    .batchSize(10000);

  while (await cursor.hasNext()) {
    const user = await cursor.next();
    publish(RABBITMQ_ROUTING_KEY, { userId: user._id.toString() });
    console.log('publish', { userId: user._id.toString() });
  }
}
The code above retrieves all users from MongoDB and calls the publish function to push a message containing { userId: user._id.toString() } to RabbitMQ.
The code in the publish function looks like this.
function publish(routingKey, msgObj) {
  if (!channel) {
    console.warn("[RabbitMQ] Channel not ready, retrying...");
    return setTimeout(() => publish(routingKey, msgObj), 500);
  }

  const msgBuffer = Buffer.from(JSON.stringify(msgObj));
  channel.publish(EXCHANGE, routingKey, msgBuffer, { persistent: true });
}
Before that, the connectRabbit function must be called to initialize the connection, and the content of that function looks like this.
async function connectRabbit() {
  try {
    connection = await amqp.connect(RABBITMQ_URL);

    connection.on("error", (err) => {
      console.error("[RabbitMQ] Connection error:", err.message);
      setTimeout(connectRabbit, 1000); // Reconnect
    });

    connection.on("close", () => {
      console.warn("[RabbitMQ] Connection closed. Reconnecting...");
      setTimeout(connectRabbit, 1000); // Reconnect
    });

    channel = await connection.createChannel();
    await channel.assertExchange(EXCHANGE, "direct", { durable: true });
    console.log("[RabbitMQ] Connected and channel created");
    return channel;
  } catch (err) {
    console.error("[RabbitMQ] Connection failed:", err.message);
    setTimeout(connectRabbit, 1000); // Retry
  }
}
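One thing these snippets do not show is where the queue itself is declared and bound to the exchange, so that messages published with RABBITMQ_ROUTING_KEY actually end up somewhere. As a rough sketch, assuming a hypothetical queue name QUEUE (not part of the original code) and reusing the channel, EXCHANGE, and RABBITMQ_ROUTING_KEY from above, the binding could be set up right after connectRabbit like this.

// Hypothetical queue setup; QUEUE is an assumed name for illustration.
async function assertQueueBinding() {
  await channel.assertQueue(QUEUE, { durable: true });             // create the queue if it does not exist
  await channel.bindQueue(QUEUE, EXCHANGE, RABBITMQ_ROUTING_KEY);  // route messages with this key to the queue
}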
The channel used in the publish function is initialized in connectRabbit. publish is a synchronous function, so I thought that each time it was called, the message was pushed straight to the RabbitMQ server, and RabbitMQ routed it to the appropriate queue based on the routing key. I assumed this was correct until the queried data grew too large, exceeding 4 million records. While the publisher script was running, the console.log('publish', { userId: user._id.toString() }) messages kept appearing on the screen, but when I checked, there were no messages in the queue at all. That was strange, because RabbitMQ is designed to handle a huge number of messages at incredible speed. I kept waiting until the loop reached a critical number and then... boom: Error.
The error message was very brief, little more than a warning that a write had failed and the connection had been interrupted. After thinking for a while, I guessed the real question was: why did the loop keep running and messages keep being pushed, yet nothing ever arrived in the queue? If I could answer that, I was confident the error would no longer appear. In previous runs the data may simply not have been large enough, or I hadn't noticed, because the publisher finished so quickly that the loop was done before I even had time to look at the queue.
After some digging, it turned out the issue was in the publish function. It did not work the way I thought: each call does not immediately push the message to RabbitMQ, but buffers it and sends it at the appropriate moment. channel.publish returns true or false. true means "Okay, keep sending the next message, I can handle it," while false means "Stop! Wait until I've dealt with what I already have," at which point I need to wait for the drain event before calling publish again. RabbitMQ refers to this as flow control.
Clearly, the code above was not following this rule. It kept calling publish in a tight loop, giving Rabbit no time to work through the messages. The correct approach is to check the true/false result of every publish call. So the publish function needs to be modified as follows.
function publish(routingKey, msgObj) {
  if (!channel) {
    console.warn("[RabbitMQ] Channel not ready, retrying...");
    return setTimeout(() => publish(routingKey, msgObj), 500);
  }

  const msgBuffer = Buffer.from(JSON.stringify(msgObj));
  return channel.publish(EXCHANGE, routingKey, msgBuffer, { persistent: true });
}
Now the code in the publisher looks like this.
async function myPublisher() {
  await connectDatabases();
  await connectRabbit();

  const cursor = mongoose.connection
    .collection("users")
    .find()
    .sort({ _id: -1 })
    .batchSize(10000);

  while (await cursor.hasNext()) {
    const user = await cursor.next();
    const canContinue = publish(RABBITMQ_ROUTING_KEY, { userId: user._id.toString() });
    console.log('publish', { userId: user._id.toString() });

    if (!canContinue) {
      console.log("Buffer full, waiting...");
      await new Promise((resolve) => channel.once("drain", resolve));
    }
  }
}
Now, simply rerunning the publisher will allow messages to appear in the queue sequentially in real-time. That's it!
In this article, I have shared the problem I ran into when handling a large amount of data with RabbitMQ in the publisher-consumer model. Once the data exceeded 4 million records, ignoring RabbitMQ's flow control mechanism led to an error that interrupted message publishing. The cause was calling publish continuously in a tight loop without checking its true/false result, which overloaded the buffer and eventually broke the connection. To fix it, I changed the publisher logic to check the result of publish and wait for the drain event when necessary, so that messages are pushed smoothly and efficiently.
The above is just one way to handle this kind of problem. What about you? Are you using RabbitMQ? How do you handle large amounts of data? Please leave a comment below the article. Thank you!