@BewareMyPower
Created November 5, 2025 12:34
(Drafted) Topic loading in Pulsar

Topic load process

Specifically, for a partition of a partitioned persistent topic, the topic load process is as follows:

  1. Check if the topic exists (Metadata store: /managed-ledgers/<persistence-naming-encoding>)
  2. Get topic policies (via TopicPoliciesService#getTopicPoliciesAsync)
  3. Check topic ownership
  4. Check whether the pending topic loading count exceeds the maxConcurrentTopicLoadRequest config (default: 5000). If it does, the following steps are deferred until earlier topic loading operations have completed.
  5. Check topic ownership again
  6. Fetch topic properties (Metadata store: /admin/partitioned-topics/<tenant>/<namespace>/persistent/<encoded-topic>)
  7. Check whether the topic count in the namespace would exceed the maxTopicsPerNamespace namespace policy (default: 1000). This takes two metadata store operations: one is the same as in step 1, the other reads /admin/policies/<tenant>/<namespace>.
  8. Check whether cluster migration is enabled (Metadata store: /admin/clusters/<cluster>/policies and /admin/local-policies/<tenant>/<namespace>)
  9. Validate topic partition metadata consistency (see PIP-414), which is similar to step 6
  10. Get the managed ledger config, which requires the topic policies (same as step 2) and the namespace policies (Metadata store: /admin/policies/<tenant>/<namespace>)
  11. Create the managed ledger via ManagedLedgerFactory#asyncOpen
  12. Create the PersistentTopic instance via TopicFactory#create and perform some initializations
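
The throttling in step 4 can be sketched as a small limiter that runs a load immediately when a slot is free and queues it otherwise. This is only an illustration under stated assumptions: TopicLoadLimiter, submit and pendingCount are hypothetical names, not Pulsar's actual classes or methods, and the real broker code may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical limiter illustrating step 4; not Pulsar's actual implementation. */
final class TopicLoadLimiter {
    private final int maxConcurrent;          // plays the role of maxConcurrentTopicLoadRequest
    private int inFlight = 0;                 // loads currently running
    private final Queue<Runnable> pending = new ArrayDeque<>();

    TopicLoadLimiter(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    /** Starts the load now if a slot is free; otherwise queues it until an earlier load completes. */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> load) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Runnable task = () -> {
            CompletableFuture<T> future;
            try {
                future = load.get();
            } catch (Throwable t) {
                // A supplier that throws is treated like a failed load so the slot is still released.
                future = new CompletableFuture<>();
                future.completeExceptionally(t);
            }
            future.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
                release();
            });
        };
        boolean runNow;
        synchronized (this) {
            runNow = inFlight < maxConcurrent;
            if (runNow) {
                inFlight++;
            } else {
                pending.add(task);
            }
        }
        if (runNow) {
            task.run();
        }
        return result;
    }

    /** Hands the freed slot to the next queued load, or gives the slot back if nothing is queued. */
    private void release() {
        Runnable next;
        synchronized (this) {
            next = pending.poll();
            if (next == null) {
                inFlight--;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

With a limit of 1, a second submit call stays queued until the future returned by the first load completes, which mirrors how steps 5 to 12 are deferred for a throttled topic.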

Normally, the metadata store operations above are fast because many of them were already performed by earlier RPCs during topic lookup, so the results are cached. The main concern is the topic-level policies, whose default implementation reads from the __change_events topic in the same namespace as the user topic.

Apart from the topic policies, each topic also has its own properties stored in the metadata store:

  • Partitioned topic metadata: /admin/partitioned-topics/<tenant>/<namespace>/<topic>
  • Managed ledger properties: /managed-ledgers/<persistence-naming-encoding> (it's actually partition level)

Topic policies are a good way to manage a large amount of topic-level metadata without putting much load on the metadata store. Each topic registers itself as a listener with TopicPoliciesService so that it is notified of topic policy changes. However, blocking the topic loading process to wait for the topic policies is not a good idea, especially when the new owner broker did not own the namespace before. When there are many namespaces, a reader is created for each namespace to replay the whole <tenant>/<ns>/__change_events topic, which can put heavy pressure on BookKeeper and lead to "too many xxx requests" errors.

The topic policies are used to apply the PersistencePolicies, RetentionPolicies and OffloadPolicies to the managed ledger config. However, these policies also exist at the namespace level.
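
Because the same policies exist at both levels, the managed ledger config can still be built when no topic-level value is available. A minimal sketch of that fallback, assuming the usual precedence of topic level over namespace level over broker default; PolicyFallback and resolve are hypothetical names used only for illustration:

```java
import java.util.Optional;

/** Hypothetical helper; the names are illustrative and not part of Pulsar. */
final class PolicyFallback {
    /**
     * Returns the topic-level value when present, otherwise the namespace-level value,
     * otherwise the broker default.
     */
    static <T> T resolve(Optional<T> topicLevel, Optional<T> namespaceLevel, T brokerDefault) {
        return topicLevel.orElseGet(() -> namespaceLevel.orElse(brokerDefault));
    }
}
```

If topic policies are skipped for a topic, the caller would pass an empty topicLevel and the namespace-level value applies unchanged.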

Proposed solution

Add a namespace policy to disable topic policies in the namespace. In addition, add a built-in partitioned topic metadata property whose key is __topic_policies_disabled to disable topic policies for a specific topic. To avoid conflicts with user-defined properties, ignore this property if the value is neither true nor false.
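
The "ignore unless true or false" rule could look like the sketch below. Only the property key comes from this proposal. The class and method names are hypothetical, and the exact-match (case-sensitive) comparison, the topic-over-namespace precedence and the "enabled" default are all assumptions that the proposal does not specify.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for the proposed switch; not an existing Pulsar class. */
final class TopicPoliciesSwitch {
    static final String KEY = "__topic_policies_disabled";

    /** Returns empty when the key is absent or the value is not exactly "true" or "false". */
    static Optional<Boolean> parse(Map<String, String> properties) {
        String value = properties.get(KEY);
        if ("true".equals(value)) {
            return Optional.of(true);
        }
        if ("false".equals(value)) {
            return Optional.of(false);
        }
        // Any other value is treated as a user-defined property and ignored.
        return Optional.empty();
    }

    /** Assumed precedence: a valid topic-level value wins, then the namespace value, else enabled. */
    static boolean isDisabled(Map<String, String> topicProps, Map<String, String> namespaceProps) {
        return parse(topicProps).orElseGet(() -> parse(namespaceProps).orElse(false));
    }
}
```

Returning an empty Optional for unrecognized values keeps "not set" distinct from "explicitly false", so a topic can opt back in even when its namespace disables topic policies.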

This would also help protocol handlers, which could create namespaces with topic policies disabled by default.

> admin namespaces set-properties -p __topic_policies_disabled=true public/default
> admin namespaces policies public/default
{
  ...,
  "properties" : {
    "__topic_policies_disabled" : "true"
  },
  ...
}