Reactive streams are gaining traction in mainstream programming, and Java has its own implementation via the Flow API. Popular reactive stream implementations include RxJava, Reactor, and Akka.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

/**
 * Minimal demonstration of the Java Flow API: a {@link SubmissionPublisher} emits the
 * integers 1 through 10 and a sample {@link Subscriber} consumes them one at a time.
 */
public class Main {

    /**
     * Sample subscriber implementation that requests and prints one item at a time.
     *
     * <p>Termination (normal completion or error) is tracked with a {@link CountDownLatch}
     * so that a thread other than the one delivering the signals can safely observe it.
     */
    public static class Subscriber implements Flow.Subscriber<Integer> {

        /**
         * Released exactly once when a terminal signal ({@code onComplete} or
         * {@code onError}) arrives. A latch is used instead of a plain boolean because the
         * flag is written by the thread delivering the signals and read by the waiting
         * thread; the latch provides the required cross-thread visibility.
         */
        private final CountDownLatch terminated = new CountDownLatch(1);

        /**
         * The subscription handed over in {@link #onSubscribe}, kept so that further items
         * can be requested as each one is handled.
         */
        private Flow.Subscription subscription;

        /**
         * Stores the subscription and requests the first item.
         *
         * @param subscription the subscription linking this subscriber to its publisher
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("Subscribed");
            this.subscription = subscription;
            // Ask for a single item; onNext requests the following one after processing.
            this.subscription.request(1);
        }

        /**
         * Processes one item and then requests the next one.
         *
         * @param item the item currently being processed
         */
        @Override
        public void onNext(Integer item) {
            System.out.println("Processing " + item);
            this.subscription.request(1);
        }

        /**
         * Reports the error and marks this subscriber as terminated.
         *
         * @param throwable the error signalled to this subscriber
         */
        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            // An error is a terminal signal: release waiters so they do not block forever.
            terminated.countDown();
        }

        /**
         * Reports normal completion and marks this subscriber as terminated.
         */
        @Override
        public void onComplete() {
            System.out.println("Processing done.");
            terminated.countDown();
        }

        /**
         * Tells whether a terminal signal has been received.
         *
         * @return {@code true} once {@code onComplete} or {@code onError} has been invoked
         */
        public boolean isDone() {
            return terminated.getCount() == 0;
        }

        /**
         * Blocks the calling thread until a terminal signal has been received.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void awaitDone() throws InterruptedException {
            terminated.await();
        }
    }

    /**
     * Publishes the integers 1 to 10 to a single {@link Subscriber} and waits until the
     * subscriber has received its terminal signal before printing {@code Done}.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // The sample subscriber implementation.
        Subscriber subscriber = new Subscriber();

        // try-with-resources closes the publisher (signalling completion to the
        // subscriber) even if submitting an item throws.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // Register the subscriber.
            publisher.subscribe(subscriber);

            // Publish the sample stream data.
            IntStream.rangeClosed(1, 10).forEach(publisher::submit);
        }

        // Delivery is asynchronous, so wait for the terminal signal before finishing.
        subscriber.awaitDone();

        System.out.println("Done");
    }
}
Leave a Reply