Skip to content

Instantly share code, notes, and snippets.

@Elanza-48
Last active December 24, 2025 07:33
Show Gist options
  • Select an option

  • Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.

Select an option

Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.
Java fixed-size ByteBuffer pool
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
/**
 * Pool of equally sized direct {@link ByteBuffer}s with a blocking, time-limited acquire.
 *
 * <p>Buffers are created lazily up to {@code maxBuffers}. When the pool is exhausted,
 * {@link #acquire(long, TimeUnit)} waits for another caller to {@link #release(ByteBuffer)}
 * a buffer. A daemon thread periodically drops buffers that have been idle longer than the
 * configured timeout, as long as more than {@code minBuffers} exist.
 *
 * <p>Every buffer handed out is tracked by object identity, so {@code release} rejects
 * buffers that were not created by this pool as well as double releases.
 */
@Slf4j
public class ByteBufferPool {
    private static final long ONE_MB = 1024 * 1024L;
    private static final int DEFAULT_ACQUISITION_TIME_IN_MINUTES = 5;

    /** Buffers currently idle; used LIFO (offerFirst/pollFirst) so recently used buffers are reused first. */
    private final BlockingDeque<BufferMetadata> availableBuffers;
    /** Every live buffer of this pool (idle or leased), keyed by buffer identity. */
    private final ConcurrentHashMap<BufferKey, BufferMetadata> bufferRegistry;
    /** Number of live buffers (idle + leased), including slots reserved for in-flight allocations. */
    private final AtomicInteger totalBuffers = new AtomicInteger(0);
    private final int maxBuffers;
    private final int minBuffers;
    private final int bufferSize;
    private final long idleTimeoutNanos;
    private final ScheduledExecutorService cleanupExecutor;
    private final String poolInstanceId;
    /** Last 8 hex digits of {@link #poolInstanceId}; the first 8 are always zero padding. */
    private final String poolShortId;
    private volatile boolean shutdown = false;

    /**
     * Registry key that compares buffers by reference.
     *
     * <p>{@code ByteBuffer.equals/hashCode} depend on the buffer's content and position, and
     * {@code System.identityHashCode} alone is not unique, so neither can be used as a key directly.
     */
    private static final class BufferKey {
        private final ByteBuffer ref;

        BufferKey(ByteBuffer ref) {
            this.ref = ref;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof BufferKey && ((BufferKey) other).ref == ref;
        }

        @Override
        public int hashCode() {
            return System.identityHashCode(ref);
        }
    }

    /** Book-keeping record for a single pooled buffer. */
    private static class BufferMetadata {
        final ByteBuffer buffer;
        final BufferKey key;
        /** Identity hash of the buffer; used for diagnostics only, never for lookup. */
        final int id;
        final AtomicBoolean isLeased;
        volatile long lastUsedNanoTime;

        BufferMetadata(ByteBuffer buffer) {
            this.buffer = buffer;
            this.key = new BufferKey(buffer);
            this.isLeased = new AtomicBoolean(false);
            this.id = System.identityHashCode(buffer);
            this.lastUsedNanoTime = System.nanoTime();
        }
    }

    /**
     * Exception thrown when buffer ownership validation fails
     */
    public static class InvalidBufferException extends IllegalArgumentException {
        public InvalidBufferException(String message) {
            super(message);
        }
    }

    /**
     * Creates a pool that keeps at least a quarter of {@code maxBuffers} (minimum 1) alive and
     * prunes buffers idle for more than 300 seconds.
     *
     * @param maxBuffers   maximum number of buffers the pool may hold; must be at least 1
     * @param bufferSizeMB capacity of each buffer in MiB
     * @throws IllegalArgumentException if an argument is out of range
     */
    public ByteBufferPool(int maxBuffers, int bufferSizeMB) {
        this(Math.max(1, maxBuffers / 4), maxBuffers, bufferSizeMB, 300);
    }

