- // SUBSCRIBER
-
- @Async("singleThreadAsyncExecutor")
- @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
- public void startExportDirezioniNew(Principal principal, @Nullable DirezioniSearchDTO filters, HttpHeaders headers) {
-
- log.info("startExportDirezioni thread [START]");
-
- File csvTemp = TempFilenameGenerator.getDocumentaleTempFile("exportCsvDirezioni","txt").toFile();
- csvTemp.deleteOnExit();
-
-
- Disposable disposable = exportRestEmitter.dataExportObserver().subscribe(taskData -> {
-
- taskProgressDelegate.saveTaskWithProgress(taskData.getPrincipalName(), taskData.getTaskName(), taskData.getProgress(), taskData.getTaskStatusEnum());
- // write csv file
- // controlla vada in append
-
- log.info("Appending data {} to csv file: {}", taskData.getDirezioni(), csvTemp);
- csvHelperd.exportDirezioni(taskData.getDirezioni(), csvTemp, DirezioniDTO.class); // ERROR HERE NO CLASS DEF FOUND,
- // AND SUBSEQUENTS CALL TO THiS METHOD WON'T EVER REACH HERE, BUT PUBLISHER KEEP PUBLISHING
-
- }, error -> {
- log.error("Generic error in exportRestEmitter {}", error);
- taskProgressDelegate.saveTaskWithProgress(principal.getName(), "exportCsv", 0, TaskStatusEnum.KO);
- }, () -> {
-
- log.info("Transferring csv file {} to remote {} ", csvTemp, fileManagerHelper.getRemoteHostUri());
- if (fileManagerHelper.putLocalFileToRemote(csvTemp.toPath(), Paths.get(principal.getName(),"exportCsv_" + Utils.getTimestamp()))) {
- sendNotificationToQueue(principal);
- } else {
- log.error("File {} could not be transfered to remote ", csvTemp.toPath());
- }
- });
-
- exportRestEmitter.readDirezioneDataFromRest(principal, filters, headers);
-
- log.info("startExportDirezioni thread [COMPLETE]");
- }
-
-
-
-
- // PUBLISHER
-
-
- package it.dmi.csgtasks.service;
-
- import it.dmi.csgproject.csgcommon.dto.DirezioniDTO;
- import it.dmi.csgproject.csgcommon.dto.DirezioniResultDTO;
- import it.dmi.csgproject.csgcommon.dto.DirezioniSearchDTO;
- import it.dmi.csgtasks.config.TasksProperties;
- import it.dmi.csgtasks.domain.model.TaskStatusEnum;
- import it.dmi.csgtasks.web.rest.client.ResilientRestTemplateClient;
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.http.NameValuePair;
- import org.apache.http.client.utils.URLEncodedUtils;
- import org.apache.http.message.BasicNameValuePair;
- import org.reactivestreams.Publisher;
- import org.springframework.context.annotation.Scope;
- import org.springframework.core.ParameterizedTypeReference;
- import org.springframework.http.HttpHeaders;
- import org.springframework.http.ResponseEntity;
- import org.springframework.stereotype.Component;
- import org.springframework.web.context.annotation.RequestScope;
- import reactor.core.publisher.*;
-
- import javax.annotation.Nullable;
- import java.nio.charset.StandardCharsets;
- import java.security.Principal;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.function.Function;
-
-
- /**
- * usage:
- *
- * ExportRestEmitter emitter = new ExportRestEmitter();
- * emitter.dataExportObserver().subscribe(next -> { writeCsvFile(x); saveTask(Progress) }, error -> {
- * saveTask(Error);
- * });
- * emitter.readDirezioneDataFromRest();
- *
- * class useful for attaching business logic while reading paginated data from export rest endpoint
- *
- * @see it.dmi.csgtasks.service.FluxServer for test case
- */
- @Component
- @Slf4j
- public class ExportRestEmitter {
-
- private Flux<TaskData> sink;
- private Sinks.Many<TaskData> restSink;
- private final ResilientRestTemplateClient restTemplateClient;
- private final TasksProperties tasksProperties;
-
- public ExportRestEmitter( ResilientRestTemplateClient restTemplateClient, TasksProperties tasksProperties) {
-
- this.restTemplateClient = restTemplateClient;
- this.tasksProperties = tasksProperties;
-
- initObserver();
-
-
- // https://stackoverflow.com/questions/64715890/restarting-inifinite-flux-on-error-with-pubsubreactivefactory
- // per ovviare al problema che il bean e' singleton, dovrebbe essere request scoped, altrimenti quando viene completato il task viene completato
- // il flux e i successivi task non vengono avviati, genero un flux che viene restartato automaticamente on error / on complete
- // l'alternativa e' markare il bean come request scoped, ma sono dentro un thread
- // Function<Throwable, Publisher<TaskData>> recoverFromThrow = throwable -> this.restSink.asFlux();
- // var recoveringFromThrowFlux =
- // this.restSink.asFlux().onErrorResume(recoverFromThrow);
- // this.sink = Flux.<Flux<TaskData>>generate((sink) -> sink.next(recoveringFromThrowFlux))
- // .flatMap(flux -> flux);
-
- }
-
-
- public void readDirezioneDataFromRest(Principal principal, @Nullable DirezioniSearchDTO filters, HttpHeaders headers) {
-
- log.info("readDirezioneDataFromRest Start");
-
-
- try {
-
- ResponseEntity<Long> response = restTemplateClient.callPost(
- "csgcore",
- "/api/direzioni/count",
- headers,
- filters,
- new ParameterizedTypeReference<Long>() {}
- );
-
- if (!response.getStatusCode().is2xxSuccessful()) {
-
- publish(TaskData.builder()
- .principalName(principal.getName())
- .taskName("exportCsv")
- .progress(0)
- .taskStatusEnum(TaskStatusEnum.KO)
- .build());
-
- throw new RuntimeException("rest call api to fetch direzioni count failed, abort csv");
- }
-
- Long totalElements = response.getBody();
- StepData stepData = calculatePageSizeAndStep(totalElements.intValue());
- int pageSize = stepData.getPageSize();
- int numSteps = stepData.getSteps();
- log.info("total records direzioni to export: {}\n pageSize: {} total steps/query to fetch paginated results: {}", totalElements, pageSize, numSteps);
-
-
- for (int step = 1; step <= numSteps; step++) {
-
- ResponseEntity<DirezioniResultDTO> resp = preparePaginatedDirezioniCall(filters, headers, step, pageSize);
-
- // normalizza progress in base a numSteps e step (idealmente va da 0 a 100)
- int progress = Double.valueOf(Math.floor(step * 100 / numSteps)).intValue();
-
- if (!resp.getStatusCode().is2xxSuccessful()) {
- publish(TaskData.builder()
- .principalName(principal.getName())
- .taskName("exportCsv")
- .progress(0)
- .taskStatusEnum(TaskStatusEnum.KO)
- .build());
-
- throw new RuntimeException("rest call api to fetch direzioni data failed, abort csv");
- }
-
- log.info("sending progress value: {} to user {} for current step: {}", progress, principal.getName(), step);
- publish(TaskData.builder()
- .principalName(principal.getName())
- .taskName("exportCsv")
- .progress(progress)
- .taskStatusEnum(TaskStatusEnum.IN_PROGRESS)
- .direzioni(resp.getBody().getDirezioni())
- .build());
- }
- } catch (Exception e) {
- log.error("readDirezioneDataFromRest Error", e);
- }
-
- complete();
-
- log.info("readDirezioneDataFromRest End");
-
- }
-
- public Flux<TaskData> dataExportObserver() {
- // return this.sink;
- return this.restSink.asFlux().doOnComplete(() -> {
- this.restSink = Sinks.many().multicast().onBackpressureBuffer();
- log.info("\n\n INIT OBSERVER ON COMPLETE \n\n");
- }).doOnError(throwable -> {
- log.info("\n\n INIT OBSERVER ON ERROR \n\n");
- Sinks.many().multicast().onBackpressureBuffer();
- });
-
-
- }
-
- public void initObserver() {
- log.info("\n\n INIT OBSERVER \n\n");
- this.restSink = Sinks.many().multicast().onBackpressureBuffer();
- }
- public void publish(TaskData value) {
- restSink.tryEmitNext(value);
- }
-
- public void complete() {
- restSink.tryEmitComplete();
- }
-
-
-
-
-
- private ResponseEntity<DirezioniResultDTO> preparePaginatedDirezioniCall(@Nullable DirezioniSearchDTO filters, HttpHeaders headers, int numSteps, int pageSize) {
- String url = "/api/direzioni/";
-
- url += (filters != null) ? "search?" : "get?";
-
- List<NameValuePair> params = new ArrayList<>();
- params.add(new BasicNameValuePair("page", String.valueOf(numSteps)));
- params.add(new BasicNameValuePair("size", String.valueOf(pageSize)));
- String urlParam = URLEncodedUtils.format(params, StandardCharsets.UTF_8);
- url += urlParam;
- ResponseEntity<DirezioniResultDTO> resp;
- if (filters != null) {
- resp = restTemplateClient.callPost(
- "csgcore",
- url,
- headers,
- filters,
- new ParameterizedTypeReference<DirezioniResultDTO>() {}
- );
- } else {
- resp = restTemplateClient.callGet(
- "csgcore",
- url,
- headers,
- new ParameterizedTypeReference<DirezioniResultDTO>() {}
- );
- }
-
- return resp;
- }
-
- // TODO testa che non perda step
- private StepData calculatePageSizeAndStep(int count) {
-
- Integer maxPageSize = tasksProperties.getExport().getMaxPageSize();
- Integer minPageSize = tasksProperties.getExport().getMinPageSize();
- Integer initialNumSteps = tasksProperties.getExport().getInitialNumSteps();
-
- // determina page size di base, in funzione degli step settati e del numero elementi
- int calculatedPageSize = count / initialNumSteps + 1;
-
- if (calculatedPageSize > tasksProperties.getExport().getMaxPageSize()) {
- // se pagesize eccessivo, settalo al massimo e aumenta il numero di step
- int calculatedSteps = (int)Math.ceil((double)count / maxPageSize);
- return new StepData(calculatedSteps, maxPageSize );
-
- } else if (calculatedPageSize < minPageSize) {
- // se pagesize troppo piccolo, settalo al limite minPageSize
- int calculatedSteps = (int)Math.ceil((double)count / minPageSize);
- return new StepData(calculatedSteps, minPageSize );
-
- } else {
- // se pagesize rientra nel range [minPageSize, maxPageSize], calcola il numero effettivo di step necessari
- int calculatedSteps = (int)Math.ceil((double)count / calculatedPageSize);
- return new StepData(calculatedSteps, calculatedPageSize );
- }
-
- }
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public class StepData {
- private Integer steps;
- private Integer pageSize;
- }
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- @Builder
- public static class TaskData {
- private String principalName;
- private String taskName;
- private int progress;
- private TaskStatusEnum taskStatusEnum;
- private List<DirezioniDTO> direzioni;
- }
-
- }