Skip to content

Instantly share code, notes, and snippets.

@guildenstern70
Last active July 18, 2025 15:15
Show Gist options
  • Select an option

  • Save guildenstern70/c6498191161e6bc57509bd006e0fa9ac to your computer and use it in GitHub Desktop.

Select an option

Save guildenstern70/c6498191161e6bc57509bd006e0fa9ac to your computer and use it in GitHub Desktop.
/**
 * Pseudo-synchronous request/response bridge over two messaging channels.
 *
 * <p>A request is published on the {@code out-topic} channel prefixed with a freshly
 * generated correlation ID ({@code "correlationId:message"}). Replies arriving on the
 * {@code in-topic} channel are expected in the form {@code "correlationId:responseData"}
 * and are matched back to the pending request through that ID.
 *
 * <p>No timeout is applied here: if no reply ever arrives, the returned {@link Uni} stays
 * pending and its map entry stays allocated until the underlying future is completed or
 * cancelled. Callers that need an upper bound should apply their own timeout.
 */
@Slf4j
@ApplicationScoped
public class QuasiSync
{
    @Inject
    @Channel("out-topic")
    Emitter<String> requestEmitter;

    /** Pending requests, keyed by correlation ID. */
    private final ConcurrentHashMap<String, CompletableFuture<String>> responseMap;

    public QuasiSync()
    {
        // Map to hold correlation ID and CompletableFuture
        // for async response handling
        this.responseMap = new ConcurrentHashMap<>();
    }

    /**
     * Publishes {@code message} on the outgoing channel and returns a {@link Uni} that is
     * resolved once a reply carrying the same correlation ID is received.
     *
     * @param message the request payload; sent as {@code "correlationId:message"}
     * @return a {@code Uni} emitting the full reply string (correlation prefix included),
     *         or failing if the outgoing send is reported as failed
     */
    public Uni<String> requestAsync(@NotNull String message)
    {
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        var correlationId = UUID.randomUUID().toString();
        log.info("Sending request with ID = {}", correlationId);

        responseMap.put(correlationId, responseFuture);
        // Whatever route finishes the future (reply, send failure, cancellation), make sure
        // the pending entry does not outlive it. The two-argument remove only drops the
        // entry if it is still mapped to this very future.
        responseFuture.whenComplete(
            (item, failure) -> responseMap.remove(correlationId, responseFuture));

        try
        {
            requestEmitter.send(correlationId + ":" + message)
                .whenComplete((ack, failure) ->
                {
                    if (failure != null)
                    {
                        // The request never left, so no reply can arrive: fail the caller
                        // instead of leaving the Uni pending forever.
                        log.warn("Sending request with ID = {} failed", correlationId, failure);
                        responseFuture.completeExceptionally(failure);
                    }
                });
        }
        catch (RuntimeException e)
        {
            // send() rejected the message synchronously; drop the pending entry and let the
            // caller see the same exception it would have seen before.
            responseMap.remove(correlationId);
            throw e;
        }

        return Uni.createFrom().completionStage(responseFuture);
    }

    /**
     * Consumes a reply from the incoming channel and completes the matching pending request.
     *
     * <p>Replies whose correlation ID is missing, or does not match any pending request,
     * are logged and ignored.
     *
     * @param response the raw reply, expected as {@code "correlationId:responseData"}
     */
    @Incoming("in-topic")
    public void handleResponse(String response)
    {
        log.info("Received message from Kafka: {}", response);
        // Extract correlation ID from the response
        // Assuming the response format is "correlationId:responseData"
        String correlationId = correlationIdOf(response);
        // If correlation ID is found in the response message, complete the original
        // request with the corresponding response.
        // If not found, log a warning and do not complete the future.
        if (correlationId != null)
        {
            CompletableFuture<String> responseFuture = responseMap.remove(correlationId);
            if (responseFuture != null)
            {
                log.info("Completing future for correlation ID: {}", correlationId);
                // Complete the future sending the response to the CompletableFuture
                responseFuture.complete(response);
            }
            else
            {
                log.warn("No future found for correlation ID: {}. Ignoring response: {}", correlationId, response);
            }
        }
        else
        {
            log.warn("Correlation ID not found in response: {}. Ignoring.", response);
        }
    }

    /**
     * Returns the text before the first {@code ':'} of {@code response}, or the whole string
     * when it contains no separator; returns {@code null} when the payload is {@code null}
     * or the extracted ID would be empty.
     *
     * <p>Uses {@code indexOf} rather than {@code split(":")[0]}: {@code split} discards
     * trailing empty strings, so a payload such as {@code ":"} yields an empty array and
     * indexing it throws.
     */
    private static String correlationIdOf(String response)
    {
        if (response == null)
        {
            return null;
        }
        int separator = response.indexOf(':');
        String correlationId = separator < 0 ? response : response.substring(0, separator);
        return correlationId.isEmpty() ? null : correlationId;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment