我用rxbus连续发送大量消息报错
08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.dituwuyou, PID: 10637
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
at java.lang.Thread.run(Thread.java:818)
我的rxbus是这样定义的
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
* Created by xg on 2016/6/24.
* 消息传递(替换handler,eventbus)
*/
public class RxBus {
private static volatile RxBus mInstance;
private final Subject bus;
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
/**
* 单例模式RxBus
*
* @return
*/
public static RxBus getRxBusSingleton() {
RxBus rxBus2 = mInstance;
if (mInstance == null) {
synchronized (RxBus.class) {
rxBus2 = mInstance;
if (mInstance == null) {
rxBus2 = new RxBus();
mInstance = rxBus2;
}
}
}
return rxBus2;
}
/**
* 发送消息
*
* @param object
*/
public void send(Object object) {
bus.onNext(object);
}
/**
* 接收消息
*
* @return
*/
public Observable toObserverable() {
return bus;
}
}
这是第45行
bus.onNext(object);
应该是出现背压了
1. I don’t understand why you need to add rxBus2
2. You need to handle the onError situation
3. Register first and then send
What version of RxJava are you using? There is no rx.subjects.PublishSubject$PublishSubjectProducer class in version 1.1.6