Snippet content copied to clipboard.
Are you sure to delete this snippet? No, don't delete
  1. // SUBSCRIBER
  2. @Async("singleThreadAsyncExecutor")
  3. @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
  4. public void startExportDirezioniNew(Principal principal, @Nullable DirezioniSearchDTO filters, HttpHeaders headers) {
  5. log.info("startExportDirezioni thread [START]");
  6. File csvTemp = TempFilenameGenerator.getDocumentaleTempFile("exportCsvDirezioni","txt").toFile();
  7. csvTemp.deleteOnExit();
  8. Disposable disposable = exportRestEmitter.dataExportObserver().subscribe(taskData -> {
  9. taskProgressDelegate.saveTaskWithProgress(taskData.getPrincipalName(), taskData.getTaskName(), taskData.getProgress(), taskData.getTaskStatusEnum());
  10. // write csv file
  11. // controlla vada in append
  12. log.info("Appending data {} to csv file: {}", taskData.getDirezioni(), csvTemp);
  13. csvHelperd.exportDirezioni(taskData.getDirezioni(), csvTemp, DirezioniDTO.class); // ERROR HERE NO CLASS DEF FOUND,
  14. // AND SUBSEQUENTS CALL TO THiS METHOD WON'T EVER REACH HERE, BUT PUBLISHER KEEP PUBLISHING
  15. }, error -> {
  16. log.error("Generic error in exportRestEmitter {}", error);
  17. taskProgressDelegate.saveTaskWithProgress(principal.getName(), "exportCsv", 0, TaskStatusEnum.KO);
  18. }, () -> {
  19. log.info("Transferring csv file {} to remote {} ", csvTemp, fileManagerHelper.getRemoteHostUri());
  20. if (fileManagerHelper.putLocalFileToRemote(csvTemp.toPath(), Paths.get(principal.getName(),"exportCsv_" + Utils.getTimestamp()))) {
  21. sendNotificationToQueue(principal);
  22. } else {
  23. log.error("File {} could not be transfered to remote ", csvTemp.toPath());
  24. }
  25. });
  26. exportRestEmitter.readDirezioneDataFromRest(principal, filters, headers);
  27. log.info("startExportDirezioni thread [COMPLETE]");
  28. }
  29. // PUBLISHER
  30. package it.dmi.csgtasks.service;
  31. import it.dmi.csgproject.csgcommon.dto.DirezioniDTO;
  32. import it.dmi.csgproject.csgcommon.dto.DirezioniResultDTO;
  33. import it.dmi.csgproject.csgcommon.dto.DirezioniSearchDTO;
  34. import it.dmi.csgtasks.config.TasksProperties;
  35. import it.dmi.csgtasks.domain.model.TaskStatusEnum;
  36. import it.dmi.csgtasks.web.rest.client.ResilientRestTemplateClient;
  37. import lombok.AllArgsConstructor;
  38. import lombok.Builder;
  39. import lombok.Data;
  40. import lombok.NoArgsConstructor;
  41. import lombok.extern.slf4j.Slf4j;
  42. import org.apache.http.NameValuePair;
  43. import org.apache.http.client.utils.URLEncodedUtils;
  44. import org.apache.http.message.BasicNameValuePair;
  45. import org.reactivestreams.Publisher;
  46. import org.springframework.context.annotation.Scope;
  47. import org.springframework.core.ParameterizedTypeReference;
  48. import org.springframework.http.HttpHeaders;
  49. import org.springframework.http.ResponseEntity;
  50. import org.springframework.stereotype.Component;
  51. import org.springframework.web.context.annotation.RequestScope;
  52. import reactor.core.publisher.*;
  53. import javax.annotation.Nullable;
  54. import java.nio.charset.StandardCharsets;
  55. import java.security.Principal;
  56. import java.util.ArrayList;
  57. import java.util.List;
  58. import java.util.function.Function;
  59. /**
  60. * usage:
  61. *
  62. * ExportRestEmitter emitter = new ExportRestEmitter();
  63. * emitter.dataExportObserver().subscribe(next -> { writeCsvFile(x); saveTask(Progress) }, error -> {
  64. * saveTask(Error);
  65. * });
  66. * emitter.readDirezioneDataFromRest();
  67. *
  68. * class useful for attaching business logic while reading paginated data from export rest endpoint
  69. *
  70. * @see it.dmi.csgtasks.service.FluxServer for test case
  71. */
  72. @Component
  73. @Slf4j
  74. public class ExportRestEmitter {
  75. private Flux<TaskData> sink;
  76. private Sinks.Many<TaskData> restSink;
  77. private final ResilientRestTemplateClient restTemplateClient;
  78. private final TasksProperties tasksProperties;
  79. public ExportRestEmitter( ResilientRestTemplateClient restTemplateClient, TasksProperties tasksProperties) {
  80. this.restTemplateClient = restTemplateClient;
  81. this.tasksProperties = tasksProperties;
  82. initObserver();
  83. // https://stackoverflow.com/questions/64715890/restarting-inifinite-flux-on-error-with-pubsubreactivefactory
  84. // per ovviare al problema che il bean e' singleton, dovrebbe essere request scoped, altrimenti quando viene completato il task viene completato
  85. // il flux e i successivi task non vengono avviati, genero un flux che viene restartato automaticamente on error / on complete
  86. // l'alternativa e' markare il bean come request scoped, ma sono dentro un thread
  87. // Function<Throwable, Publisher<TaskData>> recoverFromThrow = throwable -> this.restSink.asFlux();
  88. // var recoveringFromThrowFlux =
  89. // this.restSink.asFlux().onErrorResume(recoverFromThrow);
  90. // this.sink = Flux.<Flux<TaskData>>generate((sink) -> sink.next(recoveringFromThrowFlux))
  91. // .flatMap(flux -> flux);
  92. }
  93. public void readDirezioneDataFromRest(Principal principal, @Nullable DirezioniSearchDTO filters, HttpHeaders headers) {
  94. log.info("readDirezioneDataFromRest Start");
  95. try {
  96. ResponseEntity<Long> response = restTemplateClient.callPost(
  97. "csgcore",
  98. "/api/direzioni/count",
  99. headers,
  100. filters,
  101. new ParameterizedTypeReference<Long>() {}
  102. );
  103. if (!response.getStatusCode().is2xxSuccessful()) {
  104. publish(TaskData.builder()
  105. .principalName(principal.getName())
  106. .taskName("exportCsv")
  107. .progress(0)
  108. .taskStatusEnum(TaskStatusEnum.KO)
  109. .build());
  110. throw new RuntimeException("rest call api to fetch direzioni count failed, abort csv");
  111. }
  112. Long totalElements = response.getBody();
  113. StepData stepData = calculatePageSizeAndStep(totalElements.intValue());
  114. int pageSize = stepData.getPageSize();
  115. int numSteps = stepData.getSteps();
  116. log.info("total records direzioni to export: {}\n pageSize: {} total steps/query to fetch paginated results: {}", totalElements, pageSize, numSteps);
  117. for (int step = 1; step <= numSteps; step++) {
  118. ResponseEntity<DirezioniResultDTO> resp = preparePaginatedDirezioniCall(filters, headers, step, pageSize);
  119. // normalizza progress in base a numSteps e step (idealmente va da 0 a 100)
  120. int progress = Double.valueOf(Math.floor(step * 100 / numSteps)).intValue();
  121. if (!resp.getStatusCode().is2xxSuccessful()) {
  122. publish(TaskData.builder()
  123. .principalName(principal.getName())
  124. .taskName("exportCsv")
  125. .progress(0)
  126. .taskStatusEnum(TaskStatusEnum.KO)
  127. .build());
  128. throw new RuntimeException("rest call api to fetch direzioni data failed, abort csv");
  129. }
  130. log.info("sending progress value: {} to user {} for current step: {}", progress, principal.getName(), step);
  131. publish(TaskData.builder()
  132. .principalName(principal.getName())
  133. .taskName("exportCsv")
  134. .progress(progress)
  135. .taskStatusEnum(TaskStatusEnum.IN_PROGRESS)
  136. .direzioni(resp.getBody().getDirezioni())
  137. .build());
  138. }
  139. } catch (Exception e) {
  140. log.error("readDirezioneDataFromRest Error", e);
  141. }
  142. complete();
  143. log.info("readDirezioneDataFromRest End");
  144. }
  145. public Flux<TaskData> dataExportObserver() {
  146. // return this.sink;
  147. return this.restSink.asFlux().doOnComplete(() -> {
  148. this.restSink = Sinks.many().multicast().onBackpressureBuffer();
  149. log.info("\n\n INIT OBSERVER ON COMPLETE \n\n");
  150. }).doOnError(throwable -> {
  151. log.info("\n\n INIT OBSERVER ON ERROR \n\n");
  152. Sinks.many().multicast().onBackpressureBuffer();
  153. });
  154. }
  155. public void initObserver() {
  156. log.info("\n\n INIT OBSERVER \n\n");
  157. this.restSink = Sinks.many().multicast().onBackpressureBuffer();
  158. }
  159. public void publish(TaskData value) {
  160. restSink.tryEmitNext(value);
  161. }
  162. public void complete() {
  163. restSink.tryEmitComplete();
  164. }
  165. private ResponseEntity<DirezioniResultDTO> preparePaginatedDirezioniCall(@Nullable DirezioniSearchDTO filters, HttpHeaders headers, int numSteps, int pageSize) {
  166. String url = "/api/direzioni/";
  167. url += (filters != null) ? "search?" : "get?";
  168. List<NameValuePair> params = new ArrayList<>();
  169. params.add(new BasicNameValuePair("page", String.valueOf(numSteps)));
  170. params.add(new BasicNameValuePair("size", String.valueOf(pageSize)));
  171. String urlParam = URLEncodedUtils.format(params, StandardCharsets.UTF_8);
  172. url += urlParam;
  173. ResponseEntity<DirezioniResultDTO> resp;
  174. if (filters != null) {
  175. resp = restTemplateClient.callPost(
  176. "csgcore",
  177. url,
  178. headers,
  179. filters,
  180. new ParameterizedTypeReference<DirezioniResultDTO>() {}
  181. );
  182. } else {
  183. resp = restTemplateClient.callGet(
  184. "csgcore",
  185. url,
  186. headers,
  187. new ParameterizedTypeReference<DirezioniResultDTO>() {}
  188. );
  189. }
  190. return resp;
  191. }
  192. // TODO testa che non perda step
  193. private StepData calculatePageSizeAndStep(int count) {
  194. Integer maxPageSize = tasksProperties.getExport().getMaxPageSize();
  195. Integer minPageSize = tasksProperties.getExport().getMinPageSize();
  196. Integer initialNumSteps = tasksProperties.getExport().getInitialNumSteps();
  197. // determina page size di base, in funzione degli step settati e del numero elementi
  198. int calculatedPageSize = count / initialNumSteps + 1;
  199. if (calculatedPageSize > tasksProperties.getExport().getMaxPageSize()) {
  200. // se pagesize eccessivo, settalo al massimo e aumenta il numero di step
  201. int calculatedSteps = (int)Math.ceil((double)count / maxPageSize);
  202. return new StepData(calculatedSteps, maxPageSize );
  203. } else if (calculatedPageSize < minPageSize) {
  204. // se pagesize troppo piccolo, settalo al limite minPageSize
  205. int calculatedSteps = (int)Math.ceil((double)count / minPageSize);
  206. return new StepData(calculatedSteps, minPageSize );
  207. } else {
  208. // se pagesize rientra nel range [minPageSize, maxPageSize], calcola il numero effettivo di step necessari
  209. int calculatedSteps = (int)Math.ceil((double)count / calculatedPageSize);
  210. return new StepData(calculatedSteps, calculatedPageSize );
  211. }
  212. }
  213. @Data
  214. @NoArgsConstructor
  215. @AllArgsConstructor
  216. public class StepData {
  217. private Integer steps;
  218. private Integer pageSize;
  219. }
  220. @Data
  221. @NoArgsConstructor
  222. @AllArgsConstructor
  223. @Builder
  224. public static class TaskData {
  225. private String principalName;
  226. private String taskName;
  227. private int progress;
  228. private TaskStatusEnum taskStatusEnum;
  229. private List<DirezioniDTO> direzioni;
  230. }
  231. }

Edit this Snippet