如何在Web應用程序中使用隊列 – Node.js和Redis教程
消息隊列通過為通常的請求-響應過程提供額外的分支來幫助解決這個問題。這個額外的分支有助于確保用戶能夠立即得到響應,并且可以暫時完成耗時的流程。大家都高高興興回家了。
本文將重點介紹什么是消息隊列以及如何通過構建一個非常簡單的應用程序來開始使用它們。您應該熟悉 Node.js 的基礎知識,并且應該在本地或云實例上安裝 Redis。在此處了解如何安裝 Redis 。
(更|多優(yōu)質內(nèi)|容:java567 點 c0m)
什么是隊列?
隊列是一種數(shù)據(jù)結構,允許您按順序存儲實體。隊列使用先進先出 (FIFO) 原則。
計算機科學中的隊列概念與日常生活中人們排隊取東西的隊列概念相同。您從后面加入隊列,等到輪到您時,然后在有人照顧后從前面離開隊列。
在計算機科學中,當 API 請求等流程正在運行時,您需要從當前流程中刪除某個任務(例如發(fā)送電子郵件),您可以將其推送到隊列并繼續(xù)該流程。
什么是工作?
作業(yè)是隊列上使用的任何數(shù)據(jù),通常是類似 JSON 的對象。
正如本文封面圖片所示,您可以將工作視為機場隊列中的每個人。每個人都拿著一個公文包,其中包含特定數(shù)據(jù)和其他說明(護照,可能還需要醫(yī)療文件),這些說明在輪到他們時會有所幫助。
加入隊列的新人將從后面進入(作為最后一個人),人們將從前面受到照顧。這也是處理作業(yè)的方式,每個作業(yè)都包含將用于其處理的數(shù)據(jù)。新的工作是從后面添加的,而工作是從前面刪除的。
什么是工作生產(chǎn)者?
作業(yè)生產(chǎn)者是將作業(yè)添加到隊列的任何代碼段。在現(xiàn)實生活中,這將是機場的保安人員,為人們指引方向,告訴他們?yōu)榱瞬煌哪康亩尤肽膫€隊列。
作業(yè)生產(chǎn)者可以獨立于作業(yè)消費者而存在。這意味著在微服務設置中,特定服務可能只關心將作業(yè)添加到隊列中,而不關心之后如何處理它們。
什么是工人(工作消費者)?
工作人員或作業(yè)使用者是可以執(zhí)行作業(yè)的進程或函數(shù)。將工作人員想象為銀行出納員,負責在銀行排隊的人。當?shù)谝粋€人進來時,他們作為隊列中唯一的人加入隊列。然后收銀員招呼他們,隊列就被清空了。
收銀員要求該人提供用于處理交易的具體詳細信息。當收銀員接待該顧客時,可能還有另外四名顧客在排隊。他們將一直排隊,直到收銀員處理完第一位顧客,然后再叫下一位顧客。這與隊列工作人員的過程相同 - 他們選擇隊列中的第一個作業(yè)并處理它。
什么是失敗的工作?
很多時候,某些作業(yè)可能會在處理過程中失敗。
以下是作業(yè)可能失敗的一些原因:
輸入數(shù)據(jù)無效或丟失:當要處理的作業(yè)所需的數(shù)據(jù)丟失時,作業(yè)將失敗。例如,如果沒有收件人的電子郵件地址,發(fā)送電子郵件的作業(yè)將會失敗。
超時:如果作業(yè)花費的時間比平常長,隊列機制可能會導致作業(yè)失敗。這可能是由于作業(yè)的依賴性問題或其他原因造成的,但通常您不希望單個作業(yè)永遠運行。
網(wǎng)絡或基礎設施問題:這些問題幾乎超出您的控制范圍,但它們確實會發(fā)生。例如,數(shù)據(jù)庫連接錯誤會導致作業(yè)失敗。
依賴性問題:有時一項工作需要依賴一些外部資源才能正常運行。每當這些其他資源不可用或不成功時,作業(yè)就會失敗。
當作業(yè)失敗時,您可以配置隊列機制來重試它們。您可以立即重試該作業(yè),也可以在計算出的時間后重試。您可以設置最大嘗試次數(shù),建議這樣做。如果沒有,您最終運行的作業(yè)將永遠失敗。
為什么使用隊列?
隊列對于在微服務之間創(chuàng)建強大的通信通道非常有用。多個服務可以使用同一個隊列。不同的服務可能負責解決不同的問題。當服務完成其任務時,它可以將作業(yè)推送到另一個有工作人員等待該作業(yè)的服務。該服務將接收它并對數(shù)據(jù)執(zhí)行任何需要的操作。
隊列對于從進程中卸載繁重的任務也很有用。正如您將在本文中看到的,可以將發(fā)送電子郵件等耗時的任務放入隊列中,以避免減慢響應時間。
隊列有助于避免單點故障。能夠失敗并可以重試的進程最好使用隊列進行處理,在一段時間后可以重試。
如何構建使用隊列的簡單應用程序
在本文中,我們將使用 Node.js 和Redis構建一個簡單的項目。我們將使用Bull庫,因為它簡化了構建隊列系統(tǒng)所涉及的許多復雜性。該項目將有一個端點來發(fā)送電子郵件。
創(chuàng)建一個新的 Node.js 項目
?mkdir nodejs-queue-project
?cd nodejs-queue-project
?npm init -y
上面的命令將創(chuàng)建一個名為 的新文件夾nodejs-queue-project并package.json在其中創(chuàng)建一個文件。該package.json文件應如下所示:
?{
? ?"name": "nodejs-queue-project",
? ?"version": "1.0.0",
? ?"description": "",
? ?"main": "index.js",
? ?"scripts": {
? ? ?"test": "echo \"Error: no test specified\" && exit 1"
? ?},
? ?"keywords": [],
? ?"author": "",
? ?"license": "ISC"
?}
安裝所需的依賴項
?npm i express @types/express @types/node body-parser ts-node ts-lint typescript nodemon nodemailer @types/nodemailer
上面的命令將安裝項目所需的不同包和依賴項。
安裝后,您可以更新scripts您的部分package.json以獲取dev命令。您的整個package.json文件現(xiàn)在應該如下所示:
?{
? ?"name": "nodejs-queue-project",
? ?"version": "1.0.0",
? ?"description": "",
? ?"main": "index.js",
? ?"scripts": {
? ? ?"dev": "nodemon src/app.ts"
? ?},
? ?"keywords": [],
? ?"author": "",
? ?"license": "ISC",
? ?"dependencies": {
? ? ?"@types/express": "^4.17.17",
? ? ?"@types/node": "^20.3.3",
? ? ?"@types/nodemailer": "^6.4.8",
? ? ?"body-parser": "^1.20.2",
? ? ?"express": "^4.18.2",
? ? ?"nodemailer": "^6.9.3",
? ? ?"nodemon": "^2.0.22",
? ? ?"ts-lint": "^4.5.1",
? ? ?"ts-node": "^10.9.1",
? ? ?"typescript": "^5.1.6"
? ?}
?}
上面的文件顯示了所有已安裝的依賴項。當您使用腳本時,該npm run dev命令將運行dev。
如何構建端點
要做的第一件事是創(chuàng)建一個名為 的新文件夾src。該文件夾將包含您的所有代碼文件。它將包含的第一個文件是應用程序的根文件 -文件app.ts中定義的文件package.json。
我們將使用該app.ts文件導入所需的包并創(chuàng)建一個具有單個端點的簡單服務器來發(fā)送電子郵件,如下所示:
?import express from "express";
?import bodyParser from "body-parser";
?import nodemailer from "nodemailer";
?
?const app = express();
?
?app.use(bodyParser.json());
?
?app.post("/send-email", async (req, res) => {
? ?const { from, to, subject, text } = req.body;
?
? ?// Use a test account as this is a tutorial
? ?const testAccount = await nodemailer.createTestAccount();
?
? ?const transporter = nodemailer.createTransport({
? ? ?host: "smtp.ethereal.email",
? ? ?port: 587,
? ? ?secure: false,
? ? ?auth: {
? ? ? ?user: testAccount.user,
? ? ? ?pass: testAccount.pass,
? ? ?},
? ? ?tls: {
? ? ? ?rejectUnauthorized: false,
? ? ?},
? ?});
?
? ?console.log("Sending mail to %s", to);
?
? ?let info = await transporter.sendMail({
? ? ?from,
? ? ?to,
? ? ?subject,
? ? ?text,
? ? ?html: `<strong>${text}</strong>`,
? ?});
?
? ?console.log("Message sent: %s", info.messageId);
? ?console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
?
? ?res.json({
? ? ?message: "Email Sent",
? ?});
?});
?
?app.listen(4300, () => {
? ?console.log("Server started at //localhost:4300");
?});
?npm run dev`現(xiàn)在,您可以通過在終端中運行來啟動服務器。您應該在終端中看到一條消息。`Server started at //localhost:4300
npm 運行開發(fā)消息
您現(xiàn)在可以使用 Postman 等工具測試端點:
Postman 的端點測試
如屏幕截圖所示,請求花費了近 4 秒。這對于端點來說非常慢。如果您查看終端,您還應該看到一個 URL,您可以在其中預覽已發(fā)送的電子郵件。
打開鏈接即可查看電子郵件的外觀。
郵件內(nèi)容
如何創(chuàng)建隊列
為了使該過程更快,可以將電子郵件排隊以便稍后發(fā)送,并立即將響應發(fā)送給用戶。
為此,請安裝該bull庫及其@types庫,因為我們將使用它來創(chuàng)建隊列。那是:
?npm i bull @types/bull
使用創(chuàng)建新隊列bull就像使用Bull隊列名稱實例化一個新對象一樣簡單:
?// This goes at the top of your file
?import Bull from 'bull';
?
?const emailQueue = new Bull("email");
當僅使用隊列名稱創(chuàng)建隊列時,它會嘗試使用默認的 Redis 連接 URL:localhost:6379。如果您更喜歡使用不同的 URL,只需將第二個對象Bull作為選項對象傳遞給該類即可:
?const emailQueue = new Bull("email", {
? ?redis: "localhost:6379",
?});
此時,您可以創(chuàng)建一個簡單的函數(shù)來充當作業(yè)生產(chǎn)者,并在每次請求到來時將作業(yè)添加到隊列中。
?type EmailType = {
? ?from: string;
? ?to: string;
? ?subject: string;
? ?text: string;
?};
?
?const sendNewEmail = async (email: EmailType) => {
? ?emailQueue.add({ ...email });
?};
這個新創(chuàng)建的函數(shù)sendNewEmail接受一個對象,其中包含要發(fā)送的類型為 的新電子郵件的詳細信息EmailType。包括電子郵件的發(fā)件人電子郵件地址 ( from)、收件人電子郵件地址 ( to)subject以及電子郵件的內(nèi)容 ( text)。然后它將新作業(yè)推送到隊列中。
您現(xiàn)在可以使用此功能,而不是在請求期間發(fā)送電子郵件。修改端點以執(zhí)行此操作:
?app.post("/send-email", async (req, res) => {
? ?const { from, to, subject, text } = req.body;
?
? ?await sendNewEmail({ from, to, subject, text });
?
? ?console.log("Added to queue");
?
? ?res.json({
? ? ?message: "Email Sent",
? ?});
?});
此時,代碼更簡單,過程也更快。該請求只需要大約 40m——比以前快了大約 100 倍。
使用 Postman 進行端點測試
此時,電子郵件已添加到隊列中。它將保留在隊列中直到被處理。該作業(yè)可以由同一應用程序或另一個服務(如果在微服務設置中)處理。
如何處理工作
如果郵件從未離開隊列,則該循環(huán)是不完整且無用的。我們將創(chuàng)建一個作業(yè)使用者來處理作業(yè)并清除隊列。
Job我們可以通過為接受對象并發(fā)送電子郵件的函數(shù)創(chuàng)建邏輯來做到這一點:
?const processEmailQueue = async (job: Job) => {
? ?// Use a test account as this is a tutorial
? ?const testAccount = await nodemailer.createTestAccount();
?
? ?const transporter = nodemailer.createTransport({
? ? ?host: "smtp.ethereal.email",
? ? ?port: 587,
? ? ?secure: false,
? ? ?auth: {
? ? ? ?user: testAccount.user,
? ? ? ?pass: testAccount.pass,
? ? ?},
? ? ?tls: {
? ? ? ?rejectUnauthorized: false,
? ? ?},
? ?});
?
? ?const { from, to, subject, text } = job.data;
?
? ?console.log("Sending mail to %s", to);
?
? ?let info = await transporter.sendMail({
? ? ?from,
? ? ?to,
? ? ?subject,
? ? ?text,
? ? ?html: `<strong>${text}</strong>`,
? ?});
?
? ?console.log("Message sent: %s", info.messageId);
? ?console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
? ?
? ?return nodemailer.getTestMessageUrl(info);
?};
上面的函數(shù)接受一個Job對象。該對象具有有用的屬性,可顯示作業(yè)的狀態(tài)和數(shù)據(jù)。在這里,我們使用該data屬性。
此時,我們所擁有的只是一個函數(shù)。它不會自動獲取作業(yè),因為它不知道要使用哪個隊列。
在將其連接到隊列之前,您可以通過發(fā)送一些請求來繼續(xù)將一些作業(yè)添加到隊列中。您可以通過在以下位置運行此命令來檢查當前排隊的電子郵件作業(yè)redis-cli:
?LRANGE bull:email:wait 0 -1
這會檢查電子郵件等待列表,并返回ids等待作業(yè)的列表。
Redis 命令行界面
我創(chuàng)造了一些工作只是為了展示工人的實際工作方式。
現(xiàn)在,通過添加以下代碼行將工作線程連接到隊列:
?emailQueue.process(processEmailQueue);
這就是您的app.ts文件現(xiàn)在應該處理的內(nèi)容:
?import express from "express";
?import bodyParser from "body-parser";
?import nodemailer from "nodemailer";
?import Bull, { Job } from "bull";
?
?const app = express();
?
?app.use(bodyParser.json());
?
?const emailQueue = new Bull("email", {
? ?redis: "localhost:6379",
?});
?
?type EmailType = {
? ?from: string;
? ?to: string;
? ?subject: string;
? ?text: string;
?};
?
?const sendNewEmail = async (email: EmailType) => {
? ?emailQueue.add({ ...email });
?};
?
?const processEmailQueue = async (job: Job) => {
? ?// Use a test account as this is a tutorial
? ?const testAccount = await nodemailer.createTestAccount();
?
? ?const transporter = nodemailer.createTransport({
? ? ?host: "smtp.ethereal.email",
? ? ?port: 587,
? ? ?secure: false,
? ? ?auth: {
? ? ? ?user: testAccount.user,
? ? ? ?pass: testAccount.pass,
? ? ?},
? ? ?tls: {
? ? ? ?rejectUnauthorized: false,
? ? ?},
? ?});
?
? ?const { from, to, subject, text } = job.data;
?
? ?console.log("Sending mail to %s", to);
?
? ?let info = await transporter.sendMail({
? ? ?from,
? ? ?to,
? ? ?subject,
? ? ?text,
? ? ?html: `<strong>${text}</strong>`,
? ?});
?
? ?console.log("Message sent: %s", info.messageId);
? ?console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
?};
?
?emailQueue.process(processEmailQueue);
?
?app.post("/send-email", async (req, res) => {
? ?const { from, to, subject, text } = req.body;
?
? ?await sendNewEmail({ from, to, subject, text });
?
? ?console.log("Added to queue");
?
? ?res.json({
? ? ?message: "Email Sent",
? ?});
?});
?
?app.listen(4300, () => {
? ?console.log("Server started at //localhost:4300");
?});
保存后,您會注意到服務器重新啟動并立即開始發(fā)送郵件。這是因為工作人員看到隊列并立即開始處理。
服務器發(fā)送排隊的電子郵件
現(xiàn)在,生產(chǎn)者和工人都活躍起來。每個新的 API 請求都將被推送到隊列中,并且工作線程將立即處理它,除非已經(jīng)有一些待處理的作業(yè)。