Ich versuche, eine Methode asynchron innerhalb einer vorhandenen reaktiven Kette in meiner Project Reactor-basierten Anwendung auszuführen. Die Methode doUpdateLayoutInAsync soll eine umfangreiche Hintergrundaufgabe ausführen, aber es scheint, dass mein Ansatz nicht wie erwartet funktioniert. Hier ist meine aktuelle Implementierung:
public Mono<Boolean> publishPackage(String branchedPackageId) { PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO(); publishingMetaDTO.setPublishEvent(true); return packageRepository .findById(branchedPackageId, packagePermission.getPublishPermission()) .switchIfEmpty(Mono.error(new AppsmithException( AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId))) .flatMap(originalPackage -> { String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion()); Package packageToBePublished = constructPackageToBePublished(originalPackage); originalPackage.setVersion(nextVersion); originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt()); publishingMetaDTO.setOriginPackageId(branchedPackageId); publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId()); Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null); Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage); Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished); return unsetCurrentLatestMono .then(Mono.zip(saveOriginalPackage, savePackageToBePublished)) .flatMap(tuple2 -> { Package publishedPackage = tuple2.getT2(); publishingMetaDTO.setPublishedPackage(publishedPackage); return modulePackagePublishableService .publishEntities(publishingMetaDTO) .flatMap(publishedModules -> { if (publishedModules.isEmpty()) { return Mono.error(new AppsmithException( AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED, originalPackage.getUnpublishedPackage().getName())); } return moduleInstancePackagePublishableService .publishEntities(publishingMetaDTO) .then(Mono.defer(() -> newActionPackagePublishableService.publishEntities(publishingMetaDTO)) .then(Mono.defer(() -> actionCollectionPackagePublishableService .publishEntities(publishingMetaDTO)))); }) .then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO))); }) .as(transactionalOperator::transactional) .then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO))); }); } private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) { Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds()) .flatMap(pageId -> updateLayoutService .updatePageLayoutsByPageId(pageId) .onErrorResume(throwable -> { log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage()); return Mono.just(pageId); })) .collectList(); // Running the updateLayoutsMono task asynchronously updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe(); return Mono.just(Boolean.TRUE); }
Problem: Ich möchte, dass doUpdateLayoutInAsync im Hintergrund ausgeführt wird, während der Rest der reaktiven Kette abgeschlossen wird. Allerdings scheint die Methode synchron ausgeführt zu werden und die reaktive Kette läuft nicht wie erwartet weiter.
Frage: Wie kann ich sicherstellen, dass doUpdateLayoutInAsync asynchron ausgeführt wird und die Fortsetzung der reaktiven Kette nicht blockiert?
Das obige ist der detaillierte Inhalt vonWie führe ich eine Methode asynchron in einer reaktiven Kette in Spring WebFlux aus?. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!