首頁 > Java > java教程 > 深入淺出RxJava_06[傳輸過程的條件&組合操作]的詳情

深入淺出RxJava_06[傳輸過程的條件&組合操作]的詳情

黄舟
發布: 2017-03-04 09:58:12
原創
1490 人瀏覽過


本教學基於RxJava1.x版本進行全面講解,後續課程將陸續更新,敬請關注…

下面的一系列函數都是用來判斷傳輸的資料是否符合某些條件。

  1. all - 判定是否Observable發射的所有資料都滿足某個條件,如果原始Observable的任何一項資料不滿足條件就回傳False

  2. #Amb - 給定2個Observable,只發送那個首先發送資料的Observable,後者會被忽略掉。

  3. Contains - 判定一個Observable是否發射一個特定的值

  4. ##DefaultIfEmpty - 發射來自原始Observable的值,如果原始Observable沒有發射任何值,就發射一個預設值

  5. SequenceEqual - 判定兩個Observables是否發射相同的資料序列

  6. SkipUntil - SkipUntil訂閱原始的Observable ,但忽略它的發射物,直到第二個Observable發射了一項數據那一刻,它開始發射原始Observable.

  7. SkipWhile - SkipWhile訂閱原始的Observable,但是忽略它的發射物,直到你指定的某個條件變成false的那一刻,它開始發射原始Observable。

  8. TakeUntil - 使用一個謂詞函數而不是第二個Observable來判定是否需要終止發射數據,它的行為類似於takeWhile。

  9. TakeWhile - TakeWhile發射原始Observable,直到你指定的某個條件不成立的那一刻,它停止發射原始Observable,並終止自己的Observable。

1.all

判定是否Observable發射的所有資料都滿足某個條件,如果原始Observable的任何一項資料不滿足條件就回傳False

如有群學生訊息,資料如下:

private ArrayList<Student> initStudents() {
    ArrayList<Student> persons = new ArrayList<>();
    persons.add(new Student("张三", 16));
    persons.add(new Student("李四", 21));
    persons.add(new Student("王二麻子", 18));
    return persons;
}
登入後複製

入學的時候要求歲數不大於20歲才能讀書,這樣,就可以使用all函數來判斷。

Observable<Student> observable = Observable.from(initStudents());
Observable<Boolean> booleanObservable = 
                observable.all(new Func1<Student, Boolean>() {
                    //设置判断规则
                    @Override
                    public Boolean call(Student student) {
                        return student.age<20;
                    }
                });
booleanObservable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        //这里打印了false  因为学生里面李四21岁
        Log.i(TAG, "call: "+aBoolean);
    }
});
登入後複製

2.Amb

給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的所有數據,Amb將忽略和丟棄其它所有Observables的發射物。

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<String> o2 = Observable.just("d", "f", "e");

//判断上面的2个被观察者对象哪个先发送数据 发送数据慢的被观察者将被忽略
Observable<String> observable = Observable.amb(o2,o1);

observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i(TAG, "call: "+s);
    }
});
登入後複製

3.Contains

給Contains傳一個指定的值,如果原始Observable發射了那個值,它傳回的Observable會發射true,否則發射false。

Observable<String> o1 = Observable.just("a", "b", "c");
Observable<Boolean> observable = o1.contains("a");

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        //true
        Log.i(TAG, "call: "+aBoolean);
    }
}
登入後複製

4.DefaultIfEmpty

DefaultIfEmpty簡單的精確地發射原始Observable的值,如果原始Observable沒有發射任何資料正常終止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就發射一個你提供的預設值。

Observable<String> o1 = Observable
        .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                //这里没有发送任何onNext事件,而是调用onCompleted()事件
                subscriber.onCompleted();
            }
        });
    //如果上面的o1不发送任何onNext而结束 则发送一个默认的值d
    Observable<String> o2 = o1.defaultIfEmpty("d");
    o2.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            //打印d
            Log.i(TAG, "call: "+s);
        }
    });
登入後複製

例如在被觀察者存取網路請求的時候,後台伺服器沒有回傳回數據,可以使用一個預設的數據來取代。

5.SequenceEqual

判定兩個Observables是否發射相同的資料序列,它會比較兩個Observable的發射物,如果兩個序列是相同的(相同的數據,相同的順序,相同的終止狀態),它就發射true,否則發射false。

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = Observable.just(1,2,3);
Observable<Boolean> observable = Observable.sequenceEqual(o1, o2);

observable.subscribe(new Action1<Boolean>() {
    @Override
    public void call(Boolean aBoolean) {
        Log.i(TAG, "call: "+aBoolean);
    }
});
登入後複製

