Introduction
Implementing hot observable can be achieved with RxJava's PublishSubject. Moreover, we will try to publish data to the subject concurrently safely (i.e. using the synchronized keyword). Thus the complete code at the bottom uses several threads for publishing data and several threads for subscription.
Creating an Instance of PublishSubject
A simple way to create an instance of the PublishSubject instance is by invoking the static method create like the following snippet (see the complete code at the bottom).
Snippet 1 - Invoking the Static Method Create
PublishSubject subject = PublishSubject.create();
Sending Data to PublishSubject
Sending data to a PublishSubject instance can be done by calling its onNext method. Place the actual call in a synchronized block if we are sending data concurrently like the following snippet (see the complete code at the bottom):
Snippet 2 - Sending Data to PublishSubject Instance
synchronized (subject) { subject.onNext(String.valueOf(strItem)); }
Subscribing to PublishSubject
To listen to any of the data sent to a PublishSubject instance we can use one of the subscribe method. In addition, we can also opt to use the computation Scheduler (i.e. normally the Scheduler is the one responsible to managing threads in a multi-threaded environment.) via the observeOn method (i.e. for some reason the subscribeOn method is not working with PublishSubject instance.) like the following snippet (see the complete code at the bottom):
Snippet 3 - Subscribing to PublishSubject Instance
subject.observeOn(Schedulers.computation()) .subscribe(___item -> { System.out.println(_name + " ThreadID:" + Thread.currentThread().getId() + ": " + ___item); });
The Complete Code
package xyz.ronella.reactive; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.stream.IntStream; public class PublishedObservable { public static void main(String ... args) { final PublishSubject subject = PublishSubject.create(); List publishers = new ArrayList<>(); List subscribers = new ArrayList<>(); int defaultThreadCount = 3; ExecutorService executorPublisher = Executors.newFixedThreadPool(defaultThreadCount); ExecutorService executorSubscriber = Executors.newFixedThreadPool(defaultThreadCount); class LocalPublisher implements Runnable { private long _item; private String _name; private PublishSubject _subject; public LocalPublisher(String name, long itemStart, PublishSubject subject) { this._name = name; this._item = itemStart; this._subject = subject; } @Override public void run() { try { while(true) { Thread.sleep(1000); String strItem = String.valueOf(++_item); System.out.println("\n" + _name + " ThreadID:" + Thread.currentThread().getId() + ": " + strItem); synchronized (_subject) { _subject.onNext(String.valueOf(strItem)); } } } catch (InterruptedException e) { System.out.println(_name + " interrupted."); } } } class LocalSubscriber implements Runnable { private String _name; private PublishSubject _subject; public LocalSubscriber(String name, PublishSubject subject) { this._name = name; this._subject = subject; } @Override public void run() { _subject.observeOn(Schedulers.computation()) .subscribe(___item -> { System.out.println(_name + " ThreadID:" + Thread.currentThread().getId() + ": " + ___item); }); } } IntStream.rangeClosed(1, 3).forEach(___idx -> subscribers.add(executorSubscriber.submit(new LocalSubscriber("Subscriber " + ___idx, subject)))); IntStream.rangeClosed(1, 6).forEach(___idx -> publishers.add(executorPublisher.submit(new LocalPublisher("Publisher " + ___idx,___idx * 100, subject)))); subject.subscribe( ___item -> System.out.println("Main - ThreadID:" + Thread.currentThread().getId() + " " + ___item)); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } executorPublisher.shutdownNow(); executorSubscriber.shutdown(); } }
Recent Comments