Xin chào các độc giả của 2coffee.dev, đã rất lâu rồi tôi mới có dịp ngồi lại, trò chuyện, hỏi thăm tình hình sức khoẻ của mọi người. Hôm nay trời se lạnh, cảm giác như mùa đông đã đến thật gần như chỉ chờ cánh cửa vô hình bật mở bung ra là luồng gió mùa ồ ạt tràn xuống. Cách đây ít hôm, chắc do sự thay đổi thời tiết đột ngột mà một trong hai bên mũi bị tắc cứng, thở không ra hơi, ấy thế mà chỉ cần chăm chỉ uống gừng tươi với nước ấm vào mỗi sáng sớm với chiều tà thì chỉ sau 3 hôm mọi thứ bình thường trở lại.
Nhiều tháng vừa qua tôi không ra thêm bài viết mới nào bởi vì công việc quá bận rộn. Tiết lộ cho các bạn biết là tôi vừa trải qua quá trình nâng cấp hệ thống toàn diện cho dự án ở nơi làm việc. Có rất nhiều điều phải làm, nhiều kiến thức mới và nhiều lần vật lộn với hàng tấn dữ liệu tích tụ qua nhiều năm. Cho đến giờ thì mọi thứ có vẻ tạm ổn, công việc đỡ dồn dập hơn nhiều so với trước đây.
Bình thường nếu muốn xử lý một lượng lớn dữ liệu của người dùng thì các bạn sẽ làm thế nào? Lấy một ví dụ cụ thể đi! Đó là migrate dữ liệu người dùng nhằm tương thích với phiên bản mới của ứng dụng. Việc đó giống như là thêm, bớt hoặc sửa trường dữ liệu nào đó chẳng hạn... Tôi chọn cách viết một đoạn script rồi lựa thời điểm thích hợp mà chạy. Đó không phải là cách tốt nhất nhưng ắt hẳn phù hợp với tình hình hiện tại với dự án của chúng tôi. Thông thường logic trong script cần phải thực hiện nhanh và chính xác như những gì mong muốn. Tất nhiên trong hàng triệu bản ghi, ắt sẽ có cái bất quy tắc, đẩy ra ngoại lệ hoặc thậm chí là lỗi. Vì vậy cần phải kiểm soát được lỗi và nắm bắt cả nguyên nhân, từ đó tìm cách giải quyết.
Như thường lệ hôm đầu tuần tôi nhận nhiệm vụ xử lý dữ liệu trên hơn 4 triệu bản ghi của người dùng. Với lượng dữ liệu lớn như thế mà đảm bảo được các tiêu chí ở trên thì chúng tôi lựa chọn phong cách xử lý lần lượt, hay nói cách khác là tuân theo mô hình publisher - consumer, giao tiếp với nhau thông qua message queue. Dữ liệu được publisher đẩy vào và được consumer lấy ra xử lý. Mô hình này phù hợp ở chỗ có thể kiểm soát được quá trình xử lý. Ví dụ kiểm soát được dữ liệu đầu vào và logic xử lý ở consumer. Nếu hệ thống đủ mạnh, hoàn toàn có thể tạo ra nhiều consumer tham gia vào đẩy nhanh quá trình xử lý. Nếu xảy ra lỗi, đẩy bản ghi gây ra lỗi đó vào một hàng đợi khác, sau này có thể lấy ra và thống kê cũng như xử lý lại thông điệp bị lỗi đó.
Vì công việc tương tự thế này diễn ra nhiều lần trước đây nên tôi có một bộ khung để triển khai mô hình publisher - consumer. Một phần đoạn mã của publisher có nhiệm vụ đẩy thông điệp vào hàng đợi trông giống như sau.
async function myPublisher() {
await connectDatabases();
await connectRabbit();
const cursor = mongoose.connection
.collection("users")
.find()
.sort({ _id: -1 })
.batchSize(10000);
while (await cursor.hasNext()) {
const user = await cursor.next();
publish(RABBITMQ_ROUTING_KEY, { userId: user._id.toString() });
console.log('publish', { userId: user._id.toString() });
}
}
Đoạn mã trên tìm ra tất cả users trong mongo rồi gọi hàm publish để đẩy message có nội dung { userId: user._id.toString() } lên rabbit.
Đoạn mã trong hàm publish trông giống như sau.
function publish(routingKey, msgObj) {
if (!channel) {
console.warn("[RabbitMQ] Channel not ready, retrying...");
return setTimeout(() => publish(routingKey, msgObj), 500);
}
const msgBuffer = Buffer.from(JSON.stringify(msgObj));
channel.publish(EXCHANGE, routingKey, msgBuffer, { persistent: true });
}
Trước đó thì phải gọi hàm connectRabbit để khởi tạo kết nối, nội dung của hàm đó trông giống như sau.
async function connectRabbit() {
try {
connection = await amqp.connect(RABBITMQ_URL);
connection.on("error", (err) => {
console.error("[RabbitMQ] Connection error:", err.message);
setTimeout(connectRabbit, 1000); // Reconnect
});
connection.on("close", () => {
console.warn("[RabbitMQ] Connection closed. Reconnecting...");
setTimeout(connectRabbit, 1000); // Reconnect
});
channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "direct", { durable: true });
console.log("[RabbitMQ] Connected and channel created");
return channel;
} catch (err) {
console.error("[RabbitMQ] Connection failed:", err.message);
setTimeout(connectRabbit, 1000); // Retry
}
}
channel trong hàm publish được khởi tạo từ connectRabbit. publish là một hàm đồng bộ vì vậy theo suy nghĩ thì mỗi lần gọi nó message được đẩy lên máy chủ rabbit và rabbit dựa vào routing key để điều hướng nó vào hàng đợi thích hợp. Cứ đinh ninh điều đó là đúng cho đến khi dữ liệu truy vấn ra quá lớn, lên đến hơn 4 triệu bản ghi. Khi chạy script của publisher, những dòng thông báo từ console.log('publish', { userId: user._id.toString() }) liên tục xuất hiện trên màn hình nhưng khi kiểm tra thì không thấy hàng đợi xuất hiện message nào. Điều này quả là đi ngược lại với thiết kế tập trung vào băng thông để giúp rabbit xử lý lượng lớn tin nhắn với tốc độ đáng kinh ngạc. Kiên trì đợi cho đến khi vòng lặp đạt đến con số tới hạn thì... bùm: Lỗi.
Dòng lỗi đó hết sức ngắn gọn, nó như một lời cảnh báo rằng quá trình ghi thất bại và ngắt kết nối. Suy nghĩ một lúc thì đoán vấn đề nằm ở chỗ tại sao vòng lặp liên tục chạy, message liên tục đẩy lên nhưng hàng đợi lại không nhận được tin nhắn? Nếu giải quyết được vấn đề đó thì tôi tin chắc rằng lỗi kia sẽ không xuất hiện nữa. Những lần trước có thể dữ liệu chưa đủ lớn, hoặc do không để ý vì thời gian chạy publisher quá ngắn nên thời gian vòng lặp chạy nhanh hơn thời gian quan sát hàng đợi.
Loay hoay một lúc thì hoá ra vấn đề nằm ở hàm publish. Hàm publish không hoạt động theo cách mà tôi đang nghĩ, mỗi lần gọi, nó không đẩy message lên rabbit ngay lập tức mà chờ cho đến thời điểm thích hợp thì mới đẩy tin nhắn đi. Khi gọi publish, hàm trả về giá trị true hoặc false. true mang thông điệp "Ok, hãy gửi tiếp tin nhắn đi nào, tôi xử lý được", còn false tức "Dừng lại! Hãy đợi tôi xử lý xong phần tin nhắn trước", khi đó cần phải chờ sự kiện drain trước khi tiếp tục gọi hàm publish. RabbitMQ gọi nó là Flow control.
Như vậy rõ ràng đoạn mã trên đang không tuân theo quy tắc này. Nó liên tục gọi publish một cách ồ ạt khiến rabbit không có thời gian mà xử lý tin nhắn. Như vậy cách làm đúng phải là kiểm tra trạng thái true/false mỗi khi gọi publish. Như vậy cần phải sửa lại hàm publish như sau.
function publish(routingKey, msgObj) {
if (!channel) {
console.warn("[RabbitMQ] Channel not ready, retrying...");
return setTimeout(() => publish(routingKey, msgObj), 500);
}
const msgBuffer = Buffer.from(JSON.stringify(msgObj));
return channel.publish(EXCHANGE, routingKey, msgBuffer, { persistent: true });
}
Khi đó đoạn mã trong publisher trông giống như sau.
async function myPublisher() {
await connectDatabases();
await connectRabbit();
const cursor = mongoose.connection
.collection("users")
.find()
.sort({ _id: -1 })
.batchSize(10000);
while (await cursor.hasNext()) {
const user = await cursor.next();
const canContinue = publish(RABBITMQ_ROUTING_KEY, { userId: user._id.toString() });
console.log('publish', { userId: user._id.toString() });
if (!canContinue) {
console.log("Buffer full, waiting...");
await new Promise((resolve) => channel.once("drain", resolve));
}
}
}
Bây giờ chỉ cần chạy lại publisher thì message lần lượt xuất hiện trong hàng đợi theo thời gian thực. Thế là xong!
Qua bài viết, tôi đã chia sẻ vấn đề gặp phải khi xử lý lượng lớn dữ liệu với RabbitMQ trong mô hình publisher-consumer. Khi dữ liệu vượt quá 4 triệu bản ghi, việc không tuân theo cơ chế kiểm soát dòng (Flow control) của RabbitMQ đã khiến hệ thống gặp lỗi, gây gián đoạn quá trình xử lý tin nhắn. Điều này xuất phát từ việc hàm publish liên tục được gọi một cách ồ ạt mà không kiểm tra trạng thái true/false, dẫn đến việc bộ đệm của RabbitMQ bị quá tải và hệ thống ngắt kết nối. Để khắc phục, tôi đã sửa đổi logic trong publisher để kiểm tra trạng thái của publish và chờ sự kiện drain khi cần thiết, đảm bảo quá trình đẩy tin nhắn diễn ra trơn tru và hiệu quả.
Trên đây chỉ là một trong các cách để xử lý vấn đề đang gặp phải. Còn bạn thì sao? Có đang sử dụng RabbitMQ không? Bạn đang xử lý một lượng lớn dữ liệu như thế nào? Hãy để lại bình luận dưới bài viết nhé. Xin cảm ơn.