android - rxbus连续发送消息报错
怪我咯
怪我咯 2017-04-17 17:51:28
0
2
1450

我用rxbus连续发送大量消息报错

08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main

                                                           Process: com.dituwuyou, PID: 10637
                                                           java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
                                                               at android.os.Handler.handleCallback(Handler.java:739)
                                                               at android.os.Handler.dispatchMessage(Handler.java:95)
                                                               at android.os.Looper.loop(Looper.java:148)
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417)
                                                               at java.lang.reflect.Method.invoke(Native Method)
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
                                                            Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
                                                               at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
                                                               at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
                                                               at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
                                                               at android.os.Handler.handleCallback(Handler.java:739) 
                                                               at android.os.Handler.dispatchMessage(Handler.java:95) 
                                                               at android.os.Looper.loop(Looper.java:148) 
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417) 
                                                               at java.lang.reflect.Method.invoke(Native Method) 
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) 
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) 
                                                            Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
                                                               at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
                                                               at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
                                                               at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
                                                               at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
                                                               at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
                                                               at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
                                                               at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
                                                               at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
                                                               at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
                                                               at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
                                                               at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
                                                               at java.lang.Thread.run(Thread.java:818)

我的rxbus是这样定义的

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * Message passing (replacement for Handler / EventBus).
 *
 * <p>Events are published through a {@link SerializedSubject} wrapping a
 * {@link PublishSubject}, so {@link #send(Object)} may be called from several
 * threads. Subscribers only receive events sent after they subscribe.
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject<Object, Object> bus;

    public RxBus() {
        bus = new SerializedSubject<Object, Object>(PublishSubject.<Object>create());
    }

    /**
     * Returns the shared RxBus instance, creating it on first use.
     *
     * @return the process-wide RxBus
     */
    public static RxBus getRxBusSingleton() {
        // Read the volatile field once into a local and work on the local from here on.
        RxBus instance = mInstance;
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = mInstance;
                if (instance == null) {
                    instance = new RxBus();
                    mInstance = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Sends a message to every current subscriber.
     *
     * @param object the event to publish
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * Returns the stream of messages to subscribe to.
     *
     * <p>The stream is wrapped in {@code onBackpressureBuffer()} so that a
     * subscriber that consumes more slowly than messages are sent (for example
     * behind {@code observeOn}) has the surplus queued instead of the subject
     * failing with {@code MissingBackpressureException}. The buffer is
     * unbounded: a subscriber that never catches up will keep accumulating
     * events in memory.
     *
     * @return an Observable of all messages sent after subscription; the raw
     *         type is kept so existing callers continue to compile
     */
    public Observable toObserverable() {
        return bus.onBackpressureBuffer();
    }
}

这是第45行
bus.onNext(object);
应该是出现背压了

怪我咯
怪我咯

走同样的路,发现不同的人生

reply all(2)
迷茫

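1. I don’t understand why the extra local variable rxBus2 is needed in getRxBusSingleton().
2. You need to handle onError: subscribe with an onError callback, otherwise any error is rethrown as OnErrorNotImplementedException and crashes the app.
3. Subscribe (register) first and then send; a PublishSubject only delivers events emitted after subscription.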

伊谢尔伦

What version of RxJava are you using? There is no rx.subjects.PublishSubject$PublishSubjectProducer class in version 1.1.6

Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template