6.1 SkipUntil

SkipUntil訂閱原始的Observable,但忽略它的發射物,直到第二個Observable發射了一項資料那一刻,它開始發射原始Observable。

Observable<Integer> o1 = Observable.just(1,2,3);
Observable<Integer> o2 = o1.skipUntil(Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        //如果不发送如下信息  则上面的1,2,3都不会发送
        //如果发送了4 则直接发送1,2,3 但是4 5并没有发送
        subscriber.onNext(4);
        subscriber.onNext(5);
    }
}));

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});
登入後複製

輸出:

call: 1
call: 2
call: 3
登入後複製

6.2 SkipUntil

SkipWhile訂閱原始的Observable,但忽略它的發射物,直到你指定的某個條件變成false的那一刻,它開始發射原始Observable。

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.skipWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        //直到发送的数据"等于"4 才开始从4发送数据 这里打印4,5,6
        return value!=4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});
登入後複製

#輸出:

call: 4
call: 5
call: 6
登入後複製

7.1 TakeUntil

#使用一個takeUntil內部的函數來決定是否終止後面的資料繼續發送。程式碼如下:

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeUntil(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        //发送数据直到等于4才停止发送
        return value==4;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});
登入後複製

上面的程式碼類似:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)<=4){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}
登入後複製

7.2 TakeWhile

TakeWhile發射原始Observable,直到你指定的某個條件不成立的那一刻,它停止發射原始Observable,並終止自己的Observable。

Observable<Integer> o1 = Observable.just(1,2,3,4,5,6);
Observable<Integer> o2 = o1.takeWhile(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        //发送数据 直到发送的数据等于才停止发送
        return integer!=5;
    }
});

o2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        Log.i(TAG, "call: "+value);
    }
});
登入後複製

上面的程式碼印出了1~4,類似如下的while語句:

ArrayList<Integer> datas=new ArrayList<>();
for (int i = 1; i <= 6 ; i++) {
    datas.add(i);
}
int index=0;
while(datas.get(index)!=5){
    Log.i(TAG, "onCreate: "+datas.get(index));
    index++;
}
登入後複製


下面展示的函數可用來組合多個Observables。

  1. Merge - 交錯合併多個Observables的發射物

  2. StartWith - 在資料序列的開頭插入指定的項目

  3. #Zip - 根據函數提供的規則合併兩個observable的發射資料,發射資料的總數以發送最小的observable發射個數為準。

8.Merge

合併多個Observables的發射物。使用Merge運算元你可以將多個Observables的輸出合併,就好像它們是一個單一的Observable一樣。

Merge可能會讓合併的Observables發射的資料交錯(有一個類似的運算子Concat不會讓資料交錯,它會依序一個接著一個發射多個Observables的發射物)。

範例程式碼

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
登入後複製

輸出

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
登入後複製

9.StartWith

在資料序列的開頭插入指定的項目

如果你想要一個Observable在發射資料之前先發射一個指定的資料序列,可以使用StartWith操作符。 (如果你想一個Observable發射的資料末尾追加一個資料序列可以使用Concat運算子。)

Observable<String> o1 = Observable.just("模拟网络请求");
Observable<String> o2 = o1.startWith("网络请求之前的准备工作");
o2.subscribe(new Action1<String>() {
    @Override
    public void call(String value) {
        Log.i(TAG, value);
    }
});
登入後複製

输出:

网络请求之前的准备工作
模拟网络请求
登入後複製

类似的函数有:

  • Javadoc: startWith(Iterable))

  • Javadoc: startWith(T)) (最多接受九个参数)

  • Javadoc: startWith(Observable))

10.Zip

假设被观察者o1发送的数据为1 2 ,被观察者o2发送的数据为3,4,5。那么zip压缩发送的数据个数以最低个也就是2个为准。并且 新发送的每个数据以Func2的返回数据为准。

  • Javadoc: zip(Observable1,Observable2,Func2));

示例代码如下:

Observable<Integer> o1 = Observable.just(1,2);
Observable<Integer> o2 = Observable.just(3,4,5);

Observable<Integer> zipObservable = Observable
        .zip(o1, o2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer o, Integer o2) {
                return o + o2;
            }
        });

zipObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer s) {
        Log.i(TAG, "call: "+s);
    }
});
登入後複製

输出如下:

call: 4 // 1+3
call: 6   // 2+4
登入後複製


 以上就是深入浅出RxJava_06[传输过程的条件&组合操作]的详情的内容,更多相关内容请关注PHP中文网(www.php.cn)!


來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板