Java 9 supports the creation of reactive streams by introducing some interfaces: Publisher, Subscriber, Subscription and the SubmissionPublisher class that implements the Publisher interface. Each interface can play a different role according to the principles of Reactive Streaming.
We can use the Subscriber interface to subscribe to the data published by publisher. We need to implement the Subscriber interface and provide implementations for abstract methods.
import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.stream.IntStream; public class SubscriberImplTest { public static class Subscriber implements <strong>Flow.Subscriber<Integer></strong> { private <strong>Flow.Subscription</strong> subscription; private boolean isDone; <strong> @Override</strong> public void <strong>onSubscribe</strong>(Flow.Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); } <strong>@Override</strong> public void <strong>onNext</strong>(Integer item) { System.out.println("Processing " + item); this.subscription.request(1); } <strong>@Override</strong> public void <strong>onError</strong>(Throwable throwable) { throwable.printStackTrace(); } <strong>@Override</strong> public void <strong>onComplete()</strong> { System.out.println("Processing done"); isDone = true; } } public static void main(String args[]) throws InterruptedException { <strong>SubmissionPublisher<Integer></strong> publisher = new <strong>SubmissionPublisher<></strong><strong>()</strong>; <strong>Subscriber </strong>subscriber = new <strong>Subscriber()</strong>; publisher.subscribe(subscriber); <strong>IntStream</strong> intData = <strong>IntStream.rangeClosed</strong>(1, 10); intData.forEach(<strong>publisher::submit</strong>); publisher.<strong>close()</strong>; while(!subscriber.isDone) { Thread.sleep(10); } System.out.println("Done"); } }
<strong>Subscribed Processing 1 Processing 2 Processing 3 Processing 4 Processing 5 Processing 6 Processing 7 Processing 8 Processing 9 Processing 10 Processing done Done</strong>
The above is the detailed content of How do we implement Subscriber interface in Java 9?. For more information, please follow other related articles on the PHP Chinese website!