Home > Web Front-end > JS Tutorial > body text

Detailed explanation of Node asynchronous programming mechanism

小云云
Release: 2018-01-09 17:17:37
Original
1305 people have browsed it

This article mainly introduces the mechanism of Node asynchronous programming. The editor thinks it is quite good. Now I will share it with you and give you a reference. Let’s follow the editor to take a look, I hope it can help everyone.

This article introduces Node asynchronous programming and shares it with everyone. The details are as follows:

The current main solutions for asynchronous programming are:

  • Event release/ Subscription mode

  • Promise/Deferred mode

  • Process control library

Event publishing /Subscription mode

Node itself provides the events module, which can easily implement event publishing/subscription


//订阅
emmiter.on("event1",function(message){
  console.log(message);
})
//发布
emmiter.emit("event1","I am mesaage!");
Copy after login

The listener can be very flexible Adding and deleting makes it easy to associate and decouple events from specific processing logic

The event publishing/subscription model is often used to decouple business logic, and event publishers do not need to pay attention to the detection of subscriptions. How the listener implements business logic does not even need to pay attention to how many listeners exist. Data can be transferred flexibly through messages.

The following HTTP is a typical application scenario


var req = http.request(options,function(res){
  res.on('data',function(chunk){
    console.log('Body:'+ chunk);
  })
  res.on('end',function(){
    //TODO
  })
})
Copy after login

If more than 10 listeners are added to an event, you will get a warning , you can remove this restriction by calling emmite.setMaxListeners(0)

Inherit the events module


var events = require('events');
function Stream(){
  events.EventEmiiter.call(this);
}
util.inherits(Stream,events.EventEmitter);
Copy after login

Use events Queue solves the avalanche problem

The so-called avalanche problem is the cache failure under high access volume and large concurrency. At this time, a large number of requests are integrated into the database at the same time, and the database cannot withstand such a large amount of requests at the same time. query requests, which will further affect the overall response speed of the website

Solution:


var proxy = new events.EventEmitter();
var status = "ready"; 
var seletc = function(callback){
  proxy.once("selected",callback);//为每次请求订阅这个查询时间,推入事件回调函数队列
  if(status === 'ready'){ 
    status = 'pending';//设置状态为进行中以防止引起多次查询操作
    db.select("SQL",function(results){
      proxy.emit("selected",results); //查询操作完成后发布时间
      status = 'ready';//重新定义为已准备状态
    })
  }
}
Copy after login

Multiple asynchronous Collaboration plan between

The relationship between events and listeners in the above situations is one-to-many, but in asynchronous programming, there will also be a many-to-one situation between events and listeners.

Here is a brief introduction using the template reading, data reading and localized resource reading required to render the page as an example


var count = 0 ;
var results = {};
var done = function(key,value){
  result[key] = value;
  count++;
  if(count === 3){
    render(results);
  }
}
fs.readFile(template_path,"utf8",function(err,template){
  done('template',template)
})
db.query(sql,function(err,data){
  done('data',data);
})
l10n.get(function(err,resources){
  done('resources',resources)
})
Copy after login

Partial function solution


var after = function(times,callback){
  var count = 0, result = {};
  return function(key,value){
    results[key] = value;
    count++;
    if(count === times){
      callback(results);
    }
  }
}
var done = after(times,render);
var emitter = new events.Emitter();
emitter.on('done',done);  //一个侦听器
emitter.on('done',other);  //如果业务增长,可以完成多对多的方案

fs.readFile(template_path,"utf8",function(err,template){
  emitter.emit('done','template',template);
})
db.query(sql,function(err,data){
  emitter.emit('done','data',data);
})
l10n.get(function(err,resources){
  emitter.emit('done','resources',resources)
})
Copy after login

Introduction of EventProxy module solution


var proxy = new EventProxy();
proxy.all('template','data','resources',function(template,data,resources){
  //TODO
})
fs.readFile(template_path,'utf8',function(err,template){
  proxy.emit('template',template);
})
db.query(sql,function(err,data){
  proxy.emit('data',data);
})
l10n.get(function(err,resources){
  proxy.emit('resources',resources);
})
Copy after login

Promise/Deferred Mode

When using events in the above way, the execution process needs to be preset, which is determined by the operating mechanism of the publish/subscribe mode.


$.get('/api',{
  success:onSuccess,
  err:onError,
  complete:onComplete
})
//需要严谨设置目标
Copy after login

So is there a way to execute the asynchronous call first and delay the delivery processing? The next thing to talk about is the way to deal with this situation: Promise/Deferred pattern

Promise/A

Promise/A proposal makes this for a single asynchronous operation Abstract definition:

  • Promise operation will only be in one of three states: incomplete state, completed state and failed state.

  • The state of Promise will only transform from uncompleted state to completed state or failed state, and cannot be reversed. Completed state and failed state cannot be converted into each other

  • Once the state of Promise is converted, it cannot be changed.

A Promise object only needs to have then()

  • Callback methods that accept completion and error states

  • Optionally supports progress event callback as the third method

  • Then() method only accepts function objects, other objects will be ignored

  • Thethen() method continues to return the Promise object to implement chain calls

Simulate a Promise implementation through Node's events module


var Promise = function(){
  EventEmitter.call(this)
}
util.inherits(Promise,EventEmitter);

Promise.prototype.then = function(fulfilledHandler,errHandler,progeressHandler){
  if(typeof fulfilledHandler === 'function'){
    this.once('success',fulfilledHandler); //实现监听对应事件
  }
  if(typeof errorHandler === 'function'){
    this.once('error',errorHandler)
  }
  if(typeof progressHandler === 'function'){
    this.on('progress',progressHandler);
  }
  return this;
}
Copy after login

The callback function is stored through then(), and the next step is to wait for the success, error, and progress events to be triggered. The object that implements this function is called a Deferred object, that is, a delay object.


var Deferred = function(){
  this.state = 'unfulfilled';
  this.promise = new Promise();
}
Deferred.prototype.resolve = function(obj){ //当异步完成后可将resolve作为回调函数,触发相关事件
  this.state = 'fulfilled';
  this.promise.emit('success',obj);
}
Deferred.prototype.reject = function(err){
  this.state = 'failed';
  this.promise.emit('error',err);
}
Deferred.prototype.progress = function(data){
  this.promise.emit('progress',data)
}
Copy after login

Therefore, a typical response object can be encapsulated


res.setEncoding('utf8');
res.on('data',function(chunk){
  console.log("Body:" + chunk);
})
res.on('end',function(){
  //done
})
res.on('error',function(err){
  //error
}
Copy after login

and converted into


res.then(function(){
  //done
},function(err){
  //error
},function(chunk){
  console.log('Body:' + chunk);
})
Copy after login

To complete the above conversion, you first need to encapsulate the res object and promisify the data, end, error and other events


var promisify = function(res){
  var deferred = new Deferred(); //创建一个延迟对象来在res的异步完成回调中发布相关事件
  var result = ''; //用来在progress中持续接收数据
  res.on('data',function(chunk){ //res的异步操作,回调中发布事件
    result += chunk;
    deferred.progress(chunk);
  })
  res.on('end',function(){    
    deferred.resolve(result);
  })
  res.on('error',function(err){
    deferred.reject(err);
  });
  return deferred.promise   //返回deferred.promise,让外界不能改变deferred的状态,只能让promise的then()方法去接收外界来侦听相关事件。
}

promisify(res).then(function(){
  //done
},function(err){
  //error
},function(chunk){
  console.log('Body:' + chunk);
})
Copy after login

Above, it encapsulates the immutable part of the business in Deferred and hands over the variable part to Promise

Multiple asynchronous collaboration in Promise


Deferred.prototype.all = function(promises){
  var count = promises.length; //记录传进的promise的个数
  var that = this; //保存调用all的对象
  var results = [];//存放所有promise完成的结果
  promises.forEach(function(promise,i){//对promises逐个进行调用
    promise.then(function(data){//每个promise成功之后,存放结果到result中,count--,直到所有promise被处理完了,才出发deferred的resolve方法,发布事件,传递结果出去
      count--;
      result[i] = data;
      if(count === 0){
        that.resolve(results);
      }
    },function(err){
      that.reject(err);
    });
  });
  return this.promise; //返回promise来让外界侦听这个deferred发布的事件。
}

var promise1 = readFile('foo.txt','utf-8');//这里的文件读取已经经过promise化
var promise2 = readFile('bar.txt','utf-8');
var deferred = new Deferred();
deferred.all([promise1,promise2]).thne(function(results){//promise1和promise2的then方法在deferred内部的all方法所调用,用于同步所有的promise
  //TODO
},function(err){
  //TODO
})
Copy after login

Promise that supports sequence execution

Try to transform the code to implement chain calls


var Deferred = function(){
  this.promise = new Promise()
}

//完成态
Deferred.prototype.resolve = function(obj){
  var promise = this.promise;
  var handler;
  while((handler = promise.queue.shift())){
    if(handler && handler.fulfilled){
      var ret = handler.fulfilled(obj);
      if(ret && ret.isPromise){
        ret.queue = promise.queue;
        this.promise = ret;
        return;
      }
    }
  }
}

//失败态
Deferred.prototype.reject = function(err){
  var promise = this.promise;
  var handler;
  while((handler = promise.queue.shift())){
    if(handler && handler.error){
      var ret = handler.error(err);
      if(ret && ret.isPromise){
        ret.queue = promise.queue;
        this.promise = ret;
        return
      }
    }
  }
}

//生成回调函数
Deferred.prototype.callback = function(){
  var that = this;
  return function(err,file){
    if(err){
      return that.reject(err);
    }
    that.resolve(file)
  }
}

var Promise = function(){
  this.queue = []; //队列用于存储待执行的回到函数
  this.isPromise = true;
};
Promise.prototype.then = function(fulfilledHandler,errorHandler,progressHandler){
  var handler = {};
  if(typeof fulfilledHandler === 'function'){
    handler.fulfilled = fulfilledHandler;
  }
  if(typeof errorHandler === 'function'){
    handler.error = errorHandler;
  }
  this.queue.push(handler);
  return this;
}

var readFile1 = function(file,encoding){
  var deferred = new Deferred();
  fs.readFile(file,encoding,deferred.callback());
  return deferred.promise;
}
var readFile2 = function(file,encoding){
  var deferred = new Deferred();
  fs.readFile(file,encoding,deferred.callback());
  return deferred.promise;
}

readFile1('file1.txt','utf8').then(function(file1){
  return readFile2(file1.trim(),'utf8')
}).then(function(file2){
  console.log(file2)
})
Copy after login

Related recommendations:

Introduction to 4 methods of asynchronous programming in Javascript

Explanation of asynchronous programming Promise in es6

javascript asynchronous programming callback function and manager usage examples detailed explanation

The above is the detailed content of Detailed explanation of Node asynchronous programming mechanism. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template