
@BewareMyPower
Created September 12, 2025 15:30
Revisit the Netty Recycler

Revisit Netty Recycler adoptions in Pulsar

TL;DR: go to the last section for the conclusion.

Background

Apache Pulsar heavily uses the Netty Recycler as an object pool to reduce heap allocations and, in turn, GC pressure. Here is the full list:

$ find . -name "*.java" |  grep  "src/main" | xargs grep -n "new Recycler"
./pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java:295:        private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java:27:    private static final Recycler<TopicExistsInfo> RECYCLER = new Recycler<>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:601:                new Recycler<ReadEntriesCtx>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:463:        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchSizes.java:52:    private static final Recycler<EntryBatchSizes> RECYCLER = new Recycler<EntryBatchSizes>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryBatchIndexesAcks.java:75:    private static final Recycler<EntryBatchIndexesAcks> RECYCLER = new Recycler<EntryBatchIndexesAcks>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:683:        private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java:221:        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ReplicationMetrics.java:56:    private static final Recycler<ReplicationMetrics> RECYCLER = new Recycler<ReplicationMetrics>() {
./pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/RestMessagePublishContext.java:79:    private static final Recycler<RestMessagePublishContext> RECYCLER = new Recycler<RestMessagePublishContext>() {
./pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java:35:    private static final Recycler<ConcurrentBitSetRecyclable> RECYCLER = new Recycler<ConcurrentBitSetRecyclable>() {
./pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java:1195:    private static final Recycler<BitSetRecyclable> RECYCLER = new Recycler<BitSetRecyclable>() {
./pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java:43:    private static final Recycler<ByteBufPair> RECYCLER = new Recycler<ByteBufPair>() {
./pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/ReferenceCountedMessageMetadata.java:33:            new Recycler<ReferenceCountedMessageMetadata>() {
./pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java:43:    private static final Recycler<RawMessageImpl> RECYCLER = new Recycler<RawMessageImpl>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:2526:        private static final Recycler<ReadEntryCallbackWrapper> RECYCLER = new Recycler<ReadEntryCallbackWrapper>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java:38:    private static final Recycler<RangeCacheEntryWrapper> RECYCLER = new Recycler<RangeCacheEntryWrapper>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheRemovalCounters.java:29:    private static final Recycler<RangeCacheRemovalCounters> RECYCLER = new Recycler<RangeCacheRemovalCounters>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:46:    private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionRecyclable.java:31:    private static final Recycler<PositionRecyclable> RECYCLER = new Recycler<PositionRecyclable>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java:406:    private static final Recycler<OpAddEntry> RECYCLER = new Recycler<OpAddEntry>() {
./managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:232:    private static final Recycler<OpReadEntry> RECYCLER = new Recycler<>() {
./pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:438:        private static final Recycler<AsyncAddArgs> ASYNC_ADD_ARGS_RECYCLER = new Recycler<>() {
./pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:510:        private static final Recycler<FlushContext> FLUSH_CONTEXT_RECYCLER = new Recycler<FlushContext>() {
./pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java:257:        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() {
./tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java:58:    private static final Recycler<OffloadIndexBlockImpl> RECYCLER = new Recycler<OffloadIndexBlockImpl>() {
./tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockV2Impl.java:61:    private static final Recycler<OffloadIndexBlockV2Impl> RECYCLER = new Recycler<OffloadIndexBlockV2Impl>() {
./tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java:281:        private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java:633:        private static final Recycler<OpForTxnIdCallBack> RECYCLER = new Recycler<OpForTxnIdCallBack>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java:675:        private static final Recycler<OpForVoidCallBack> RECYCLER = new Recycler<OpForVoidCallBack>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java:40:    private static final Recycler<MessagePayloadContextImpl> RECYCLER = new Recycler<MessagePayloadContextImpl>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:786:    private static final Recycler<MessageImpl<?>> RECYCLER = new Recycler<MessageImpl<?>>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java:33:    private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:1162:        private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:1526:                new Recycler<ProducerImpl.ChunkedMessageCtx>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:1763:        private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:3087:        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
./pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnackMessageIdWrapper.java:28:    private static final Recycler<UnackMessageIdWrapper> RECYCLER = new Recycler<UnackMessageIdWrapper>() {

However, the Recycler is not the "silver bullet" for reducing GC pressure that many developers might expect it to be.

A typical Recycler usage pattern looks like this:

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import lombok.Getter;

public class RecyclableTriple {

    private final Recycler.Handle<RecyclableTriple> handle;
    @Getter
    private Object x, y, z;

    private RecyclableTriple(Handle<RecyclableTriple> handle) {
        this.handle = handle;
    }

    public void recycle() {
        this.x = null;
        this.y = null;
        this.z = null;
        handle.recycle(this);
    }

    public static RecyclableTriple create(Object x, Object y, Object z) {
        final RecyclableTriple s = RECYCLER.get();
        s.x = x;
        s.y = y;
        s.z = z;
        return s;
    }

    private static final Recycler<RecyclableTriple> RECYCLER = new Recycler<RecyclableTriple>() {
        @Override
        protected RecyclableTriple newObject(Handle<RecyclableTriple> handle) {
            return new RecyclableTriple(handle);
        }
    };
}

The class above wraps three objects (x, y and z), and it's typically used like this:

for (int i = 0; i < 100; i++) {
    var s = RecyclableTriple.create(x, y, z);
    // process s...
    s.recycle();
}

Assuming the allocations of x, y and z are unavoidable, one might expect that only 1 RecyclableTriple is actually allocated and the other 99 are taken from an internal memory pool.

A look into how the Recycler works

The core of the get() implementation is:

// Get a FastThreadLocal pool and try to get a handle from the pool
LocalPool<T> localPool = threadLocal.get();
DefaultHandle<T> handle = localPool.claim();
T obj;
if (handle == null) {
    // Constructing the object by calling `newObject`, whose implementation mostly
    // allocates memory from heap like the example before
    handle = localPool.newHandle();
    if (handle != null) {
        obj = newObject(handle);
        handle.set(obj);
    } else {
        obj = newObject((Handle<T>) NOOP_HANDLE);
    }
} else {
    // If retrieving the object from the pool succeeds, return it
    obj = handle.get();
}
return obj;

claim() takes an available DefaultHandle from the pool, or returns null when there is none:

private final ArrayDeque<DefaultHandle<T>> batch;
private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;

DefaultHandle<T> claim() {
    MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
    if (handles == null) {
        return null;
    }
    // First, try draining some values from `MessagePassingQueue` to `batch`
    if (batch.isEmpty()) {
        handles.drain(this, chunkSize);
    }
    // Retrieve the last object from this deque
    DefaultHandle<T> handle = batch.pollLast();
    if (null != handle) {
        handle.toClaimed(); // just set the state to indicate it's used
    }
    return handle;
}

As you can see, all pooled objects come from pooledHandles, an MPSC (multi-producer single-consumer) queue. These objects are allocated in get() and added to pooledHandles via recycle():

@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
    localPool.release(this, true);
}

void release(DefaultHandle<T> handle, boolean guarded) {
    if (guarded) {
        handle.toAvailable();
    } else {
        handle.unguardedToAvailable();
    }
    Thread owner = this.owner;
    if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
        accept(handle);
    } else if (owner != null && isTerminated(owner)) {
        this.owner = null;
        pooledHandles = null;
    } else {
        MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
        if (handles != null) {
            handles.relaxedOffer(handle); // HERE
        }
    }
}

Let's look back at the code where a handle is passed to the recyclable object's constructor:

handle = localPool.newHandle();
if (handle != null) {
    obj = newObject(handle);
    handle.set(obj);
} else {
    obj = newObject((Handle<T>) NOOP_HANDLE);
}

// Defined elsewhere in Recycler:
private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
    @Override
    public void recycle(Object object) {
        // NOOP
    }
    /* ... */
};

If the handle is NOOP_HANDLE, recycle() does nothing, so the object never goes back to pooledHandles and is simply left to the GC. As a result, the pool may not hold enough objects to serve later get() calls. Whether NOOP_HANDLE is used is determined by newHandle():

DefaultHandle<T> newHandle() {
    if (++ratioCounter >= ratioInterval) {
        ratioCounter = 0;
        return new DefaultHandle<T>(this);
    }
    return null;
}

ratioInterval is 8 by default, which means only 1 out of every 8 newHandle() calls returns a real DefaultHandle whose object can be sent back to the pool. The objects created for the other 7 calls get NOOP_HANDLE and are never pooled.

Let me modify the Recycler implementation to count the newObject calls:

public static final AtomicInteger allocateCount = new AtomicInteger(0);

@SuppressWarnings("unchecked")
public final T get() {
    /* ... */
    if (handle == null) {
        handle = localPool.newHandle();
        allocateCount.getAndIncrement();

Then, run the following test:

final int count = 1000;
final int batchSize = 1000; // the number of objects to process in batch
final List<RecyclableTriple> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
    for (int j = 0; j < batchSize; j++) {
        list.add(RecyclableTriple.create(new Object(), new Object(), new Object()));
    }
    list.forEach(RecyclableTriple::recycle);
    list.clear();
}
System.out.println(Recycler.allocateCount.get());

The output is 7993, whereas without the Recycler, 1 million objects would have to be allocated.
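The number 7993 can be explained with a small model. In each round, the objects that received a real handle are reused, and only 1 in 8 of the freshly allocated objects joins the pool, so the number of fresh allocations per round shrinks by roughly a factor of 7/8. The total is therefore close to the geometric series 1000 × 1/(1 − 7/8) = 8000. The sketch below is a hypothetical model, not Netty code: `AllocationModel` and `simulate` are made-up names, the pool is treated as unbounded (the per-thread capacity limit is ignored), and the ratio counter is assumed to start at `ratioInterval` so that the very first allocation gets a real handle.

```java
// Hypothetical model of the test above; it mirrors the newHandle() ratio logic
// but is not Netty's implementation.
final class AllocationModel {
    static long simulate(int rounds, int batchSize, int ratioInterval) {
        long allocations = 0;             // number of newObject() calls
        int pooled = 0;                   // objects available for reuse
        int ratioCounter = ratioInterval; // assumption: first allocation gets a real handle
        for (int r = 0; r < rounds; r++) {
            int withRealHandle = 0;       // objects in this batch that will return to the pool
            for (int j = 0; j < batchSize; j++) {
                if (pooled > 0) {
                    pooled--;             // claim(): reuse a pooled object
                    withRealHandle++;
                } else {
                    allocations++;        // newObject()
                    if (++ratioCounter >= ratioInterval) {
                        ratioCounter = 0;
                        withRealHandle++; // DefaultHandle
                    }                     // otherwise NOOP_HANDLE: never pooled
                }
            }
            pooled += withRealHandle;     // recycle() the whole batch at the end of the round
        }
        return allocations;
    }
}
```

With these assumptions, `AllocationModel.simulate(1000, 1000, 8)` returns 7993, the same number as the measurement above, while a ratio of 1 (every object pooled) would allocate only 1000 objects.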

Performance improvement for FastThreadLocalThread

First, localPool is stored in a FastThreadLocal. When the current thread is not a FastThreadLocalThread, FastThreadLocal falls back to a regular Java ThreadLocal. Otherwise, according to Netty's documentation, it performs slightly better (I didn't test it).

However, when the allocation comes from a FastThreadLocalThread, the pool remembers that thread in its owner field:

LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
    /* ... */
    Thread currentThread = Thread.currentThread();
    owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;

Then, when an object is recycled, release is called:

public void accept(DefaultHandle<T> e) {
    batch.addLast(e);
}

void release(DefaultHandle<T> handle, boolean guarded) {
    /* ... */
    Thread owner = this.owner;
    if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
        accept(handle);

The handle is added directly to the thread-local ArrayDeque (batch). Now look back at claim():

DefaultHandle<T> claim() {
    /* ... */
    if (batch.isEmpty()) {
        handles.drain(this, chunkSize);
    }
    DefaultHandle<T> handle = batch.pollLast();

It polls the last element from the thread-local ArrayDeque directly, without draining any elements from the MPSC queue.
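The two paths can be summarized with a simplified, hypothetical sketch. `TwoTierPool` is a made-up class, not Netty's LocalPool: a `ConcurrentLinkedQueue` stands in for the MPSC queue, the owner is simply the thread that created the pool, and handles, states and capacity limits are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical sketch of the owner-thread fast path; not Netty's code.
final class TwoTierPool<T> {
    private final Thread owner = Thread.currentThread();           // thread that created the pool
    private final ArrayDeque<T> batch = new ArrayDeque<>();         // only touched by the owner
    private final Queue<T> pooled = new ConcurrentLinkedQueue<>();  // stand-in for the MPSC queue
    private final int chunkSize;
    private final Supplier<T> factory;

    TwoTierPool(int chunkSize, Supplier<T> factory) {
        this.chunkSize = chunkSize;
        this.factory = factory;
    }

    // Called by the owner thread only.
    T claim() {
        if (batch.isEmpty()) {
            // Slow path: move up to chunkSize objects from the shared queue into batch
            for (int i = 0; i < chunkSize; i++) {
                T obj = pooled.poll();
                if (obj == null) {
                    break;
                }
                batch.addLast(obj);
            }
        }
        T obj = batch.pollLast();
        return obj != null ? obj : factory.get(); // allocate when nothing is pooled
    }

    // May be called from any thread.
    void release(T obj) {
        if (Thread.currentThread() == owner && batch.size() < chunkSize) {
            batch.addLast(obj); // fast path: plain thread-local deque
        } else {
            pooled.offer(obj);  // slow path: concurrent queue
        }
    }

    // For illustration only; call from the owner thread.
    int batchSize() { return batch.size(); }

    int queuedSize() { return pooled.size(); }
}
```

Recycling on the owner thread only touches the ArrayDeque, while recycling on any other thread pays for a concurrent queue offer plus a later drain in claim().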

Trade-off

Firstly, using recyclable objects is error-prone. For example, if such an object is shared between two threads, the second recycle() call will throw an exception, which might lead to further issues. Even before that happens, the object can be reused while someone still holds it. Here is an example:

  1. Thread 1 creates a recyclable object A.
  2. A is passed to threads 2 and 3.
  3. Thread 2 calls recycle() first.
  4. Thread 1 gets the same object A from the Recycler again (the handle belongs to thread 1's pool) and passes it to thread 4.
  5. Thread 4 modifies A.
  6. Thread 3 might access A after it was modified by thread 4.

Most recyclable objects are used on the assumption that nobody else can modify them while they are in use, so this kind of sharing can cause many hard-to-debug issues.
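The scenario can be replayed deterministically on a single thread, with comments standing in for the threads. `PooledBox` is a hypothetical minimal pool written for illustration, not a Pulsar or Netty class; it only mirrors the two behaviors described above: a recycled object is handed out again, and a second recycle() throws.

```java
import java.util.ArrayDeque;

// Hypothetical single-threaded pool, used only to illustrate use-after-recycle.
final class PooledBox {
    private static final ArrayDeque<PooledBox> POOL = new ArrayDeque<>();
    private String value;
    private boolean recycled;

    static PooledBox create(String value) {
        PooledBox box = POOL.pollLast();   // reuse a recycled instance if there is one
        if (box == null) {
            box = new PooledBox();
        }
        box.value = value;
        box.recycled = false;
        return box;
    }

    String value() {
        return value;
    }

    void recycle() {
        if (recycled) {
            // mirrors the exception described above for a second recycle() call
            throw new IllegalStateException("Object has been recycled already.");
        }
        recycled = true;
        value = null;
        POOL.addLast(this);
    }
}
```

For example, if "thread 3" keeps `PooledBox stale = a;` while "thread 2" calls `a.recycle()`, the next `PooledBox.create("written by thread 4")` returns the very same instance, so `stale.value()` now reads thread 4's data, and nothing in the type system warns about it.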

Secondly, there is a performance issue:

  • When get() is called in a FastThreadLocalThread and recycle() is called in the same thread, it should be fast, because the main cost is the thread-local deque's pollLast() and addLast() calls.
  • Otherwise, the cost also includes the MPSC queue operations.

I ran a benchmark and implemented a much simpler thread-local pool for comparison. You can check this repository for how I ran the benchmark; feel free to point out anything wrong.

Here are the results from an Ubuntu GitHub Actions runner:

Benchmark                                                          Mode  Cnt          Score         Error  Units
recycler.RecyclerBenchmark.testFastRecycler                       thrpt   25   71143531.014 ±  126566.373  ops/s
recycler.RecyclerBenchmark.testRecord                             thrpt   25  441939346.484 ± 3873927.846  ops/s
recycler.RecyclerBenchmark.testRecycler                           thrpt   25   35687320.396 ±  250523.720  ops/s

  • testRecord just allocates a new object on the heap
  • testFastRecycler takes an object from a thread-local ArrayDeque and pushes it back into the ArrayDeque
  • testRecycler uses the Netty Recycler to get an object and recycles it immediately
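To put the throughput scores in perspective, they can be converted to an average cost per operation (the inverse of throughput). This is only arithmetic on the scores above and ignores the error margins:

```latex
\begin{aligned}
t_{\text{op}} &= \frac{1}{\text{throughput}} \\
t_{\texttt{testRecycler}} &\approx \frac{1}{3.569 \times 10^{7}\ \text{ops/s}} \approx 28.0\ \text{ns} \\
t_{\texttt{testFastRecycler}} &\approx \frac{1}{7.114 \times 10^{7}\ \text{ops/s}} \approx 14.1\ \text{ns} \\
t_{\texttt{testRecord}} &\approx \frac{1}{4.419 \times 10^{8}\ \text{ops/s}} \approx 2.26\ \text{ns} \\
t_{\texttt{testRecycler}} - t_{\texttt{testRecord}} &\approx 25.8\ \text{ns}
\end{aligned}
```

In this microbenchmark, the Netty Recycler is roughly 2x slower than the simple thread-local pool and roughly 12x slower than plain allocation, while the absolute gap is a few tens of nanoseconds per object.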

Using the Netty Recycler could reduce GC pressure, but that benefit is hard to measure, whereas the performance overhead is deterministic. That said, the gap might not make a real difference in practice, because it is only tens of nanoseconds per operation in the benchmark above.

However, wrapping an object with the Netty Recycler might not make a real difference either. When GC latency is high in a particular case, how can you confirm that wrapping a class with the Recycler will help? It might not only be a premature optimization; it might even make things worse.

Conclusion

Whenever possible, don't use the Netty Recycler. If you search for references to Recycler in Netty itself, you'll find it's only used by an ObjectPool implementation, which is responsible for allocating ByteBuf objects. Note that allocating direct memory is not as fast as allocating heap memory.

If you really want an object pool, implement a custom pool with ThreadLocal objects. But please remember that:

  1. FastThreadLocal is not always faster than ThreadLocal. What's more, relying on its unique features, such as the onRemoval method, might even cause a memory leak; see apache/pulsar#24719.
  2. Show profiling results that prove pooling makes sense.
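As a starting point, a custom pool built on a plain ThreadLocal can be very small. The sketch below is hypothetical (`ThreadLocalPool`, `acquire`, `release` and `maxPerThread` are made-up names, and it is not the pool used in the benchmark repository): each thread keeps a bounded ArrayDeque, a reset callback clears references before an object is pooled, and anything beyond the bound is simply dropped for the GC.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical minimal ThreadLocal-based pool; not from Pulsar or Netty.
final class ThreadLocalPool<T> {
    private final ThreadLocal<ArrayDeque<T>> local = ThreadLocal.withInitial(ArrayDeque::new);
    private final Supplier<T> factory;
    private final Consumer<T> reset;
    private final int maxPerThread;

    ThreadLocalPool(Supplier<T> factory, Consumer<T> reset, int maxPerThread) {
        this.factory = factory;
        this.reset = reset;
        this.maxPerThread = maxPerThread;
    }

    T acquire() {
        T obj = local.get().pollLast();           // reuse from the current thread's deque
        return obj != null ? obj : factory.get(); // otherwise allocate normally
    }

    void release(T obj) {
        reset.accept(obj);                        // clear state so pooled objects retain no garbage
        ArrayDeque<T> deque = local.get();
        if (deque.size() < maxPerThread) {
            deque.addLast(obj);                   // bounded: extra objects are left to the GC
        }
    }

    int pooledOnCurrentThread() {
        return local.get().size();
    }
}
```

Releasing on a different thread than the one that acquired the object just moves it into that thread's deque; the bound keeps such imbalances from growing without limit. Whether even this is worth it should still be decided by profiling.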