Last active
December 24, 2025 07:33
-
-
Save Elanza-48/d8ad3bae8ab22cfee7dc30afed2d3c77 to your computer and use it in GitHub Desktop.
Java Fixed sized byte buffer pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.nio.ByteBuffer; | |
| import java.util.concurrent.BlockingDeque; | |
| import java.util.concurrent.CompletableFuture; | |
| import java.util.concurrent.ConcurrentHashMap; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.LinkedBlockingDeque; | |
| import java.util.concurrent.ScheduledExecutorService; | |
| import java.util.concurrent.TimeUnit; | |
| import java.util.concurrent.TimeoutException; | |
| import java.util.concurrent.atomic.AtomicBoolean; | |
| import java.util.concurrent.atomic.AtomicInteger; | |
| import java.util.function.Function; | |
| import lombok.extern.slf4j.Slf4j; | |
| /** | |
| * ByteBuffer pool with blocking acquire when pool is exhausted | |
| */ | |
| @Slf4j | |
| public class ByteBufferPool { | |
| private static final long ONE_MB = 1024 * 1024L; | |
| private static final int DEFAULT_ACQUISITION_TIME_IN_MINUTES = 5; | |
| private final BlockingDeque<BufferMetadata> availableBuffers; | |
| private final ConcurrentHashMap<Integer, BufferMetadata> bufferRegistry; | |
| private final AtomicInteger totalBuffers = new AtomicInteger(0); | |
| private final int maxBuffers; | |
| private final int minBuffers; | |
| private final int bufferSize; | |
| private final long idleTimeoutNanos; | |
| private final ScheduledExecutorService cleanupExecutor; | |
| private final String poolInstanceId; | |
| private volatile boolean shutdown = false; | |
| private static class BufferMetadata { | |
| final ByteBuffer buffer; | |
| final int id; | |
| final AtomicBoolean isLeased; | |
| volatile long lastUsedNanoTime; | |
| BufferMetadata(ByteBuffer buffer) { | |
| this.buffer = buffer; | |
| this.isLeased = new AtomicBoolean(false); | |
| this.id = System.identityHashCode(buffer); | |
| this.lastUsedNanoTime = System.nanoTime(); | |
| } | |
| } | |
| /** | |
| * Exception thrown when buffer ownership validation fails | |
| */ | |
| public static class InvalidBufferException extends IllegalArgumentException { | |
| public InvalidBufferException(String message) { | |
| super(message); | |
| } | |
| } | |
| public ByteBufferPool(int maxBuffers, int bufferSizeMB) { | |
| this(Math.max(1, maxBuffers / 4), maxBuffers, bufferSizeMB, 300); | |
| } | |
| public ByteBufferPool(int minBuffers, int maxBuffers, int bufferSizeMB, int idleTimeoutSeconds) { | |
| this.maxBuffers = maxBuffers; | |
| this.minBuffers = Math.min(minBuffers, maxBuffers); | |
| this.bufferSize = bufferSizeMB * (int) ONE_MB; | |
| this.idleTimeoutNanos = TimeUnit.SECONDS.toNanos(idleTimeoutSeconds); | |
| this.availableBuffers = new LinkedBlockingDeque<>(); | |
| this.bufferRegistry = new ConcurrentHashMap<>(); | |
| this.poolInstanceId = String.format("%016x", System.identityHashCode(this)); | |
| this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor(r -> { | |
| Thread t = new Thread(r, "ByteBufferPool-Cleanup-" + poolInstanceId.substring(0, 8)); | |
| t.setDaemon(true); | |
| return t; | |
| }); | |
| this.cleanupExecutor.scheduleAtFixedRate( | |
| this::cleanupIdleBuffers, | |
| idleTimeoutSeconds, | |
| Math.min(5 * 60L, idleTimeoutSeconds), | |
| TimeUnit.SECONDS | |
| ); | |
| log.debug("[ByteBufferPool] Initialized pool {}: max={}, min={}, bufferSize={}MB, timeout={}s", | |
| poolInstanceId.substring(0, 8), maxBuffers, minBuffers, bufferSizeMB, idleTimeoutSeconds | |
| ); | |
| } | |
| /** | |
| * Acquire a buffer from the pool with ownership tracking | |
| */ | |
| public ByteBuffer acquire() throws InterruptedException { | |
| return acquire(DEFAULT_ACQUISITION_TIME_IN_MINUTES, TimeUnit.MINUTES); | |
| } | |
| public ByteBuffer acquire(long timeout, TimeUnit unit) throws InterruptedException { | |
| if (shutdown) { | |
| throw new InterruptedException("Pool shuting down | pool id: " + poolInstanceId); | |
| } | |
| // 1. Try to get an idle buffer from the pool immediately (Non-blocking) | |
| BufferMetadata meta = availableBuffers.pollFirst(); | |
| // 2. Try to create a new buffer if under limit | |
| // CAS loop to safely increment 'totalBuffers' without a lock | |
| if (meta == null) { | |
| while (true && !shutdown) { | |
| int currentTotal = totalBuffers.get(); | |
| if (currentTotal >= maxBuffers) break; | |
| if (totalBuffers.compareAndSet(currentTotal, currentTotal + 1)) { | |
| try { | |
| meta = createBuffer(); | |
| break; | |
| } catch(OutOfMemoryError e) { | |
| totalBuffers.decrementAndGet(); | |
| log.error(" Buffer allocation failed - ", e); | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| if (shutdown) { | |
| throw new InterruptedException("ByteBufferPool is shuting down | pool id: " + poolInstanceId); | |
| } | |
| if (meta == null) { | |
| meta = availableBuffers.pollFirst(timeout, unit); | |
| } | |
| if (meta != null) { | |
| meta.isLeased.set(true); | |
| return meta.buffer; | |
| } | |
| // If we get here, it's a timeout | |
| // (Note: The original code returned null on timeout for the overloaded method, | |
| // but threw TimeoutException for the default one. This adheres to the specific signature's contract) | |
| return null; | |
| } | |
| /** | |
| * Release a buffer back to the pool WITH VALIDATION | |
| */ | |
| public void release(ByteBuffer buffer) { | |
| if (buffer == null || shutdown) return; | |
| int id = System.identityHashCode(buffer); | |
| BufferMetadata meta = bufferRegistry.get(id); | |
| // CRITICAL SECURITY CHECK #2: Is this buffer currently in use? | |
| if (meta == null) { | |
| throw new InvalidBufferException(String.format( | |
| "Buffer release rejected: Buffer (identity=%d) not registered to pool - %d", id, poolInstanceId) | |
| ); | |
| } | |
| if (!meta.isLeased.compareAndSet(true, false)) { | |
| throw new InvalidBufferException(String.format( | |
| "Buffer release rejected: Buffer (identity=%d) already exists in pool - %d", id, poolInstanceId) | |
| ); | |
| } | |
| buffer.clear(); | |
| meta.lastUsedNanoTime = System.nanoTime(); | |
| availableBuffers.offerFirst(meta); | |
| } | |
| /** | |
| * Create a new buffer and register it | |
| */ | |
| private BufferMetadata createBuffer() throws OutOfMemoryError { | |
| ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize); | |
| BufferMetadata meta = new BufferMetadata(buf); | |
| // Register this buffer's identity | |
| bufferRegistry.put(meta.id, meta); | |
| log.debug("[ByteBufferPool] Created buffer: {}", meta); | |
| return meta; | |
| } | |
| /** | |
| * Cleanup idle buffers | |
| */ | |
| private void cleanupIdleBuffers() { | |
| if (shutdown) return; | |
| int currentTotal = totalBuffers.get(); | |
| if (currentTotal <= minBuffers) return; | |
| long now = System.nanoTime(); | |
| // removeIf is thread-safe on LinkedBlockingDeque. | |
| // It iterates efficiently and removes nodes without fully locking the queue for the duration. | |
| // This avoids the "poll -> check -> offer" churn. | |
| availableBuffers.removeIf(meta -> { | |
| // Double check min constraints inside loop in case of rapid changes | |
| if (totalBuffers.get() <= minBuffers) return false; | |
| if ((now - meta.lastUsedNanoTime) > idleTimeoutNanos) { | |
| // Buffer is idle | |
| bufferRegistry.remove(meta.id); | |
| totalBuffers.decrementAndGet(); | |
| // Help GC | |
| meta.buffer.clear(); | |
| log.debug("[ByteBufferPool] Pruned idle buffer. Pool: {}", poolInstanceId); | |
| return true; // Remove from Deque | |
| } | |
| return false; | |
| }); | |
| System.gc(); | |
| } | |
| /** | |
| * Validate if a buffer belongs to this pool (without throwing exception) | |
| */ | |
| public boolean isValidBufferRegistered(ByteBuffer buffer) { | |
| if (buffer == null) { | |
| return false; | |
| } | |
| int identity = System.identityHashCode(buffer); | |
| return bufferRegistry.containsKey(identity); | |
| } | |
| /** | |
| * Shutdown the pool and release all resources | |
| */ | |
| public void shutdown() { | |
| shutdown = true; | |
| cleanupExecutor.shutdownNow(); | |
| /** | |
| * Note: Threads currently blocked in acquire(timeout) will continue to block | |
| * until timeout or until they are interrupted. | |
| */ | |
| BufferMetadata meta; | |
| while ((meta = availableBuffers.pollFirst()) != null) { | |
| meta.buffer.clear(); | |
| } | |
| bufferRegistry.clear(); | |
| totalBuffers.set(0); | |
| } | |
| /** | |
| * Executes the provided action with an automatically managed buffer. | |
| * The buffer is acquired, cleared, passed to the function, and always released, | |
| * even if the function throws an exception. | |
| */ | |
| public <T> T withBuffer(Function<ByteBuffer, T> action) { | |
| ByteBuffer buffer = null; | |
| try { | |
| buffer = this.acquire(); | |
| // Note: acquire() (no args) throws TimeoutException if it fails, | |
| // if we didn't wrap the TimeoutException. | |
| // The original acquire() method threw TimeoutException, so we are good. | |
| if (buffer == null) throw new TimeoutException("Pool timeout"); | |
| return action.apply(buffer); | |
| } catch (TimeoutException e) { | |
| throw new RuntimeException("Timeout while acquiring buffer from pool", e); | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); | |
| throw new RuntimeException("Interrupted while acquiring buffer from pool", e); | |
| } finally { | |
| if (buffer != null) this.release(buffer); | |
| } | |
| } | |
| /** | |
| * Executes the provided async action with an automatically managed buffer. | |
| * The buffer is acquired and released when the returned CompletableFuture completes. | |
| */ | |
| public <T> CompletableFuture<T> withBufferAsync(Function<ByteBuffer, CompletableFuture<T>> asyncAction) { | |
| ByteBuffer buffer; | |
| try { | |
| buffer = this.acquire(); | |
| if (buffer == null) throw new TimeoutException("Pool Timeout"); | |
| }catch (Exception e) { | |
| if (e instanceof InterruptedException) Thread.currentThread().interrupt(); | |
| return CompletableFuture.failedFuture(e); | |
| } | |
| return asyncAction.apply(buffer) | |
| .whenComplete((res, ex) -> this.release(buffer)); | |
| } | |
| /** | |
| * Get pool statistics with security info | |
| */ | |
| public PoolStats getStats() { | |
| return new PoolStats(totalBuffers.get(), availableBuffers.size(), maxBuffers, minBuffers, | |
| bufferSize, TimeUnit.NANOSECONDS.toMillis(idleTimeoutNanos), poolInstanceId); | |
| } | |
| public static class PoolStats { | |
| public final int total, available, max, min, size; | |
| public final long idleTimeoutMs; | |
| public final String id; | |
| public PoolStats(int total, int available, int max, int min, | |
| int size, long timeout, String id) { | |
| this.total = total; | |
| this.available = available; | |
| this.max = max; | |
| this.min = min; | |
| this.size = size; | |
| this.idleTimeoutMs = timeout; | |
| this.id = id; | |
| } | |
| public long getTotalMemoryMB() { | |
| return (long) total * size / (1024 * 1024); | |
| } | |
| public long getAvailableMemoryMB() { | |
| return (long) available * size / (1024 * 1024); | |
| } | |
| @Override | |
| public String toString() { | |
| return String.format( | |
| "Pool[id=%s, total=%d, available=%d, min=%d, max=%d, " + | |
| "size=%dMB, totalMem=%dMB, idleTimeoutMs=%dms]", | |
| id, total, available, min, max, size / (1024 * 1024), | |
| getTotalMemoryMB(), idleTimeoutMs | |
| ); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment