RXSwift基础

不言
Libérer: 2023-02-28 15:48:02
original
2139 Les gens l'ont consulté


一个观察者(Observer)订阅一个可观察序列(Observable)。观察者对Observable发射的数据或数据序列作出响应

为什么发用RxSwift

一个程序通常包含着大量的各种事件的产生以及对应的处理逻辑,各种响应方法使代码更加的混乱和复杂,而RxSwift是一个统一的处理各种响应事件的方式

  • Observable的创建和订阅

  • Subjects的使用

  • Combination:Observable的混合操作

  • Transforming:Observable的转换操作

  • Filtering:Observable消息元素的过滤操作

  • 对Observable元素做运算操作

  • Connectable操作

  • 错误处理

  • debug

消息的订阅方式

这些都是Observable的方法,参数都是闭包,闭包是观察者

  1. subscribe(on:(Event) -> void):订阅所有消息(Next, Error, and Completed)

  2. subscribeNext((Element) -> void):只订阅Next

  3. subscribeError((ErrorType) -> void):只订阅Error

  4. subscribeCompleted(() -> Void):只订阅Completed

  5. subscribe(onNext:(Element) -> void, onError:(ErrorType) -> void, onCompleted:() -> Void, onDisposed:() -> Void)订阅多个消息

释放分配的资源

订阅者可以通过调用.dispose()来释放分配的资源,但通过DisposeBag来管理或者通过takeUntil来自动释放更好

let disposeBag = DisposeBag()subscription.addDisposableTo(disposeBag)
Copier après la connexion

sequence    .takeUntil(self.rx_deallocated)    .subscribe {        print($0)    }
Copier après la connexion

Observable的创建和订阅

Observable序列分为两类:

  • 冷:只有当有观察者订阅这个序列时,序列才发射值

  • 热:序列创建时就开始发射值

never()创建即不会完成也不会发消息的Observable

let disposeBag = DisposeBag()let neverSequence = Observable.never()let neverSequenceSubscription = neverSequence        .subscribe { _ in            print("This will never be printed")    }neverSequenceSubscription.addDisposableTo(disposeBag)
Copier après la connexion

empty()创建只会发送一次完成消息的Observable

let disposeBag = DisposeBag()Observable.empty()        .subscribe { event in            print(event)        }        .addDisposableTo(disposeBag)
Copier après la connexion

output: Completed

just()创建只包含一个元素的Observable,在发送一次Next消息后便发送Completed消息

let disposeBag = DisposeBag()Observable.just(":red_circle:")    .subscribe { event in        print(event)    }    .addDisposableTo(disposeBag)
Copier après la connexion

注:如果传递null给just,它将返回一个发送null消息的Observable,不要传入错误的参数,否则将会得到一个空的Observable

output:

Next(:red_circle:)

Completed

of()创建可以包含任意个元素的Observable,连续相同的元素会被忽略

let disposeBag = DisposeBag()Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")       .subscribeNext { element in           print(element)       }       .addDisposableTo(disposeBag)
Copier après la connexion

output:

:dog:

:cat:

:mouse:

:hamster:

create()可以创建自定义的Observable,在最原始的Observable基础上创建Observable

let disposeBag = DisposeBag()let myJust = { (element: String) -> Observablein        return Observable.create { observer in            observer.on(.Next(element))            observer.on(.Completed)            return NopDisposable.instance        }    } myJust(":red_circle:")    .subscribe { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

Next(:red_circle:)

Completed

range()创建一个发送一个范围的整数的Observable,发送完后发送Completed

let disposeBag = DisposeBag()Observable.range(start: 1, count: 10)        .subscribe { print($0) }        .addDisposableTo(disposeBag)
Copier après la connexion

output:

Next(1)

Next(2)

Next(3)

Next(4)

Next(5)

Next(6)

Next(7)

Next(8)

Next(9)

Next(10)

Completed

repeatElement()创建一个可以重复发送消息的Observable,可以指定重复的次数,未指定即无限发送

let disposeBag = DisposeBag()Observable.repeatElement(":red_circle:")        .take(3)        .subscribeNext { print($0) }        .addDisposableTo(disposeBag)
Copier après la connexion

output:

:red_circle:

:red_circle:

:red_circle:

注:take可以用于所有Observable指定限制元素个数

generate()创建一个可以指定规则的Observable,会发送所有满足规则的元素

let disposeBag = DisposeBag() Observable.generate(            initialState: 0,            condition: { $0 < 3 },            iterate: { $0 + 1 }        )        .subscribeNext { print($0) }        .addDisposableTo(disposeBag)
Copier après la connexion

iterate:每次condition之后都会对当前值做一次相应迭代运算

output:

0

1

2

deferred()序列为每一个订阅者创建一个全新的Observable

let disposeBag = DisposeBag()var count = 1let deferredSequence = Observable.deferred {        print("Creating \(count)")        count += 1        return Observable.create { observer in            print("Emitting...")            observer.onNext(":dog:")            observer.onNext(":cat:")            observer.onNext(":monkey_face:")            return NopDisposable.instance        }    }deferredSequence    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)deferredSequence    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

注:deferred序列只有在一个观察者订阅它的时候才执行它的创建Observable方法,产生一个全新的Observable**

output:

Creating 1

Emitting…

:dog:

:cat:

:monkey_face:

Creating 2

Emitting…

:dog:

:cat:

:monkey_face:

error()创建一个不发送元素的Observable,然后立即发送error并终止

let disposeBag = DisposeBag()Observable.error(Error.Test)        .subscribe { print($0) }        .addDisposableTo(disposeBag)
Copier après la connexion

output: Error(Test)

doOn()在发送元素消息前对每一个元素做指定的操作,然后返回操作前的元素消息

let disposeBag = DisposeBag()Observable.of(":apple:", ":pear:", ":tangerine:", ":lemon:")        .doOn { print("Intercepted:", $0) }        .subscribeNext { print($0) }        .addDisposableTo(disposeBag)
Copier après la connexion

注: doOn(onNext:onError:onCompleted:)为不同订阅方式分别指定

output:

Intercepted: Next(:apple:)

:apple:

Intercepted: Next(:pear:)

:pear:

Intercepted: Next(:tangerine:)

:tangerine:

Intercepted: Next(:lemon:)

:lemon:

Intercepted: Completed

toObservable()通过Array,Dictionary,或Set等SequenceType创建一个Observable

Subjects的使用

Subjects理解为observer和Observable之间的桥梁或代理,即扮演着observer又扮演着Observable,规定了添加的observer如何接收消息

PublishSubject向所有订阅者广播从订阅之后的事件

let disposeBag = DisposeBag()let subject = PublishSubject()subject.addObserver("1").addDisposableTo(disposeBag)subject.onNext(":dog:")subject.onNext(":cat:")subject.addObserver("2").addDisposableTo(disposeBag)subject.onNext(":a:")subject.onNext(":b:")
Copier après la connexion

output:

Subscription: 1 Event: Next(:dog:)

Subscription: 1 Event: Next(:cat:)

Subscription: 1 Event: Next(:a:)

Subscription: 2 Event: Next(:a:)

Subscription: 1 Event: Next(:b:)

Subscription: 2 Event: Next(:b:)

ReplaySubject有一个缓存机制,可以在创建时通过指定bufferSize指定缓存大小或调用buffer方法指定更详细的缓存条件来指定新添加的订阅者可以接收多少订阅前的消息

let disposeBag = DisposeBag()let subject = ReplaySubject.create(bufferSize: 1)subject.addObserver("1").addDisposableTo(disposeBag)subject.onNext(":dog:")subject.onNext(":cat:")subject.addObserver("2").addDisposableTo(disposeBag)subject.onNext(":a:")subject.onNext(":b:")
Copier après la connexion

output:

Subscription: 1 Event: Next(:dog:)

Subscription: 1 Event: Next(:cat:)

Subscription: 2 Event: Next(:cat:)

Subscription: 1 Event: Next(:a:)

Subscription: 2 Event: Next(:a:)

Subscription: 1 Event: Next(:b:)

Subscription: 2 Event: Next(:b:)

BehaviorSubject向新的订阅者发送一条最近的事件,如果没有事件则发送一条默认的消息

let disposeBag = DisposeBag()let subject = BehaviorSubject(value: ":red_circle:")subject.addObserver("1").addDisposableTo(disposeBag)subject.onNext(":dog:")subject.onNext(":cat:")subject.addObserver("2").addDisposableTo(disposeBag)subject.onNext(":a:")subject.onNext(":b:")subject.addObserver("3").addDisposableTo(disposeBag)subject.onNext(":pear:")subject.onNext(":tangerine:")
Copier après la connexion

note:以上都不会自动发送Completed当它们被释放的时候

output:

Subscription: 1 Event: Next(:red_circle:)

Subscription: 1 Event: Next(:dog:)

Subscription: 1 Event: Next(:cat:)

Subscription: 2 Event: Next(:cat:)

Subscription: 1 Event: Next(:a:)

Subscription: 2 Event: Next(:a:)

Subscription: 1 Event: Next(:b:)

Subscription: 2 Event: Next(:b:)

Subscription: 3 Event: Next(:b:)

Subscription: 1 Event: Next(:pear:)

Subscription: 2 Event: Next(:pear:)

Subscription: 3 Event: Next(:pear:)

Subscription: 1 Event: Next(:tangerine:)

Subscription: 2 Event: Next(:tangerine:)

Subscription: 3 Event: Next(:tangerine:)

Variable与BehaviorSubject的区别是在完成的时候它会自动发送一条Completed消息和调用deist

let disposeBag = DisposeBag()let variable = Variable(":red_circle:")variable.asObservable().addObserver("1").addDisposableTo(disposeBag)variable.value = ":dog:"variable.value = ":cat:"variable.asObservable().addObserver("2").addDisposableTo(disposeBag)variable.value = ":a:"variable.value = ":b:"
Copier après la connexion

注:variable.asObservable()实际是获取variable中的BehaviorSubject。variable也没有onNext,而是通过value来获取或添加元素,它会添加元素到BehaviorSubject

output:

Subscription: 1 Event: Next(:red_circle:)

Subscription: 1 Event: Next(:dog:)

Subscription: 1 Event: Next(:cat:)

Subscription: 2 Event: Next(:cat:)

Subscription: 1 Event: Next(:a:)

Subscription: 2 Event: Next(:a:)

Subscription: 1 Event: Next(:b:)

Subscription: 2 Event: Next(:b:)

Subscription: 1 Event: Completed

Subscription: 2 Event: Completed

Combination:Observable的混合操作

startWith()分为原Observable和新Observable,并且在发送原Observable元素前会先发送完新Observable元素,有点像栈

let disposeBag = DisposeBag() Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")     .startWith(":one:")     .startWith(":two:")     .startWith(":three:", ":a:", ":b:")     .subscribeNext { print($0) }     .addDisposableTo(disposeBag)
Copier après la connexion

output:

:three:

:a:

:b:

:two:

:one:

:dog:

:cat:

:mouse:

:hamster:

http://reactivex.io/documentation/operators/startwith.html

merge()按顺序混合多个Observable为一个新Observable

let disposeBag = DisposeBag()let subject1 = PublishSubject()let subject2 = PublishSubject()let subject3 = PublishSubject()Observable.of(subject1, subject2, subject3)    .merge()    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)subject1.onNext(":a:")subject1.onNext(":b:")subject2.onNext("①")subject2.onNext("②")subject1.onNext(":ab:")subject3.onNext(":cat:")subject2.onNext("③")
Copier après la connexion

output:

:a:

:b:

:ab:

http://reactivex.io/documentation/operators/merge.html

zip()相当于并排的将多个Observable合并成一个新Observable

let disposeBag = DisposeBag()let stringSubject = PublishSubject()let intSubject = PublishSubject()Observable.zip(stringSubject, intSubject) { stringElement, intElement in    "\(stringElement) \(intElement)"    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)stringSubject.onNext(":a:")stringSubject.onNext(":b:")intSubject.onNext(1)intSubject.onNext(2)stringSubject.onNext(":ab:")intSubject.onNext(3)
Copier après la connexion

output:

:a: 1

:b: 2

:ab: 3

http://reactivex.io/documentation/operators/zip.html

combineLatest()总是将某个Observable发出的最新元素与其他Observable的最后发出的元素混合

let disposeBag = DisposeBag()let stringSubject = PublishSubject()let intSubject = PublishSubject()Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in        "\(stringElement) \(intElement)"    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)stringSubject.onNext(":a:")stringSubject.onNext(":b:")intSubject.onNext(1)intSubject.onNext(2)stringSubject.onNext(":ab:")
Copier après la connexion

output:

:b: 1

:b: 2

:ab: 2

http://reactivex.io/documentation/operators/combinelatest.html

在数组上的应用:

let disposeBag = DisposeBag()let stringObservable = Observable.just(":heart:")let fruitObservable = [":apple:", ":pear:", ":tangerine:"].toObservable()let animalObservable = Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")[stringObservable, fruitObservable, animalObservable].combineLatest {        "\($0[0]) \($0[1]) \($0[2])"    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

note:所有集合的类型必须一样

output:

:heart: :tangerine: :dog:

:heart: :tangerine: :cat:

:heart: :tangerine: :mouse:

:heart: :tangerine: :hamster:

switchLatest()可以将多个Observable序列合并成一个一维的Observable序列,只合并当前关注的Observable序列最近的消息

let disposeBag = DisposeBag()let subject1 = BehaviorSubject(value: ":soccer:️")let subject2 = BehaviorSubject(value: ":apple:")let variable = Variable(subject1)variable.asObservable()    .switchLatest()    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)subject1.onNext(":football:")subject1.onNext(":basketball:")variable.value = subject2subject1.onNext(":baseball:")subject1.onNext(":tennis:")subject2.onNext(":pear:")variable.value = subject1
Copier après la connexion

output:

:soccer:️

:football:

:basketball:

:apple:

:pear:

:tennis:

note::soccer:️ 被忽略

Transforming:Observable的转换操作

map()将闭包操作应用到一个被观察序列的所有元素上,然后返回一个新的被观察序列

let disposeBag = DisposeBag()Observable.of(1, 2, 3)    .map { $0 * $0 }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

1

4

9

http://reactivex.io/documentation/operators/map.html

scan()可以迭代的操作,并且可以设置一个初始的迭代值

