This article introduces observables (Observable), observers (Observer), and RxJS operators in Angular. I hope you find it helpful!
Observable (observable object)
An Observable (observable object) is an object from the RxJS library that can be used to handle asynchronous events such as HTTP requests. In fact, in Angular, HTTP requests made through HttpClient return an Observable.
You may have worked with promises before. The two are alike in one respect: in both, the producer actively "pushes" values to the consumer, and the consumer passively receives them. There are important differences, though: an Observable can emit any number of values, and it does not execute at all until it is subscribed to. A promise has neither of these properties.
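The difference in execution timing can be seen in a minimal sketch. The names `log`, `lazy$`, and `received` are made up for this illustration and do not come from the article:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A Promise executor runs immediately, when the promise is constructed.
const promise = new Promise<number>((resolve) => {
  log.push('promise executor ran');
  resolve(1); // a promise settles with a single value
});
promise.then((value) => console.log('promise resolved with', value));

// An Observable's subscriber function runs only when subscribe() is called.
const lazy$ = new Observable<number>((subscriber) => {
  log.push('observable subscriber ran');
  subscriber.next(1); // an observable may emit several values
  subscriber.next(2);
  subscriber.complete();
});

log.push('before subscribe');
const received: number[] = [];
lazy$.subscribe((value) => received.push(value));

console.log(log);
console.log(received);
```

Here the promise executor is recorded before the `'before subscribe'` marker, while the observable's subscriber function is recorded after it.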
- An Observable carries messages between a sender and a receiver. You can think of these messages as a stream.
- When creating an Observable object, you pass a function to the constructor. This function is called the subscriber function, and it is where the producer pushes messages to the consumer.
- The subscriber function does not run until the consumer calls subscribe(). That call returns a Subscription object whose unsubscribe() method lets the consumer stop receiving messages at any time.
- subscribe() takes an observer object as its argument.
- Messages can be sent synchronously or asynchronously.
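The following is a small sketch of the subscribe/unsubscribe cycle described in the list above. The observable `ticks$` and the flag `tornDown` are hypothetical names for this illustration. The function returned from the subscriber function is the teardown logic that RxJS runs on unsubscribe:

```typescript
import { Observable } from 'rxjs';

let tornDown = false;

// Emits an increasing counter once per second (asynchronous messages).
const ticks$ = new Observable<number>((subscriber) => {
  let n = 0;
  const id = setInterval(() => subscriber.next(n++), 1000);
  // Teardown: runs when the consumer unsubscribes.
  return () => {
    clearInterval(id);
    tornDown = true;
  };
});

// subscribe() starts the subscriber function and returns a Subscription.
const subscription = ticks$.subscribe((n) => console.log('tick', n));

// The consumer can stop receiving messages at any time.
subscription.unsubscribe();
console.log(subscription.closed, tornDown);
```

Because unsubscribe() is called right away in this sketch, the timer is cleared before the first tick is delivered.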
Observer
Once you have an observable (the sender), you need an observer (the receiver) to observe it. An observer implements the Observer interface: an object with three properties, all of which are functions, as follows:
Notification Type | Description |
next | Required. Called with each emitted value as its argument. May be called zero or more times. |
error | Optional. Called when an error occurs. An error halts execution of the observable instance. |
complete | Optional. Called when the observable has finished emitting. |
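To illustrate how the three notification types relate, here is a sketch with a deliberately failing observable. The names `failing$` and `events` are assumptions made for this example:

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// Emits one value, then signals an error.
const failing$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  subscriber.next(2); // ignored: the error already ended this execution
});

failing$.subscribe({
  next: (value) => events.push(`next:${value}`),
  error: (err: Error) => events.push(`error:${err.message}`),
  complete: () => events.push('complete'),
});

console.log(events);
```

After the error notification, neither next nor complete is called again for this subscription.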
Subscription
An Observable instance starts publishing values only when someone subscribes to it. To subscribe, call the observable's subscribe() method and pass it an observer object to receive notifications.
To demonstrate how subscription works, we first need an observable. Observable has a constructor for creating new instances, but for brevity you can also use the creation functions exported by RxJS to build common simple observables:
- of(...items): Returns an Observable instance that synchronously emits the values passed as arguments, one by one.
- from(iterable): Converts its argument to an Observable instance. It is typically used to convert an array into an observable that emits multiple values.
import { of } from "rxjs";
// 1. Use of() to create an observable that will emit the values 1, 2, 3
const observable = of(1, 2, 3);
// 2. Implement the Observer interface (the observer)
const observer = {
  next: (num: number) => console.log(num),
  error: (err: Error) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
}
// 3. Subscribe by calling the observable's subscribe() method; the object passed in is the observer
observable.subscribe(observer);
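Running this logs 1, 2, and 3, followed by "Observer got a complete notification".
The subscription above can also be written with separate callback arguments instead of an observer object. Note that recent RxJS versions deprecate passing more than one callback this way, so the observer-object form is preferable in new code: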
observable.subscribe(
  num => console.log(num),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
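The difference between of() and from() when given an array can be sketched as follows. The variable names `viaFrom` and `viaOf` are made up for this illustration:

```typescript
import { from, of } from 'rxjs';

// from() unpacks the array and emits its elements one by one.
const viaFrom: number[] = [];
from([10, 20, 30]).subscribe((value) => viaFrom.push(value));

// of() treats the array as a single argument and emits it as one value.
const viaOf: number[][] = [];
of([10, 20, 30]).subscribe((value) => viaOf.push(value));

console.log(viaFrom.length, viaOf.length);
```

Both calls emit synchronously here, so the arrays are already filled when the last line runs.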
Subscriber function
The example above used of() to create an observable. This section uses the constructor instead.
The Observable constructor can create any kind of observable stream. When the observable's subscribe() method is called, the function passed to the constructor runs as the subscriber function. It receives an Observer object and publishes values by calling the observer's next() method.
import { Observable, Observer } from 'rxjs';
// 1. A custom subscriber function
function sequenceSubscriber(observer: Observer<number>) {
  observer.next(1); // emit a value
  observer.next(2); // emit a value
  observer.next(3); // emit a value
  observer.complete();
  return {unsubscribe() {}};
}
// 2. Create a new observable with the constructor; the argument is the subscriber function
const sequence = new Observable(sequenceSubscriber);
// 3. Subscribe
sequence.subscribe({
  next(num) { console.log(num); }, // receive a value
  complete() { console.log('Finished sequence'); }
});
Running this logs 1, 2, and 3, followed by "Finished sequence".
The example above shows how to write a custom subscriber function. Because the subscriber function is ours to write, we can wrap asynchronous code inside it and emit the data once the asynchronous work has finished. As follows:
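import { Observable } from 'rxjs'
// An asynchronous function: resolves with num + 1 after one second
function fn(num: number): Promise<number> {
  return new Promise((resolve) => {
    setTimeout(() => {
      num++
      resolve(num)
    }, 1000)
  })
}
// Create an observable, passing in the subscriber function
const observable = new Observable<number>((x) => {
  let num = 1
  fn(num).then(
    res => x.next(res) // the asynchronous work is done, so emit the data
  )
})
// Subscribe and receive the data (this can also be chained onto the constructor call)
observable.subscribe(data => console.log(data)) // 2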
Multicast
For multicasting, see the Angular guide: https://angular.cn/guide/observables#multicasting
RxJS operators
RxJS operators let you process and transform these messages before the receiver gets them. Operators are pure functions: each one takes an observable and returns a new one, leaving the source unchanged.
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// 1. Create an observable that emits the data
const nums = of(1, 2, 3);
// 2. Create a function that accepts an observable and returns a new one
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
Calling an operator directly like this is hard to read. In practice, the following approach is far more common: use pipe() to chain multiple operators.
import { map, Observable, filter } from 'rxjs'
// Create an observable, passing in the subscriber function
const subscription = new Observable<number>((x) => {
  x.next(1)
  x.next(2)
  x.next(3)
  x.next(4)
}).pipe(
  map(value => value * 100), // operator: multiply each value by 100
  filter(value => value === 200) // operator: keep only the value 200
)
  .subscribe(data => console.log(data)) // 200
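Because operators are plain functions, a chain of them can also be packaged as a reusable operator. The sketch below uses the standalone `pipe` function exported by RxJS. The name `squareOddValues` is made up for this example, and importing `filter` and `map` directly from 'rxjs' assumes RxJS 7.2 or later:

```typescript
import { of, pipe, filter, map } from 'rxjs';

// A reusable operator composed from two existing ones.
const squareOddValues = pipe(
  filter((n: number) => n % 2 !== 0), // keep odd numbers only
  map((n: number) => n * n),          // square each remaining value
);

// Applied as a plain function to an observable...
const direct: number[] = [];
squareOddValues(of(1, 2, 3, 4, 5)).subscribe((n) => direct.push(n));

// ...or handed to the observable's own pipe() method.
const piped: number[] = [];
of(1, 2, 3, 4, 5).pipe(squareOddValues).subscribe((n) => piped.push(n));

console.log(direct, piped);
```

Both forms produce the same values, which shows that pipe() is simply function composition over observables.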
Error handling
RxJS also provides the catchError operator, which lets you handle known errors inside the pipeline.
Suppose you have an observable that makes an API request and then maps the response returned by the server. If the server returns an error or the value does not exist, an error is produced. If you catch that error and supply a default value, the subscriber receives the default value instead of an error notification. As follows:
import { map, Observable, catchError, of } from 'rxjs'
const subscription = new Observable<number>((x) => {
  x.next(1) // emit the values 1 and 2
  x.next(2)
}).pipe(
  map(value => {
    if (value === 1) { // 1. When the value is 1, multiply it by 100
      return value * 100
    } else { // 2. Otherwise, throw an error
      throw new Error('error thrown');
    }
  }),
  // 3. Catch and handle the error here, emitting 0 downstream instead
  catchError((err) => {
    console.log(err)
    return of(0)
  })
)
  .subscribe({
    next: data => console.log(data),
    // 4. catchError has already handled the error above (by emitting a replacement value), so this callback is never invoked
    error: err => console.log('Failed to receive data:', err)
  })
Running this logs 100, then the caught Error object, and finally 0.
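One detail worth noting about catchError: it switches to the replacement observable, and the original source is not resumed. The sketch below illustrates this with a three-value source. The array `out` is a hypothetical helper for recording notifications, and importing the operators directly from 'rxjs' assumes RxJS 7.2 or later:

```typescript
import { of, map, catchError } from 'rxjs';

const out: Array<number | string> = [];

of(1, 2, 3).pipe(
  map((value) => {
    if (value === 2) {
      throw new Error('bad value'); // fails on the second value
    }
    return value * 100;
  }),
  // Replace the failed stream with a one-value fallback stream.
  catchError(() => of(0)),
).subscribe({
  next: (value) => out.push(value),
  error: () => out.push('error'),
  complete: () => out.push('complete'),
});

console.log(out);
```

The third source value never reaches the subscriber: after the fallback value is delivered, the stream completes normally.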