Snippet content copied to clipboard.
Are you sure to delete this snippet? No, don't delete
  1. package it.dmi.csgtasks.service;
  2. import org.junit.jupiter.api.Test;
  3. import reactor.core.Disposable;
  4. import reactor.core.publisher.BaseSubscriber;
  5. import reactor.core.publisher.Flux;
  6. import reactor.core.publisher.FluxSink;
  7. import reactor.core.publisher.Sinks;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.util.concurrent.atomic.AtomicReference;
  11. //import reactor.test.StepVerifier;
  12. public class FluxTestTest {
  13. private static final Logger LOGGER = LoggerFactory.getLogger(FluxTestTest.class.getName());
  14. private static final String TEST_MESSAGE = "Test";
  15. private Sinks.Many<String> restSink;
  16. private Disposable subscription;
  17. private String prefixThread;
  18. public FluxTestTest() {
  19. this.restSink = Sinks.many().multicast().onBackpressureBuffer();
  20. this.prefixThread = Thread.currentThread().getName();
  21. }
  22. @Test
  23. void shouldNotCancelSharedWhenSubscriberThrowsExceptionAndErrorHandlerIsAbsent() {
  24. final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
  25. final Flux<String> flux = restSink.asFlux();
  26. restSink.tryEmitNext("VAL1");
  27. restSink.tryEmitNext("VAL2");
  28. restSink.tryEmitNext("VAL3");
  29. flux
  30. .log()
  31. .subscribe(o -> { throw new RuntimeException("error"); });
  32. restSink.tryEmitNext("VAL4");
  33. restSink.tryEmitNext("VAL5");
  34. restSink.tryEmitNext("VAL6");
  35. flux
  36. .log()
  37. .subscribe(o -> LOGGER.info("Second client : {}", o));
  38. }
  39. }
  40. /*
  41. OUTPUT
  42. 2023-04-24 17:50:24.502 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : onSubscribe(EmitterProcessor.EmitterInner)
  43. 2023-04-24 17:50:24.503 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : request(unbounded)
  44. 2023-04-24 17:50:24.505 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : onNext(VAL1)
  45. 2023-04-24 17:50:24.509 INFO --- [ main] reactor.Flux.EmitterProcessor.1 : cancel()
  46. 2023-04-24 17:50:24.510 ERROR --- [ main] reactor.core.publisher.Operators : Operator called default onErrorDropped
  47. reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: error
  48. Caused by: java.lang.RuntimeException: error
  49. 2023-04-24 17:50:24.511 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : onSubscribe(EmitterProcessor.EmitterInner)
  50. 2023-04-24 17:50:24.511 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : request(unbounded)
  51. 2023-04-24 17:50:24.512 INFO --- [ main] reactor.Flux.EmitterProcessor.2 : onComplete()
  52. */
  53. WHY MY SECOND SUBSCRIBER CAN'T FETCH DATA IF FIRST THROWS ERROR?

Edit this Snippet