Last active
July 18, 2025 15:15
-
-
Save guildenstern70/c6498191161e6bc57509bd006e0fa9ac to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Slf4j | |
| @ApplicationScoped | |
| public class QuasiSync | |
| { | |
| @Inject | |
| @Channel("out-topic") | |
| Emitter<String> requestEmitter; | |
| private final ConcurrentHashMap<String, CompletableFuture<String>> responseMap; | |
| public QuasiSync() | |
| { | |
| // Map to hold correlation ID and CompletableFuture | |
| // for async response handling | |
| this.responseMap = new ConcurrentHashMap<>(); | |
| } | |
| public Uni<String> requestAsync(@NotNull String message) | |
| { | |
| CompletableFuture<String> responseFuture = new CompletableFuture<>(); | |
| var correlationId = UUID.randomUUID().toString(); | |
| log.info("Sending request with ID = {}", correlationId); | |
| message = correlationId + ":" + message; | |
| responseMap.put(correlationId, responseFuture); | |
| requestEmitter.send(message); | |
| return Uni.createFrom().completionStage(responseFuture); | |
| } | |
| @Incoming("in-topic") | |
| public void handleResponse(String response) | |
| { | |
| log.info("Received message from Kafka: {}", response); | |
| // Extract correlation ID from the response | |
| String correlationId = response.split(":")[0]; | |
| // Assuming the response format is "correlationId:responseData" | |
| // If correlation ID is found in the response message, complete the original | |
| // request with the corresponding response. | |
| // If not found, log a warning and do not complete the future. | |
| if (correlationId != null) | |
| { | |
| CompletableFuture<String> responseFuture = responseMap.remove(correlationId); | |
| if (responseFuture != null) | |
| { | |
| log.info("Completing future for correlation ID: {}", correlationId); | |
| // Complete the future sending the response to the CompletableFuture | |
| responseFuture.complete(response); | |
| } | |
| else | |
| { | |
| log.warn("No future found for correlation ID: {}. Ignoring response: {}", correlationId, response); | |
| } | |
| } | |
| else | |
| { | |
| log.warn("Correlation ID not found in response: {}. Ignoring.", response); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment