- package it.dmi.csgtasks.service;
-
- import org.junit.jupiter.api.Test;
- import reactor.core.Disposable;
- import reactor.core.publisher.BaseSubscriber;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.FluxSink;
- import reactor.core.publisher.Sinks;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.concurrent.atomic.AtomicReference;
- //import reactor.test.StepVerifier;
-
-
- public class FluxTestTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FluxTestTest.class.getName());
- private static final String TEST_MESSAGE = "Test";
- private Sinks.Many<String> restSink;
- private Disposable subscription;
- private String prefixThread;
- public FluxTestTest() {
- this.restSink = Sinks.many().multicast().onBackpressureBuffer();
-
- this.prefixThread = Thread.currentThread().getName();
- }
-
- @Test
- void shouldNotCancelSharedWhenSubscriberThrowsExceptionAndErrorHandlerIsAbsent() {
- final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
-
- final Flux<String> flux = restSink.asFlux();
-
- restSink.tryEmitNext("VAL1");
- restSink.tryEmitNext("VAL2");
- restSink.tryEmitNext("VAL3");
-
-
- flux
- .log()
- .subscribe(o -> { throw new RuntimeException("error"); });
-
- restSink.tryEmitNext("VAL4");
- restSink.tryEmitNext("VAL5");
- restSink.tryEmitNext("VAL6");
-
- flux
- .log()
- .subscribe(o -> LOGGER.info("Second client : {}", o));
-
- }
- }
-
- /*
- OUTPUT
-
-
- 2023-04-24 17:50:24.502 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : onSubscribe(EmitterProcessor.EmitterInner)
- 2023-04-24 17:50:24.503 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : request(unbounded)
- 2023-04-24 17:50:24.505 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : onNext(VAL1)
- 2023-04-24 17:50:24.509 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : cancel()
- 2023-04-24 17:50:24.510 ERROR --- [ main] reactor.core.publisher.Operators : Operator called default onErrorDropped
-
- reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: error
- Caused by: java.lang.RuntimeException: error
-
- 2023-04-24 17:50:24.511 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : onSubscribe(EmitterProcessor.EmitterInner)
- 2023-04-24 17:50:24.511 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : request(unbounded)
- 2023-04-24 17:50:24.512 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : onComplete()
- */
-
- WHY MY SECOND SUBSCRIBER CAN'T FETCH DATA IF FIRST THROWS ERROR?