let disposeBag = DisposeBag()Observable.of(10, 100, 1000)    .scan(1) { aggregateValue, newValue in        aggregateValue + newValue    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

11

111

1111

http://reactivex.io/documentation/operators/scan.html

Filtering:Observable消息元素的过滤操作

filter()发出满足bool条件的元素

let disposeBag = DisposeBag()Observable.of(    ":cat:", ":rabbit:", ":dog:",    ":frog:", ":cat:", ":rabbit:",    ":hamster:", ":frog:", ":cat:")    .filter {        $0 == ":cat:"    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:cat:

:cat:

:cat:

http://reactivex.io/documentation/operators/filter.html

distinctUntilChanged()过滤掉连续的相同元素

let disposeBag = DisposeBag()Observable.of(":cat:", ":pig:", ":cat:", ":cat:", ":cat:", ":monkey_face:", ":cat:")    .distinctUntilChanged()    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:cat:

:pig:

:cat:

:monkey_face:

:cat:

http://reactivex.io/documentation/operators/distinct.html

elementAt()只发送指定下标的元素

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .elementAt(3)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:frog:

http://reactivex.io/documentation/operators/elementat.html

single()不传参数则发送Observable的第一个元素,否则为满足条件表达式的第一个元素,如果没有发送一个确切的元素,将发送一个 Error消息

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .single{ $0 <= 6}//如果是==这种确切的判断,将没有Error消息而是Completed消息    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

Next(1)

Error(Sequence contains more than one element.)

take()仅发送从第一个元素开始指定个数的元素

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .take(3)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:cat:

:rabbit:

:dog:

http://reactivex.io/documentation/operators/take.html

takeLast()与take不同的是takeLast是从末尾开始

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .takeLast(3)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:frog:

:pig:

:monkey_face:

http://reactivex.io/documentation/operators/takelast.html

takeWhile()发送从头开始的满足条件的元素

let disposeBag = DisposeBag()Observable.of(1, 2, 3, 4, 5, 6)    .takeWhile { $0 < 4 }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

1

2

3

takeUntil:在与之关联的另一Observable发送元素前发送元素

let disposeBag = DisposeBag()let sourceSequence = PublishSubject()let referenceSequence = PublishSubject()sourceSequence    .takeUntil(referenceSequence)    .subscribe { print($0) }    .addDisposableTo(disposeBag)sourceSequence.onNext(":cat:")sourceSequence.onNext(":rabbit:")sourceSequence.onNext(":dog:")referenceSequence.onNext(":red_circle:")sourceSequence.onNext(":frog:")sourceSequence.onNext(":pig:")sourceSequence.onNext(":monkey_face:")
Copier après la connexion

output:

Next(:cat:)

Next(:rabbit:)

Next(:dog:)

Completed

http://reactivex.io/documentation/operators/takeuntil.html

skip()与take相反,它是不发送

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .skip(2)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:dog:

:frog:

:pig:

:monkey_face:

http://reactivex.io/documentation/operators/skip.html

skipWhile()与takeWhile相反,它是不发送

let disposeBag = DisposeBag()Observable.of(1, 2, 3, 4, 5, 6)    .skipWhile { $0 < 4 }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

4

5

6

skipWhileWithIndex()只是skipWhile基础上增加了一个下标index

let disposeBag = DisposeBag()Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")    .skipWhileWithIndex { element, index in        index < 3    }    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:frog:

:pig:

:monkey_face:

skipUntil()与takeUntil相反,它是之后发送

let disposeBag = DisposeBag()let sourceSequence = PublishSubject()let referenceSequence = PublishSubject()sourceSequence    .skipUntil(referenceSequence)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)sourceSequence.onNext(":cat:")sourceSequence.onNext(":rabbit:")sourceSequence.onNext(":dog:")referenceSequence.onNext(":red_circle:")sourceSequence.onNext(":frog:")sourceSequence.onNext(":pig:")sourceSequence.onNext(":monkey_face:")
Copier après la connexion

output:

:frog:

:pig:

:monkey_face:

http://reactivex.io/documentation/operators/skipuntil.html

对Observable元素做运算操作

toArray()将Observable序列转换成array并发送,然后终止

let disposeBag = DisposeBag()Observable.range(start: 1, count: 10)    .toArray()    .subscribe { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

Next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

Completed

reduce()迭代运算,通过指定初始迭代值和运算符

let disposeBag = DisposeBag()Observable.of(10, 100, 1000)    .reduce(1, accumulator: +)    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output: =

1111

http://reactivex.io/documentation/operators/reduce.html

concat()将一个Observable序列的内部Observable序列串联起来,且同一时间只操作一个序列,只有当前序列Completed后,才开始串联下一个序列的前一个元素及之后的元素

let disposeBag = DisposeBag()let subject1 = BehaviorSubject(value: ":apple:")let subject2 = BehaviorSubject(value: ":dog:")let variable = Variable(subject1)variable.asObservable()    .concat()    .subscribe { print($0) }    .addDisposableTo(disposeBag)subject1.onNext(":pear:")subject1.onNext(":tangerine:")variable.value = subject2subject2.onNext("I would be ignored")subject2.onNext(":cat:")subject1.onNext(":tropical_drink:")subject1.onCompleted()subject2.onNext(":mouse:")
Copier après la connexion

output:

Next(:apple:)

Next(:pear:)

Next(:tangerine:)

Next(:tropical_drink:)

Next(:cat:)

Next(:mouse:)

http://reactivex.io/documentation/operators/concat.html

Connectable操作

Connectable操作,Connectable Observable操作跟普通的Observable区别在于,Connectable Observable只有在它们的connect()方法调用后才开始发送元素,因此可以等到所有订阅者都订阅后才开始发送元素,有点像事务一样

publish()将一个普通序列转换成Connectable Observable序列

printExampleHeader(#function)let intSequence = Observable.interval(1, scheduler: MainScheduler.instance)    .publish()_ = intSequence    .subscribeNext { print("Subscription 1:, Event: \($0)") }delay(2) { intSequence.connect() }delay(4) {    _ = intSequence        .subscribeNext { print("Subscription 2:, Event: \($0)") }}delay(6) {    _ = intSequence        .subscribeNext { print("Subscription 3:, Event: \($0)") }
Copier après la connexion

output:

delay 2

Subscription 1:, Event: 0

Subscription 1:, Event: 1

Subscription 2:, Event: 1

Subscription 1:, Event: 2

Subscription 2:, Event: 2

Subscription 1:, Event: 3

Subscription 2:, Event: 3

Subscription 3:, Event: 3

Subscription 1:, Event: 4

Subscription 2:, Event: 4

Subscription 3:, Event: 4

replay()相对于publish增加了bufferSize指定对元素的缓存大小,这样新加入的订阅者可以获取相应个数的已发送的元素

printExampleHeader(#function)let intSequence = Observable.interval(1, scheduler: MainScheduler.instance)    .replay(5)_ = intSequence    .subscribeNext { print("Subscription 1:, Event: \($0)") }delay(2) { intSequence.connect() }delay(4) {    _ = intSequence        .subscribeNext { print("Subscription 2:, Event: \($0)") }}delay(8) {    _ = intSequence        .subscribeNext { print("Subscription 3:, Event: \($0)") }}
Copier après la connexion

output:

delay 2

Subscription 1:, Event: 0

Subscription 2:, Event: 0

Subscription 1:, Event: 1

Subscription 2:, Event: 1

Subscription 1:, Event: 2

Subscription 2:, Event: 2

Subscription 1:, Event: 3

Subscription 2:, Event: 3

Subscription 1:, Event: 4

Subscription 2:, Event: 4

Subscription 3:, Event: 0

Subscription 3:, Event: 1

Subscription 3:, Event: 2

Subscription 3:, Event: 3

Subscription 3:, Event: 4

Subscription 1:, Event: 5

Subscription 2:, Event: 5

Subscription 3:, Event: 5

multicast()需要传入一个subject,通过subject来管理向订阅者发送消息

printExampleHeader(#function)let subject = PublishSubject()_ = subject    .subscribeNext { print("Subject: \($0)") }let intSequence = Observable.interval(1, scheduler: MainScheduler.instance)    .multicast(subject)_ = intSequence    .subscribeNext { print("\tSubscription 1:, Event: \($0)") }delay(2) { intSequence.connect() }delay(4) {     _ = intSequence        .subscribeNext { print("\tSubscription 2:, Event: \($0)") }}delay(6) {     _ = intSequence        .subscribeNext { print("\tSubscription 3:, Event: \($0)") }}
Copier après la connexion

output:

delay 2


Subject: 0

Subscription 1:, Event: 0

Subject: 1

Subscription 1:, Event: 1

Subscription 2:, Event: 1

Subject: 2

Subscription 1:, Event: 2

Subscription 2:, Event: 2

Subject: 3

Subscription 1:, Event: 3

Subscription 2:, Event: 3

Subscription 3:, Event: 3

Subject: 4

Subscription 1:, Event: 4

Subscription 2:, Event: 4

Subscription 3:, Event: 4

Subject: 5

Subscription 1:, Event: 5

Subscription 2:, Event: 5

Subscription 3:, Event: 5

错误处理

catchErrorJustReturn()通过返回一个只发送一个元素的Observable序列来捕获错误信息,然后 Completed

let disposeBag = DisposeBag()let sequenceThatFails = PublishSubject()sequenceThatFails    .catchErrorJustReturn(":blush:")    .subscribe { print($0) }    .addDisposableTo(disposeBag)sequenceThatFails.onNext(":grimacing:")sequenceThatFails.onNext(":fearful:")sequenceThatFails.onNext(":rage:")sequenceThatFails.onNext(":red_circle:")sequenceThatFails.onError(Error.Test)
Copier après la connexion

output:

Next(:grimacing:)

Next(:fearful:)

Next(:rage:)

Next(:red_circle:)

Next(:blush:)

Completed

catchError()当捕获错误后会返回一个正常的Observable序列与之合并

let disposeBag = DisposeBag()let sequenceThatErrors = PublishSubject()let recoverySequence = PublishSubject()sequenceThatErrors    .catchError {        print("Error:", $0)        return recoverySequence    }    .subscribe { print($0) }    .addDisposableTo(disposeBag)sequenceThatErrors.onNext(":grimacing:")sequenceThatErrors.onNext(":fearful:")sequenceThatErrors.onNext(":rage:")sequenceThatErrors.onNext(":red_circle:")sequenceThatErrors.onError(Error.Test)recoverySequence.onNext(":blush:")
Copier après la connexion

output:

Next(:grimacing:)

Next(:fearful:)

Next(:rage:)

Next(:red_circle:)

Error: Test

Next(:blush:)

retry()当遇到error后发送一条error消息然后重新重头发送元素,通过传入一个整数可以指定重复次数

let disposeBag = DisposeBag()    var count = 1let sequenceThatErrors = Observable.create { observer in    observer.onNext(":apple:")    observer.onNext(":pear:")    observer.onNext(":tangerine:")    if count == 1 {        observer.onError(Error.Test)        print("Error encountered")        count += 1    }    observer.onNext(":dog:")    observer.onNext(":cat:")    observer.onNext(":mouse:")    observer.onCompleted()    return NopDisposable.instance}sequenceThatErrors    .retry()    .subscribeNext { print($0) }    .addDisposableTo(disposeBag)
Copier après la connexion

output:

:apple:

:pear:

:tangerine:

Error encountered

:apple:

:pear:

:tangerine:

:dog:

:cat:

:mouse:

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal