How do we implement Subscriber interface in Java 9?

WBOY
Release: 2023-09-04 13:33:07

Java 9 supports reactive streams through the java.util.concurrent.Flow class, whose nested interfaces include Publisher, Subscriber and Subscription, and through the SubmissionPublisher class, which implements the Publisher interface. Each interface plays a distinct role defined by the Reactive Streams specification.

We use the Subscriber interface to receive the data that a publisher emits. To do so, we implement Flow.Subscriber and provide implementations for its four abstract methods.

Methods of the Flow.Subscriber interface:

  • onComplete(): Called when the Publisher has no more items to send and closes normally.
  • onError(Throwable): Called when the Publisher or the Subscription hits an unrecoverable error. No other Subscriber method is called afterwards.
  • onNext(T): Called with each item delivered to this Subscriber, up to the number of items it has requested.
  • onSubscribe(Flow.Subscription): Called once, before any other method, when the Publisher registers the Subscriber. The Subscription it passes in is used to request items or cancel.
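The order in which these callbacks arrive can be checked with a small sketch. The class and method names here (CallbackOrderSketch, RecordingSubscriber, run) are hypothetical and exist only for illustration. The sketch assumes a SubmissionPublisher with its default executor, and it makes no assumption about how many onNext calls precede onError on the failure path, because items still buffered when the error arrives may not be delivered.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class CallbackOrderSketch {

    // Hypothetical subscriber that records every callback it receives.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(String item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError:" + throwable.getMessage());
            finished.countDown(); // terminal signal
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            finished.countDown(); // terminal signal
        }
    }

    // Publishes two items, then ends the stream normally or with an error.
    static List<String> run(boolean fail) throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        publisher.submit("a");
        publisher.submit("b");
        if (fail) {
            publisher.closeExceptionally(new IllegalStateException("boom"));
        } else {
            publisher.close();
        }
        subscriber.finished.await(); // wait for onComplete or onError
        return List.copyOf(subscriber.events);
    }

    public static void main(String[] args) throws InterruptedException {
        // Normal path prints [onSubscribe, onNext:a, onNext:b, onComplete]
        System.out.println(run(false));
        // Failure path always ends with onError:boom
        System.out.println(run(true));
    }
}
```

On the normal path onComplete is the last event, and on the failure path onError takes its place; the two terminal signals never both occur for one subscription.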

Example

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class SubscriberImplTest {
   public static class Subscriber implements Flow.Subscriber<Integer> {
      private Flow.Subscription subscription;
      // volatile: written on the publisher's executor thread, read by main
      private volatile boolean isDone;

      @Override
      public void onSubscribe(Flow.Subscription subscription) {
         System.out.println("Subscribed");
         this.subscription = subscription;
         this.subscription.request(1); // ask for the first item
      }
      @Override
      public void onNext(Integer item) {
         System.out.println("Processing " + item);
         this.subscription.request(1); // ask for the next item
      }
      @Override
      public void onError(Throwable throwable) {
         throwable.printStackTrace();
         isDone = true; // an error also ends the stream, so stop waiting
      }
      @Override
      public void onComplete() {
         System.out.println("Processing done");
         isDone = true;
      }
   }
   public static void main(String[] args) throws InterruptedException {
      SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
      Subscriber subscriber = new Subscriber();
      publisher.subscribe(subscriber);
      IntStream intData = IntStream.rangeClosed(1, 10);
      intData.forEach(publisher::submit);
      publisher.close(); // triggers onComplete once buffered items are delivered
      // Items are delivered asynchronously, so wait for the terminal signal
      while(!subscriber.isDone) {
         Thread.sleep(10);
      }
      System.out.println("Done");
   }
}

Output

Subscribed
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10
Processing done
Done
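The example requests one item at a time through the Subscription, which means the subscriber controls how much it receives. As a rough sketch of that idea, the hypothetical TakeSubscriber below stops after a fixed number of items by calling cancel() instead of requesting more. The names LimitedDemandSketch, TakeSubscriber and take are assumptions made for illustration, and the sketch assumes limit is at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class LimitedDemandSketch {

    // Hypothetical subscriber that accepts at most `limit` items, then cancels.
    static class TakeSubscriber implements Flow.Subscriber<Integer> {
        private final int limit; // assumed to be >= 1
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (received.size() >= limit) {
                subscription.cancel();   // no further items wanted
                finished.countDown();
            } else {
                subscription.request(1); // ask for exactly one more
            }
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown(); // stream ended before the limit was reached
        }
    }

    // Publishes 1..available and returns what the subscriber accepted.
    static List<Integer> take(int limit, int available) throws InterruptedException {
        TakeSubscriber subscriber = new TakeSubscriber(limit);
        // SubmissionPublisher is AutoCloseable, so try-with-resources calls close()
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= available; i++) {
                publisher.submit(i);
            }
        }
        subscriber.finished.await();
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(take(3, 10)); // prints [1, 2, 3]
    }
}
```

The sketch also swaps the polling loop for a CountDownLatch, which lets the main thread block until a terminal event instead of sleeping in 10 ms steps. After cancel() the subscriber receives no onComplete, which is why the latch is released inside onNext as well.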

source:tutorialspoint.com