    /**
     * Creates a pool and schedules its idle-buffer cleanup task.
     *
     * @param minBuffers         number of buffers the cleanup task never prunes below; capped at {@code maxBuffers}
     * @param maxBuffers         maximum number of buffers the pool may hold; must be at least 1
     * @param bufferSizeMB       capacity of each buffer in MiB; must be non-negative and fit in an {@code int} byte count
     * @param idleTimeoutSeconds idle time after which a pooled buffer may be pruned; must be at least 1
     * @throws IllegalArgumentException if an argument is out of range
     */
    public ByteBufferPool(int minBuffers, int maxBuffers, int bufferSizeMB, int idleTimeoutSeconds) {
        // Validate before any resource (executor) is created.
        if (maxBuffers < 1) {
            throw new IllegalArgumentException("maxBuffers must be >= 1, got " + maxBuffers);
        }
        if (bufferSizeMB < 0 || bufferSizeMB > Integer.MAX_VALUE / ONE_MB) {
            // Larger values would overflow the int byte count passed to allocateDirect.
            throw new IllegalArgumentException(
                "bufferSizeMB must be between 0 and " + (Integer.MAX_VALUE / ONE_MB) + ", got " + bufferSizeMB);
        }
        if (idleTimeoutSeconds < 1) {
            // scheduleAtFixedRate rejects a non-positive period; fail with a clearer message.
            throw new IllegalArgumentException("idleTimeoutSeconds must be >= 1, got " + idleTimeoutSeconds);
        }

        this.maxBuffers = maxBuffers;
        this.minBuffers = Math.min(minBuffers, maxBuffers);
        this.bufferSize = (int) (bufferSizeMB * ONE_MB);
        this.idleTimeoutNanos = TimeUnit.SECONDS.toNanos(idleTimeoutSeconds);
        this.availableBuffers = new LinkedBlockingDeque<>();
        this.bufferRegistry = new ConcurrentHashMap<>();
        this.poolInstanceId = String.format("%016x", System.identityHashCode(this));
        // identityHashCode is a 32-bit value, so only the last 8 of the 16 hex digits carry information.
        this.poolShortId = poolInstanceId.substring(8);

        final String cleanupThreadName = "ByteBufferPool-Cleanup-" + poolShortId;
        this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, cleanupThreadName);
            t.setDaemon(true);
            return t;
        });
        this.cleanupExecutor.scheduleAtFixedRate(
            this::cleanupIdleBuffers,
            idleTimeoutSeconds,
            Math.min(5 * 60L, idleTimeoutSeconds),
            TimeUnit.SECONDS
        );
        log.debug("[ByteBufferPool] Initialized pool {}: max={}, min={}, bufferSize={}MB, timeout={}s",
            poolShortId, maxBuffers, this.minBuffers, bufferSizeMB, idleTimeoutSeconds
        );
    }

    /**
     * Acquires a buffer, waiting up to the default acquisition time (5 minutes) if the pool is exhausted.
     *
     * @return a leased buffer, or {@code null} if none became available in time
     * @throws InterruptedException if the pool is shut down or the thread is interrupted while waiting
     */
    public ByteBuffer acquire() throws InterruptedException {
        return acquire(DEFAULT_ACQUISITION_TIME_IN_MINUTES, TimeUnit.MINUTES);
    }

    /**
     * Acquires a buffer, waiting up to the given timeout if the pool is exhausted.
     *
     * <p>Order of attempts: reuse an idle buffer, then allocate a new one if the pool is below
     * {@code maxBuffers}, then block until a buffer is released.
     *
     * @param timeout maximum time to wait for a released buffer
     * @param unit    unit of {@code timeout}
     * @return a leased buffer, or {@code null} if the wait timed out
     * @throws InterruptedException if the pool is shut down or the thread is interrupted while waiting
     */
    public ByteBuffer acquire(long timeout, TimeUnit unit) throws InterruptedException {
        if (shutdown) {
            throw new InterruptedException("Pool shuting down | pool id: " + poolInstanceId);
        }
        // 1. Reuse an idle buffer if one is available right now (non-blocking).
        BufferMetadata meta = availableBuffers.pollFirst();
        // 2. Otherwise grow the pool if it is still below its limit.
        if (meta == null) {
            meta = tryCreateBuffer();
        }
        if (shutdown) {
            throw new InterruptedException("ByteBufferPool is shuting down | pool id: " + poolInstanceId);
        }
        // 3. Pool exhausted: wait for another caller to release a buffer.
        if (meta == null) {
            meta = availableBuffers.pollFirst(timeout, unit);
        }
        if (meta == null) {
            return null; // timed out
        }
        meta.isLeased.set(true);
        return meta.buffer;
    }

    /**
     * Reserves a slot in {@code totalBuffers} with a CAS loop and allocates a buffer for it.
     *
     * @return the new buffer's metadata, or {@code null} if the pool is full, shutting down,
     *         or the allocation ran out of memory
     */
    private BufferMetadata tryCreateBuffer() {
        while (!shutdown) {
            int currentTotal = totalBuffers.get();
            if (currentTotal >= maxBuffers) {
                return null;
            }
            if (!totalBuffers.compareAndSet(currentTotal, currentTotal + 1)) {
                continue; // lost the race for this slot, re-read and retry
            }
            boolean created = false;
            try {
                BufferMetadata meta = createBuffer();
                created = true;
                return meta;
            } catch (OutOfMemoryError e) {
                log.error(" Buffer allocation failed - ", e);
                return null;
            } finally {
                // Give the reserved slot back on ANY failure, not only on OutOfMemoryError,
                // otherwise the pool would permanently lose capacity.
                if (!created) {
                    totalBuffers.decrementAndGet();
                }
            }
        }
        return null;
    }

    /**
     * Returns a leased buffer to the pool after validating that it belongs to this pool and is
     * currently leased. The buffer is cleared before it becomes available again.
     *
     * <p>Does nothing if {@code buffer} is {@code null} or the pool has been shut down.
     *
     * @param buffer the buffer previously obtained from {@code acquire}
     * @throws InvalidBufferException if the buffer was not created by this pool or is not currently leased
     */
    public void release(ByteBuffer buffer) {
        if (buffer == null || shutdown) return;
        int id = System.identityHashCode(buffer);
        BufferMetadata meta = bufferRegistry.get(new BufferKey(buffer));
        // Ownership check: the exact buffer object must have been created by this pool.
        if (meta == null) {
            throw new InvalidBufferException(String.format(
                "Buffer release rejected: Buffer (identity=%d) not registered to pool - %s", id, poolInstanceId)
            );
        }
        // Lease check: atomically flips leased -> idle so a double release is detected.
        if (!meta.isLeased.compareAndSet(true, false)) {
            throw new InvalidBufferException(String.format(
                "Buffer release rejected: Buffer (identity=%d) already exists in pool - %s", id, poolInstanceId)
            );
        }
        buffer.clear();
        meta.lastUsedNanoTime = System.nanoTime();
        availableBuffers.offerFirst(meta);
    }

    /**
     * Allocates a new direct buffer of {@code bufferSize} bytes and registers it with the pool.
     * The caller is responsible for having reserved a slot in {@code totalBuffers}.
     */
    private BufferMetadata createBuffer() throws OutOfMemoryError {
        ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize);
        BufferMetadata meta = new BufferMetadata(buf);
        bufferRegistry.put(meta.key, meta);
        log.debug("[ByteBufferPool] Created buffer: {}", meta.id);
        return meta;
    }

    /**
     * Periodic task: drops idle buffers that have not been used for longer than the idle timeout,
     * never shrinking the pool below {@code minBuffers}.
     */
    private void cleanupIdleBuffers() {
        if (shutdown) return;
        try {
            if (totalBuffers.get() <= minBuffers) return;
            long now = System.nanoTime();
            int pruned = 0;
            // The deque's iterator is weakly consistent, so iterating while other threads
            // poll/offer is safe.
            for (BufferMetadata meta : availableBuffers) {
                if (totalBuffers.get() <= minBuffers) break;
                if ((now - meta.lastUsedNanoTime) <= idleTimeoutNanos) continue;
                // remove() is atomic: it succeeds only if the buffer is still idle in the deque.
                // If a concurrent acquire() took it first, it must stay registered and counted.
                if (!availableBuffers.remove(meta)) continue;
                bufferRegistry.remove(meta.key);
                totalBuffers.decrementAndGet();
                pruned++;
                log.debug("[ByteBufferPool] Pruned idle buffer. Pool: {}", poolInstanceId);
            }
            if (pruned > 0) {
                // Hint only: the dropped direct buffers are expected to release their memory once
                // collected. Skipped when nothing was pruned to avoid needless collections.
                System.gc();
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task cancels all future runs.
            log.warn("[ByteBufferPool] Idle buffer cleanup failed. Pool: {}", poolInstanceId, e);
        }
    }

    /**
     * Checks whether the given buffer object was created by this pool and is still registered
     * (without throwing an exception).
     *
     * @param buffer the buffer to check; may be {@code null}
     * @return {@code true} if this exact buffer object is registered with the pool
     */
    public boolean isValidBufferRegistered(ByteBuffer buffer) {
        if (buffer == null) {
            return false;
        }
        return bufferRegistry.containsKey(new BufferKey(buffer));
    }

    /**
     * Shuts the pool down: stops the cleanup task and forgets all pooled buffers.
     *
     * <p>Threads currently blocked in {@code acquire} keep waiting until their timeout elapses
     * or they are interrupted. Buffers released after shutdown are ignored.
     */
    public void shutdown() {
        shutdown = true;
        cleanupExecutor.shutdownNow();
        availableBuffers.clear();
        bufferRegistry.clear();
        totalBuffers.set(0);
    }

    /**
     * Executes the provided action with an automatically managed buffer.
     * The buffer is acquired with the default timeout, passed to the function, and always
     * released, even if the function throws an exception.
     *
     * @param action function that uses the buffer and produces the result
     * @return the value returned by {@code action}
     * @throws RuntimeException if no buffer could be acquired in time (cause: {@link TimeoutException})
     *                          or the acquisition was interrupted (cause: {@link InterruptedException};
     *                          the interrupt flag is restored)
     */
    public <T> T withBuffer(Function<ByteBuffer, T> action) {
        ByteBuffer buffer = null;
        try {
            buffer = this.acquire();
            if (buffer == null) {
                // acquire() signals a timeout by returning null.
                throw new RuntimeException("Timeout while acquiring buffer from pool",
                    new TimeoutException("Pool timeout"));
            }
            return action.apply(buffer);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while acquiring buffer from pool", e);
        } finally {
            if (buffer != null) this.release(buffer);
        }
    }

    /**
     * Executes the provided async action with an automatically managed buffer.
     * The buffer is acquired on the calling thread (blocking up to the default timeout) and
     * released when the future returned by {@code asyncAction} completes.
     *
     * @param asyncAction function that starts the asynchronous work on the buffer
     * @return a future that completes with the action's outcome after the buffer has been released,
     *         or a failed future if no buffer could be acquired
     * @throws NullPointerException if {@code asyncAction} returns {@code null}
     */
    public <T> CompletableFuture<T> withBufferAsync(Function<ByteBuffer, CompletableFuture<T>> asyncAction) {
        ByteBuffer buffer;
        try {
            buffer = this.acquire();
            if (buffer == null) throw new TimeoutException("Pool Timeout");
        } catch (Exception e) {
            if (e instanceof InterruptedException) Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }

        CompletableFuture<T> future;
        try {
            future = asyncAction.apply(buffer);
        } catch (RuntimeException | Error e) {
            // The action failed before producing a future: nothing will ever complete,
            // so the buffer must be returned here or it would be leaked.
            this.release(buffer);
            throw e;
        }
        if (future == null) {
            this.release(buffer);
            throw new NullPointerException("asyncAction returned a null CompletableFuture");
        }
        return future.whenComplete((res, ex) -> this.release(buffer));
    }

    /**
     * Returns a snapshot of the pool's counters and configuration.
     * The individual values are read one after another, so under concurrent use the snapshot
     * may not be perfectly consistent.
     */
    public PoolStats getStats() {
        return new PoolStats(totalBuffers.get(), availableBuffers.size(), maxBuffers, minBuffers,
            bufferSize, TimeUnit.NANOSECONDS.toMillis(idleTimeoutNanos), poolInstanceId);
    }

    /** Immutable snapshot of pool counters; {@code size} is the per-buffer capacity in bytes. */
    public static class PoolStats {
        public final int total, available, max, min, size;
        public final long idleTimeoutMs;
        public final String id;

        public PoolStats(int total, int available, int max, int min,
                         int size, long timeout, String id) {
            this.total = total;
            this.available = available;
            this.max = max;
            this.min = min;
            this.size = size;
            this.idleTimeoutMs = timeout;
            this.id = id;
        }

        /** Memory held by all live buffers, in whole MiB. */
        public long getTotalMemoryMB() {
            return (long) total * size / (1024 * 1024);
        }

        /** Memory held by idle buffers, in whole MiB. */
        public long getAvailableMemoryMB() {
            return (long) available * size / (1024 * 1024);
        }

        @Override
        public String toString() {
            return String.format(
                "Pool[id=%s, total=%d, available=%d, min=%d, max=%d, " +
                    "size=%dMB, totalMem=%dMB, idleTimeoutMs=%dms]",
                id, total, available, min, max, size / (1024 * 1024),
                getTotalMemoryMB(), idleTimeoutMs
            );
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment