在沒有深度使用函數回呼的經驗的時候,去看這些內容還是有一點吃力的。由於Node.js獨特的非同步特性,才出現了「回調地獄」的問題,在這篇文章中,我比較詳細的記錄瞭如何解決異步流問題。本文主要介紹了淺談Node.js之非同步流控制,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟著小編過來看看吧,希望能幫助大家。
文章會很長,而且這篇是對非同步流模式的解釋。文中會使用一個簡單的網路蜘蛛的例子,它的作用是抓取指定URL的網頁內容並保存在專案中,在文章的最後,可以找到整篇文章中的原始碼demo。
1.原生JavaScript模式
本篇不針對初學者,因此會省略掉大部分的基礎內容的講解:
(spider_v1 .js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (!exists) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { callback(err); } else { mkdirp(path.dirname(filename), err => { if (err) { callback(err); } else { fs.writeFile(filename, body, err => { if (err) { callback(err); } else { callback(null, filename, true); } }); } }); } }); } else { callback(null, filename, false); } }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
上邊的程式碼的流程大概是這樣的:
把url轉換成filename
判斷該檔案名稱是否存在,若存在直接返回,否則進入下一步
#發送請求,取得body
/// before if (err) { callback(err); } else { callback(null, filename, true); } /// after if (err) { return callback(err); } callback(null, filename, true);
(spider_v2.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (exists) { return callback(null, filename, false); } download(url, filename, err => { if (err) { return callback(err); } callback(null, filename, true); }) }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
(spider_v3.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); } function spider(url, nesting, callback) { const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
上邊的程式碼相比之前的程式碼多了兩個核心功能,首先是透過輔助類別取得到了某個body中的links:
const links = utilities.getPageLinks(currentUrl, body);
/// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); }
可以說上邊這一小段程式碼,就是採用原生實作非同步串列的pattern了。除了這些之外,還引入了nesting的概念,透過這是這個屬性,可以控制抓取層次。
到這裡我們就完整的實現了串行的功能,考慮到性能,我們要開發並行抓取的功能。
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
/// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); }
上邊的程式碼可以說是實作並發的一個pattern。利用迴圈遍歷來實現。另一個核心是,既然是並發的,那麼利用fs.exists 就會存在問題,可能會重複下載同一文件,這裡的解決方案是:
使用Map快取某一url,url應該作為key
現在我們又有了新的需求,要求限制同時並發的最大數,那麼在這裡就引進了一個我認為最重要的概念:隊列。
(task-Queue.js)
class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; } pushTask(task) { this.queue.push(task); this.next(); } next() { while (this.running < this.concurrency && this.queue.length) { const task = this.queue.shift(); task(() => { this.running--; this.next(); }); this.running++; } } } module.exports = TaskQueue;
上邊的程式碼就是佇列的實作程式碼,核心是next() 方法,可以看出,當task加入佇列後,會立刻執行,這不是說這個任務一定馬上執行,而是指的是next會立刻呼叫。
(spider_v5.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); const TaskQueue = require("./task-Queue"); const downloadQueue = new TaskQueue(2); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; links.forEach(link => { /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数 /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么 downloadQueue.pushTask(done => { spider(link, nesting - 1, err => { /// 这里表示,只要发生错误,队列就会退出 if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } done(); }); }); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(`error: ${err}`); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
把不同的功能放到不同的函式中,會為我們帶來巨大的好處,async函式庫十分流行,它的性能也不錯,它內部基於callback。
const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");
function download(url, filename, callback) {
console.log(`Downloading ${url}`);
let body;
series([
callback => {
request(url, (err, response, resBody) => {
if (err) {
return callback(err);
}
body = resBody;
callback();
});
},
mkdirp.bind(null, path.dirname(filename)),
callback => {
fs.writeFile(filename, body, callback);
}
], err => {
if (err) {
return callback(err);
}
console.log(`Downloaded and saved: ${url}`);
callback(null, body);
});
}
/// 最大的启发是实现了如何异步循环遍历数组
function spiderLinks(currentUrl, body, nesting, callback) {
if (nesting === 0) {
return process.nextTick(callback);
}
const links = utilities.getPageLinks(currentUrl, body);
if (links.length === 0) {
return process.nextTick(callback);
}
eachSeries(links, (link, cb) => {
"use strict";
spider(link, nesting - 1, cb);
}, callback);
}
const spidering = new Map();
function spider(url, nesting, callback) {
if (spidering.has(url)) {
return process.nextTick(callback);
}
spidering.set(url, true);
const filename = utilities.urlToFilename(url);
fs.readFile(filename, "utf8", (err, body) => {
if (err) {
if (err.code !== 'ENOENT') {
return callback(err);
}
return download(url, filename, (err, body) => {
if (err) {
return callback(err);
}
spiderLinks(url, body, nesting, callback);
});
}
spiderLinks(url, body, nesting, callback);
});
}
spider(process.argv[2], 1, (err, filename, downloaded) => {
if (err) {
console.log(err);
} else if (downloaded) {
console.log(`Completed the download of ${filename}`);
} else {
console.log(`${filename} was already downloaded`);
}
});
const series = require("async/series"); // 串行 const eachSeries = require("async/eachSeries"); // 并行 const queue = require("async/queue"); // 队列
其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格
(spider_v8.js)
const utilities = require("./utilities"); const request = utilities.promisify(require("request")); const fs = require("fs"); const readFile = utilities.promisify(fs.readFile); const writeFile = utilities.promisify(fs.writeFile); const mkdirp = utilities.promisify(require("mkdirp")); const path = require("path"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename) { console.log(`Downloading ${url}`); let body; return request(url) .then(response => { "use strict"; body = response.body; return mkdirp(path.dirname(filename)); }) .then(() => writeFile(filename, body)) .then(() => { "use strict"; console.log(`Downloaded adn saved: ${url}`); return body; }); } /// promise编程的本质就是为了解决在函数中设置回调函数的问题 /// 通过中间层promise来实现异步函数同步化 function spiderLinks(currentUrl, body, nesting) { let promise = Promise.resolve(); if (nesting === 0) { return promise; } const links = utilities.getPageLinks(currentUrl, body); links.forEach(link => { "use strict"; promise = promise.then(() => spider(link, nesting - 1)); }); return promise; } function spider(url, nesting) { const filename = utilities.urlToFilename(url); return readFile(filename, "utf8") .then( body => spiderLinks(url, body, nesting), err => { "use strict"; if (err.code !== 'ENOENT') { /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误 throw err; } return download(url, filename) .then(body => spiderLinks(url, body, nesting)); } ); } spider(process.argv[2], 1) .then(() => { "use strict"; console.log('Download complete'); }) .catch(err => { "use strict"; console.log(err); });
可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。
在设计api的时候,应该支持两种方式,及支持callback,又支持promise
function asyncpision(pidend, pisor, cb) { return new Promise((resolve, reject) => { "use strict"; process.nextTick(() => { const result = pidend / pisor; if (isNaN(result) || !Number.isFinite(result)) { const error = new Error("Invalid operands"); if (cb) { cb(error); } return reject(error); } if (cb) { cb(null, result); } resolve(result); }); }); } asyncpision(10, 2, (err, result) => { "use strict"; if (err) { return console.log(err); } console.log(result); }); asyncpision(22, 11) .then((result) => console.log(result)) .catch((err) => console.log(err));
4.Generator
Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。
(spider_v9.js)
const thunkify = require("thunkify"); const co = require("co"); const path = require("path"); const utilities = require("./utilities"); const request = thunkify(require("request")); const fs = require("fs"); const mkdirp = thunkify(require("mkdirp")); const readFile = thunkify(fs.readFile); const writeFile = thunkify(fs.writeFile); const nextTick = thunkify(process.nextTick); function* download(url, filename) { console.log(`Downloading ${url}`); const response = yield request(url); console.log(response); const body = response[1]; yield mkdirp(path.dirname(filename)); yield writeFile(filename, body); console.log(`Downloaded and saved ${url}`); return body; } function* spider(url, nesting) { const filename = utilities.urlToFilename(url); let body; try { body = yield readFile(filename, "utf8"); } catch (err) { if (err.code !== 'ENOENT') { throw err; } body = yield download(url, filename); } yield spiderLinks(url, body, nesting); } function* spiderLinks(currentUrl, body, nesting) { if (nesting === 0) { return nextTick(); } const links = utilities.getPageLinks(currentUrl, body); for (let i = 0; i < links.length; i++) { yield spider(links[i], nesting - 1); } } /// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息 co(function* () { try { yield spider(process.argv[2], 1); console.log('Download complete'); } catch (err) { console.log(err); } });
总结
我并没有写promise和generator并发的代码。以上这些内容来自于这本书nodejs-design-patterns 。
相关推荐:
以上是Node.js異步流控制詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!