RxJava Operators (3): Filtering
In the previous article, we looked at the transforming operators, which convert data into the format we want. But what if the data contains items we want to filter out? That is the job of the filtering operators. They work like WHERE in SQL: the Observable emits only the items that satisfy our conditions.
1. Debounce
The debounce operator throttles a stream. Think of it as a valve: when the valve is half open, water flows out more slowly. The difference is that a valve does not waste any water, whereas the data filtered out by debounce is discarded. In RxJava, this operator comes in two forms: throttleWithTimeout and debounce.

Let's look at throttleWithTimeout first. As shown in the figure below, this operator throttles by time. Each time the source Observable emits an item, a timer starts. If the source emits a new item before the timer expires, the previous item is discarded and the timer restarts. In the extreme case where every item arrives before the timer expires, only the last item is emitted.

<img src="http://www.bkjia.com/uploads/allimg/151205/1151453N3-0.png" style="max-width:90%" alt="" />

First we create an Observable that emits an item every 100 milliseconds. After emitting a multiple of 3, it waits 300 milliseconds before emitting the next item.

<pre class="code">
// Uses rx.Observable, rx.Subscriber and rx.schedulers.Schedulers (RxJava 1.x).
private Observable<Integer> createObserver() {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i < 10; i++) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(i);
                }
                // Pause 300 ms after a multiple of 3, otherwise 100 ms.
                int sleep = (i % 3 == 0) ? 300 : 100;
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());
}
</pre>

Next, use throttleWithTimeout to filter this Observable.
The timeout is set to 200 milliseconds, which means that any item followed by another item within 200 milliseconds is filtered out.

<pre class="code">
private Observable<Integer> throttleWithTimeoutObserver() {
    // Let an item through only after 200 ms of silence, then deliver it on the Android main thread.
    return createObserver().throttleWithTimeout(200, TimeUnit.MILLISECONDS)
            .observeOn(AndroidSchedulers.mainThread());
}
</pre>

Subscribe to it:

<pre class="code">
mLButton.setText("throttleWithTimeout");
mLButton.setOnClickListener(e -> throttleWithTimeoutObserver().subscribe(i -> log("throttleWithTimeout:" + i)));
</pre>

The output is shown below. Every item that is not a multiple of 3 is followed by a new item within 200 milliseconds, so it is filtered out.

<img src="http://www.bkjia.com/uploads/allimg/151205/1151453122-1.png" style="max-width:90%" alt="" />

The debounce operator can also filter by time, in which case it behaves the same as throttleWithTimeout. In addition, debounce can throttle based on a function. The function returns a temporary Observable for each item. If the source Observable emits a new item before the temporary Observable generated for the previous item has terminated, the previous item is filtered out.
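Both forms follow the same rule: an item survives only if nothing newer arrives before its window closes. As a rough illustration of the time-based case, here is a plain-Java model. It is not RxJava code and not how RxJava implements the operator; the class name DebounceModel and its debounce helper are hypothetical. It assumes that the last item is always delivered and that a gap exactly equal to the timeout counts as quiet.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of time-based debounce; not RxJava's implementation. */
final class DebounceModel {

    /**
     * @param times   emission times in milliseconds, in ascending order
     * @param values  the values emitted at those times
     * @param timeout the quiet period in milliseconds
     * @return the values that survive debouncing
     */
    static List<Integer> debounce(long[] times, int[] values, long timeout) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            // An item survives if no newer item arrives within the timeout.
            // Assumption: the last item always survives.
            if (isLast || times[i + 1] - times[i] >= timeout) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same timing as createObserver(): 300 ms pause after multiples of 3, 100 ms otherwise.
        long[] times = new long[10];
        int[] values = new int[10];
        long now = 0;
        for (int i = 0; i < 10; i++) {
            times[i] = now;
            values[i] = i;
            now += (i % 3 == 0) ? 300 : 100;
        }
        System.out.println(debounce(times, values, 200)); // [0, 3, 6, 9]
    }
}
```

With the timing of createObserver() and a 200 ms timeout, the model keeps only the multiples of 3, which matches the explanation above.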
<img src="http://www.bkjia.com/uploads/allimg/151205/115145L93-2.png" style="max-width:90%" alt="" />

Create an Observable and filter it with debounce. Only when the emitted item is an even number does the temporary Observable call onCompleted to signal that it has terminated.

<pre class="code">
private Observable<Integer> debounceObserver() {
    return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
        log(integer);
        // Temporary Observable that decides when the window for this item closes.
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                // Only even items terminate their temporary Observable.
                if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
                    log("complete:" + integer);
                    subscriber.onNext(integer);
                    subscriber.onCompleted();
                }
            }
        });
    })
    .observeOn(AndroidSchedulers.mainThread());
}
</pre>

Subscribe to it:

<pre class="code">
mRButton.setOnClickListener(e -> debounceObserver().subscribe(i -> log("debounce:" + i)));
</pre>

The output is shown below. Only the items whose temporary Observable called onCompleted are emitted; the rest are filtered out. (debounce also flushes a still-pending final item when the source completes, so the last value may appear as well.)

<img src="http://www.bkjia.com/uploads/allimg/151205/1151451P1-3.png" style="max-width:90%" alt="" />

2. Distinct

The Distinct operator removes duplicates, which is easy to understand. As shown in the figure below, every duplicate item is filtered out. There is also the distinctUntilChanged operator, which filters out only consecutive duplicates.

<img src="http://www.bkjia.com/uploads/allimg/151205/1151452R6-4.png" style="max-width:90%" alt="" /><img src="http://www.bkjia.com/uploads/allimg/151205/1151455E3-5.png" style="max-width:90%" alt="" />

Create two Observables and filter them with the distinct and distinctUntilChanged operators respectively:

<pre class="code">
private Observable<Integer> distinctObserver() {
    // Drops every value that has already been seen.
    return Observable.just(1, 2, 3, 4, 5, 4, 3, 2, 1).distinct();
}

private Observable<Integer> distinctUntilChangedObserver() {
    // Drops a value only when it equals the one immediately before it.
    return Observable.just(1, 2, 3, 3, 3, 1, 2, 3, 3).distinctUntilChanged();
}
</pre>

Subscribe to each:

<pre class="code">
mLButton.setText("distinct");
mLButton.setOnClickListener(e -> distinctObserver().subscribe(i -> log("distinct:" + i)));
mRButton.setText("UntilChanged");
mRButton.setOnClickListener(e -> distinctUntilChangedObserver().subscribe(i -> log("UntilChanged:" + i)));
</pre>

The output is shown in the figure below. distinct filters out every duplicate number, while distinctUntilChanged filters out only consecutive duplicates.
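To make the difference between the two operators concrete, the following plain-Java sketch applies both rules to in-memory lists. It is only an analogy built on the JDK collections, not RxJava code; the class DistinctModel and its two helper methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/** Hypothetical plain-Java analogue of distinct and distinctUntilChanged. */
final class DistinctModel {

    /** Keeps the first occurrence of every value, in encounter order. */
    static <T> List<T> distinct(List<T> source) {
        return new ArrayList<>(new LinkedHashSet<>(source));
    }

    /** Drops a value only when it equals the value kept immediately before it. */
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (out.isEmpty() || !Objects.equals(out.get(out.size() - 1), item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same inputs as the two Observables above.
        System.out.println(distinct(Arrays.asList(1, 2, 3, 4, 5, 4, 3, 2, 1)));
        // [1, 2, 3, 4, 5]
        System.out.println(distinctUntilChanged(Arrays.asList(1, 2, 3, 3, 3, 1, 2, 3, 3)));
        // [1, 2, 3, 1, 2, 3]
    }
}
```

Note that the second input still yields 1, 2, 3 twice: a value may reappear as long as it is not directly adjacent to an equal one.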
<img src="http://www.bkjia.com/uploads/allimg/151205/11514535Z-6.png" style="max-width:90%" alt="" />

3. ElementAt and Filter

These two operators are easy to understand. ElementAt returns only the item at the specified (zero-based) position, while Filter returns only the items that satisfy the filter condition. The diagrams are as follows:

<img src="http://www.bkjia.com/uploads/allimg/151205/115145E34-7.png" style="max-width:90%" alt="" /><img src="http://www.bkjia.com/uploads/allimg/151205/1151453D1-8.png" style="max-width:90%" alt="" />

Create two Observables and filter them with the elementAt and filter operators respectively:

<pre class="code">
private Observable<Integer> elementAtObserver() {
    // Emits only the item at index 2.
    return Observable.just(0, 1, 2, 3, 4, 5).elementAt(2);
}

private Observable<Integer> FilterObserver() {
    // Emits only the items smaller than 3.
    return Observable.just(0, 1, 2, 3, 4, 5).filter(i -> i < 3);
}
</pre>

Subscribe to each:

<pre class="code">
mLButton.setText("elementAt");
mLButton.setOnClickListener(e -> elementAtObserver().subscribe(i -> log("elementAt:" + i)));
mRButton.setText("Filter");
mRButton.setOnClickListener(e -> FilterObserver().subscribe(i -> log("Filter:" + i)));
</pre>

The output is as follows:

<img src="http://www.bkjia.com/uploads/allimg/151205/1151451453-9.png" style="max-width:90%" alt="" />

4. First and Last
The First operator returns only the first item, and it can also return the first item that satisfies a condition. If you have read my earlier posts, you will notice that the three-level cache example built with RxJava uses the first operator to choose which cache to use. In contrast to First, the Last operator returns only the last item that satisfies the condition.

<img src="http://www.bkjia.com/uploads/allimg/151205/115145DO-10.png" style="max-width:90%" alt="" /><img src="http://www.bkjia.com/uploads/allimg/151205/115145L02-11.png" style="max-width:90%" alt="" />

There is also BlockingObservable. It does not transform the Observable in any way; it only blocks: the calling thread waits until the requested item has been emitted, and that item is then returned directly. You can convert an Observable into a BlockingObservable with <code>Observable.toBlocking</code> or <code>BlockingObservable.from</code>. BlockingObservable can be combined with the first operator.

Create two Observables and process each with the first operator:
<pre class="code">
private Observable<Integer> FirstObserver() {
    // Emits the first item greater than 1.
    return Observable.just(0, 1, 2, 3, 4, 5).first(i -> i > 1);
}

private BlockingObservable<Integer> FilterObserver() {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // Emit 0..4, one item every 500 ms.
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (!subscriber.isUnsubscribed()) {
                    log("onNext:" + i);
                    subscriber.onNext(i);
                }
            }
            if (!subscriber.isUnsubscribed()) {
                subscriber.onCompleted();
            }
        }
    }).toBlocking();
}
</pre>
Subscribe to each:
<pre class="code">
mLButton.setText("First");
mLButton.setOnClickListener(e -> FirstObserver().subscribe(i -> log("First:" + i)));
mRButton.setText("Blocking");
mRButton.setOnClickListener(e -> {
    // first(...) on a BlockingObservable blocks this thread until a matching item arrives.
    log("blocking:" + FilterObserver().first(i -> i > 1));
});
</pre>
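Running this shows that the first operator returns 2, the first number greater than 1, while the BlockingObservable keeps blocking until the first item greater than 1 has been emitted.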
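The contrast between the two calls can be sketched without RxJava. The following plain-Java analogue is only an illustration under stated assumptions: FirstModel, firstMatching and blockingFirst are hypothetical names, the producer thread stands in for the slow Observable, and blockingFirst assumes that a matching value is eventually produced.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.stream.Stream;

/** Hypothetical plain-Java analogue of first(predicate) and its blocking variant. */
final class FirstModel {

    /** Returns the first element that satisfies the predicate. */
    static int firstMatching(Stream<Integer> source, Predicate<Integer> predicate) {
        return source.filter(predicate)
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException("no matching item"));
    }

    /** Blocks the caller until a producer thread emits a matching value (0..4 are produced). */
    static int blockingFirst(Predicate<Integer> predicate, long delayMillis) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(delayMillis);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the consumer asked us to stop
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            while (true) {
                int value = queue.take(); // blocks until the next value arrives
                if (predicate.test(value)) {
                    return value;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            producer.interrupt(); // stop producing, similar in spirit to unsubscribing
        }
    }

    public static void main(String[] args) {
        System.out.println(firstMatching(Stream.of(0, 1, 2, 3, 4, 5), i -> i > 1)); // 2
        System.out.println(blockingFirst(i -> i > 1, 500));                         // 2, after about 1.5 s
    }
}
```

The first call returns at once because the data is already in memory; the second returns the same value but only after the caller has waited for the producer.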
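5. Skip and Take

The Skip operator drops the first n items emitted by the source Observable, while the Take operator keeps only the first n items. Both are easy to understand and use, yet very useful. There are also the SkipLast and TakeLast operators, which filter from the end of the sequence instead.

Create two Observables and filter them with the skip and take operators respectively: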
<pre class="code">
private Observable<Integer> skipObserver() {
    // Drops 0 and 1, emits the rest.
    return Observable.just(0, 1, 2, 3, 4, 5).skip(2);
}

private Observable<Integer> takeObserver() {
    // Emits 0 and 1, then completes.
    return Observable.just(0, 1, 2, 3, 4, 5).take(2);
}
</pre>
Subscribe to each:
<pre class="code">
mLButton.setText("Skip");
mLButton.setOnClickListener(e -> skipObserver().subscribe(i -> log("Skip:" + i)));
mRButton.setText("Take");
mRButton.setOnClickListener(e -> takeObserver().subscribe(i -> log("Take:" + i)));
</pre>
Running this shows that skip drops the first two items, while take drops everything except the first two items.
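For a finite, in-memory sequence the four operators can be mimicked with the JDK alone. The sketch below is an analogy rather than RxJava code; SkipTakeModel and its method names are hypothetical, and it treats the source as a list whose size is known up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical plain-Java analogue of skip, take, skipLast and takeLast on a list. */
final class SkipTakeModel {

    /** Drops the first n elements. */
    static <T> List<T> skip(List<T> source, int n) {
        return source.stream().skip(n).collect(Collectors.toList());
    }

    /** Keeps only the first n elements. */
    static <T> List<T> take(List<T> source, int n) {
        return source.stream().limit(n).collect(Collectors.toList());
    }

    /** Drops the last n elements. */
    static <T> List<T> skipLast(List<T> source, int n) {
        return new ArrayList<>(source.subList(0, Math.max(0, source.size() - n)));
    }

    /** Keeps only the last n elements. */
    static <T> List<T> takeLast(List<T> source, int n) {
        return new ArrayList<>(source.subList(Math.max(0, source.size() - n), source.size()));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(0, 1, 2, 3, 4, 5);
        System.out.println(skip(source, 2));     // [2, 3, 4, 5]
        System.out.println(take(source, 2));     // [0, 1]
        System.out.println(skipLast(source, 2)); // [0, 1, 2, 3]
        System.out.println(takeLast(source, 2)); // [4, 5]
    }
}
```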
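6. Sample and ThrottleFirst

The Sample operator periodically emits the most recent item emitted by the source Observable and filters out everything else; it is equivalent to the ThrottleLast operator. The ThrottleFirst operator instead periodically emits the first item that the source Observable emitted during each time window.

We create an Observable that emits an item every 200 milliseconds, then filter it with the sample and throttleFirst operators respectively: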
<pre class="code">
private Observable<Integer> sampleObserver() {
    // Emits the latest item seen in each 1000 ms window.
    return createObserver().sample(1000, TimeUnit.MILLISECONDS);
}

private Observable<Integer> throttleFirstObserver() {
    // Emits the first item seen in each 1000 ms window.
    return createObserver().throttleFirst(1000, TimeUnit.MILLISECONDS);
}

private Observable<Integer> createObserver() {
    return Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            // Emit 0..19, one item every 200 ms.
            for (int i = 0; i < 20; i++) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                subscriber.onNext(i);
            }
            subscriber.onCompleted();
        }
    });
}
</pre>
Subscribe to each:
<pre class="code">
mLButton.setText("sample");
mLButton.setOnClickListener(e -> sampleObserver().subscribe(i -> log("sample:" + i)));
mRButton.setText("throttleFirst");
mRButton.setOnClickListener(e -> throttleFirstObserver().subscribe(i -> log("throttleFirst:" + i)));
</pre>
Running this shows that sample emits one item out of every five, while throttleFirst emits the first item out of every five.
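The two windowing rules can be compared with a small plain-Java model over timestamped events. This is a simplified sketch, not RxJava's implementation: SampleModel and its methods are hypothetical, sampling windows are assumed to be aligned at time 0, and the model assumes the last pending item is flushed when the input ends. In a real run the exact values depend on thread timing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java model of sample and throttleFirst; not RxJava's implementation. */
final class SampleModel {

    /** Keeps the last value of each window [k * period, (k + 1) * period). */
    static List<Integer> sample(long[] times, int[] values, long period) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            // Assumption: the final item counts as the last of its (partial) window.
            boolean lastInWindow = (i == values.length - 1)
                    || times[i + 1] / period != times[i] / period;
            if (lastInWindow) {
                out.add(values[i]);
            }
        }
        return out;
    }

    /** Emits a value, then ignores everything until period ms have passed since that emission. */
    static List<Integer> throttleFirst(long[] times, int[] values, long period) {
        List<Integer> out = new ArrayList<>();
        long lastEmit = -1; // -1 means nothing has been emitted yet
        for (int i = 0; i < values.length; i++) {
            if (lastEmit < 0 || times[i] - lastEmit >= period) {
                out.add(values[i]);
                lastEmit = times[i];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 20 items, one every 200 ms, as in createObserver() above.
        long[] times = new long[20];
        int[] values = new int[20];
        for (int i = 0; i < 20; i++) {
            times[i] = (i + 1) * 200L;
            values[i] = i;
        }
        System.out.println(sample(times, values, 1000));        // [3, 8, 13, 18, 19]
        System.out.println(throttleFirst(times, values, 1000)); // [0, 5, 10, 15]
    }
}
```

In this model throttleFirst lets through every fifth item starting with the first, while sample reports the latest item at the end of each window, which is the contrast described above.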
The demo program for this article is available on GitHub: https://github.com/Chaoba/RxJavaDemo