Dieses Mal werde ich Ihnen einen Anwendungsfall der asynchronen Steuerung von Node.js vorstellen. Was sind die Vorsichtsmaßnahmen bei der Verwendung der asynchronen Steuerung von Node.js?
Wenn Sie nicht über umfassende Erfahrung mit der Verwendung von Funktions- Rückrufen verfügen, ist es immer noch etwas schwierig, diese Inhalte zu lesen. Aufgrund der einzigartigen asynchronen Eigenschaften von Node.js trat das Problem der „Rückrufhölle“ auf. In diesem Artikel habe ich ausführlicher beschrieben, wie das Problem des asynchronen Flusses gelöst werden kann.
Der Artikel wird lang sein, und dies ist eine Erklärung des asynchronen Streaming-Musters. In diesem Artikel wird ein einfaches Web-Spider-Beispiel verwendet. Seine Funktion besteht darin, den Webseiteninhalt der angegebenen URL zu crawlen und im Projekt zu speichern. Am Ende des Artikels finden Sie die Quellcode-Demo des gesamten Artikels.
1. NativeJavaScriptModus
Dieser Artikel ist nicht für Anfänger gedacht, daher werden die meisten grundlegenden Inhalte weggelassen:
(spider_v1.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (!exists) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { callback(err); } else { mkdirp(path.dirname(filename), err => { if (err) { callback(err); } else { fs.writeFile(filename, body, err => { if (err) { callback(err); } else { callback(null, filename, true); } }); } }); } }); } else { callback(null, filename, false); } }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
Der Ablauf des obigen Codes sieht ungefähr so aus:
URL in Dateinamen umwandeln
Dies ist eine sehr einfache Version des Spiders. Er kann nur den Inhalt einer URL crawlen. Sehen Sie, wie problematisch der obige Rückruf ist. Dann beginnen wir mit der Optimierung.
Zunächst einmal kann die if else-Methode ganz einfach optimiert werden:
/// before if (err) { callback(err); } else { callback(null, filename, true); } /// after if (err) { return callback(err); } callback(null, filename, true);
Wenn der Code so geschrieben wird, gibt es eine Verschachtelungsebene weniger, aber erfahrene Programmierer werden denken, dass das Schreiben auf diese Weise Fehler zu sehr hervorhebt. Der Fokus unserer Programmierung sollte auf der Verarbeitung der richtigen Daten liegen, und das gibt es auch Anforderungen an die Lesbarkeit.
Eine weitere Optimierung ist die Funktionsaufteilung. In der Spider-Funktion im obigen Code können die heruntergeladene Datei und die gespeicherte Datei aufgeteilt werden.
(spider_v2.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (exists) { return callback(null, filename, false); } download(url, filename, err => { if (err) { return callback(err); } callback(null, filename, true); }) }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
Der obige Code ist im Grunde das Ergebnis einer nativen Optimierung, aber die Funktion dieses Spiders ist zu einfach. Wir müssen jetzt alle URLs auf einer bestimmten Webseite crawlen, was zu seriellen und parallelen Problemen führt.
(spider_v3.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); } function spider(url, nesting, callback) { const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
Der obige Code verfügt über zwei weitere Kernfunktionen als der vorherige Code. Erstens erhält er die Links in einem bestimmten Körper über die Hilfsklasse:
const links = utilities.getPageLinks(currentUrl, body);
Ich werde die interne Implementierung nicht erklären. Der andere Kerncode ist:
/// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); }
Man kann sagen, dass der kleine Code oben ein Muster ist, das die native Implementierung der asynchronen Serialisierung verwendet. Darüber hinaus wird das Konzept der Verschachtelung eingeführt. Über dieses Attribut kann die Crawling-Ebene gesteuert werden.
Zu diesem Zeitpunkt haben wir die serielle Funktion vollständig implementiert. Unter Berücksichtigung der Leistung müssen wir die parallele Crawling-Funktion entwickeln.
(spider_v4.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
Auch dieser Code ist sehr einfach und hat zwei Kerninhalte. Eine Möglichkeit besteht darin, Parallelität zu implementieren:
/// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); }
Man kann sagen, dass der obige Code ein Muster zum Erreichen der Parallelität ist. Dies wird durch Schleifendurchquerung erreicht. Ein weiterer Kern besteht darin, dass es bei der Verwendung von fs.exists Probleme geben wird und dieselbe Datei möglicherweise wiederholt heruntergeladen wird. Die Lösung lautet hier:
Verwenden Sie Map, um eine bestimmte URL zwischenzuspeichern. Die URL sollte als Schlüssel verwendet werden
Jetzt haben wir eine neue Anforderung, die maximale Anzahl gleichzeitiger Parallelitäten zu begrenzen. Deshalb stellen wir hier ein Konzept vor, das meiner Meinung nach das wichtigste ist: Warteschlange.
(task-Queue.js)
class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; } pushTask(task) { this.queue.push(task); this.next(); } next() { while (this.running < this.concurrency && this.queue.length) { const task = this.queue.shift(); task(() => { this.running--; this.next(); }); this.running++; } } } module.exports = TaskQueue;
Der obige Code ist der Implementierungscode der Warteschlange. Es ist ersichtlich, dass die Aufgabe sofort ausgeführt werden muss, wenn sie zur Warteschlange hinzugefügt wird sofort, aber es bedeutet, dass next sofort aufgerufen wird.
(spider_v5.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); const TaskQueue = require("./task-Queue"); const downloadQueue = new TaskQueue(2); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; links.forEach(link => { /// 给队列出传递一个任务,这个任务首先是一个函数,其次该函数接受一个参数 /// 当调用任务时,触发该函数,然后给函数传递一个参数,告诉该函数在任务结束时干什么 downloadQueue.pushTask(done => { spider(link, nesting - 1, err => { /// 这里表示,只要发生错误,队列就会退出 if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } done(); }); }); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(`error: ${err}`); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
Um die Anzahl der Parallelitäten zu begrenzen, stellen Sie daher einfach die Aufgabendurchquerung in die Warteschlange in der SpiderLinks-Methode. Das ist relativ einfach.
Bisher haben wir natives JavaScript verwendet, um einen Webspider mit relativ vollständigen Funktionen zu implementieren, der sowohl seriell als auch gleichzeitig sein kann und auch die Anzahl der Parallelitäten steuern kann.
2. Asynchrone Bibliothek verwenden
把不同的功能放到不同的函数中,会给我们带来巨大的好处,async库十分流行,它的性能也不错,它内部基于callback。
(spider_v6.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); const series = require("async/series"); const eachSeries = require("async/eachSeries"); function download(url, filename, callback) { console.log(`Downloading ${url}`); let body; series([ callback => { request(url, (err, response, resBody) => { if (err) { return callback(err); } body = resBody; callback(); }); }, mkdirp.bind(null, path.dirname(filename)), callback => { fs.writeFile(filename, body, callback); } ], err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); } /// 最大的启发是实现了如何异步循环遍历数组 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } eachSeries(links, (link, cb) => { "use strict"; spider(link, nesting - 1, cb); }, callback); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 1, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
在上边的代码中,我们只使用了async的三个功能:
const series = require("async/series"); // 串行 const eachSeries = require("async/eachSeries"); // 并行 const queue = require("async/queue"); // 队列
由于比较简单,就不做解释了。async中的队列的代码在(spider_v7.js)中,和上边我们自定义的队列很相似,也不做更多解释了。
3.Promise
Promise是一个协议,有很多库实现了这个协议,我们用的是ES6的实现。简单来说promise就是一个约定,如果完成了,就调用它的resolve方法,失败了就调用它的reject方法。它内有实现了then方法,then返回promise本身,这样就形成了调用链。
其实Promise的内容有很多,在实际应用中是如何把普通的函数promise化。这方面的内容在这里也不讲了,我自己也不够格
(spider_v8.js)
const utilities = require("./utilities"); const request = utilities.promisify(require("request")); const fs = require("fs"); const readFile = utilities.promisify(fs.readFile); const writeFile = utilities.promisify(fs.writeFile); const mkdirp = utilities.promisify(require("mkdirp")); const path = require("path"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename) { console.log(`Downloading ${url}`); let body; return request(url) .then(response => { "use strict"; body = response.body; return mkdirp(path.dirname(filename)); }) .then(() => writeFile(filename, body)) .then(() => { "use strict"; console.log(`Downloaded adn saved: ${url}`); return body; }); } /// promise编程的本质就是为了解决在函数中设置回调函数的问题 /// 通过中间层promise来实现异步函数同步化 function spiderLinks(currentUrl, body, nesting) { let promise = Promise.resolve(); if (nesting === 0) { return promise; } const links = utilities.getPageLinks(currentUrl, body); links.forEach(link => { "use strict"; promise = promise.then(() => spider(link, nesting - 1)); }); return promise; } function spider(url, nesting) { const filename = utilities.urlToFilename(url); return readFile(filename, "utf8") .then( body => spiderLinks(url, body, nesting), err => { "use strict"; if (err.code !== 'ENOENT') { /// 抛出错误,这个方便与在整个异步链的最后通过呢catch来捕获这个链中的错误 throw err; } return download(url, filename) .then(body => spiderLinks(url, body, nesting)); } ); } spider(process.argv[2], 1) .then(() => { "use strict"; console.log('Download complete'); }) .catch(err => { "use strict"; console.log(err); });
可以看到上边的代码中的函数都是没有callback的,只需要在最后catch就可以了。
在设计api的时候,应该支持两种方式,及支持callback,又支持promise
function asyncpision(pidend, pisor, cb) { return new Promise((resolve, reject) => { "use strict"; process.nextTick(() => { const result = pidend / pisor; if (isNaN(result) || !Number.isFinite(result)) { const error = new Error("Invalid operands"); if (cb) { cb(error); } return reject(error); } if (cb) { cb(null, result); } resolve(result); }); }); } asyncpision(10, 2, (err, result) => { "use strict"; if (err) { return console.log(err); } console.log(result); }); asyncpision(22, 11) .then((result) => console.log(result)) .catch((err) => console.log(err));
4.Generator
Generator很有意思,他可以让暂停函数和恢复函数,利用thunkify和co这两个库,我们下边的代码实现起来非常酷。
(spider_v9.js)
const thunkify = require("thunkify"); const co = require("co"); const path = require("path"); const utilities = require("./utilities"); const request = thunkify(require("request")); const fs = require("fs"); const mkdirp = thunkify(require("mkdirp")); const readFile = thunkify(fs.readFile); const writeFile = thunkify(fs.writeFile); const nextTick = thunkify(process.nextTick); function* download(url, filename) { console.log(`Downloading ${url}`); const response = yield request(url); console.log(response); const body = response[1]; yield mkdirp(path.dirname(filename)); yield writeFile(filename, body); console.log(`Downloaded and saved ${url}`); return body; } function* spider(url, nesting) { const filename = utilities.urlToFilename(url); let body; try { body = yield readFile(filename, "utf8"); } catch (err) { if (err.code !== 'ENOENT') { throw err; } body = yield download(url, filename); } yield spiderLinks(url, body, nesting); } function* spiderLinks(currentUrl, body, nesting) { if (nesting === 0) { return nextTick(); } const links = utilities.getPageLinks(currentUrl, body); for (let i = 0; i < links.length; i++) { yield spider(links[i], nesting - 1); } } /// 通过co就自动处理了回调函数,直接返回了回调函数中的参数,把这些参数放到一个数组中,但是去掉了err信息 co(function* () { try { yield spider(process.argv[2], 1); console.log('Download complete'); } catch (err) { console.log(err); } });
相信看了本文案例你已经掌握了方法,更多精彩请关注php中文网其它相关文章!
推荐阅读:
Das obige ist der detaillierte Inhalt vonAnwendungsfälle der asynchronen Steuerung für Node.js. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!