Skip to content

Instantly share code, notes, and snippets.

@BewareMyPower
Last active November 29, 2023 10:37
Show Gist options
  • Select an option

  • Save BewareMyPower/b49472b8fbadd6612440a04437a62125 to your computer and use it in GitHub Desktop.

Select an option

Save BewareMyPower/b49472b8fbadd6612440a04437a62125 to your computer and use it in GitHub Desktop.
The tool to set up and tear down an Apache Pulsar cluster locally
// cluster.cc: The tool to set up and tear down an Apache Pulsar cluster locally.
//
// ## Preparation
//
// Build the CLI tool.
//
// ```bash
// g++ cluster.cc -std=c++11 -o cluster.out
// ```
//
// Remove the data directory via `rm -rf ./data`.
//
// ## Step 1: Configurations
//
// Modify conf/bookkeeper.conf
//
// ```properties
// metadataServiceUri=zk://localhost:2181/ledgers
// advertisedAddress=127.0.0.1
// prometheusStatsHttpPort=8001
// ```
//
// Add conf/broker-0.conf
//
// ```properties
// metadataStoreUrl=zk:localhost:2181
// configurationMetadataStoreUrl=zk:localhost:2181
// brokerServicePort=6650
// webServicePort=8080
// clusterName=test
// brokerDeleteInactiveTopicsEnabled=false
// managedLedgerDefaultEnsembleSize=1
// managedLedgerDefaultWriteQuorum=1
// managedLedgerDefaultAckQuorum=1
//
// messagingProtocols=kafka
// kafkaListeners=PLAINTEXT://127.0.0.1:9092
// brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
// kafkaTransactionCoordinatorEnabled=true
// brokerDeduplicationEnabled=true
// ```
//
// Add conf/broker-1.conf
//
// ```properties
// metadataStoreUrl=zk:localhost:2181
// configurationMetadataStoreUrl=zk:localhost:2181
// brokerServicePort=6651
// webServicePort=8081
// clusterName=test
// brokerDeleteInactiveTopicsEnabled=false
// managedLedgerDefaultEnsembleSize=1
// managedLedgerDefaultWriteQuorum=1
// managedLedgerDefaultAckQuorum=1
//
// messagingProtocols=kafka
// kafkaListeners=PLAINTEXT://127.0.0.1:9093
// brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
// kafkaTransactionCoordinatorEnabled=true
// brokerDeduplicationEnabled=true
// ```
//
// Modify conf/proxy.conf
//
// ```properties
// servicePort=6652
// WebServicePort=8082
// BrokerServiceURL=pulsar://localhost:6650
// BrokerWebServiceURL=http://localhost:8080
// MetadataStoreUrl=zk:localhost:2181
//
// ProxyExtensions=kafka
// KafkaListeners=PLAINTEXT://localhost:19092
// KafkaBootstrapServers=localhost:9092,localhost:9093
// ```
//
// ## Step 2: Start the cluster
//
// ```bash
// ./cluster.out zk start
// ./cluster.out bk start
// ./cluster.out broker start 0
// ./cluster.out broker start 1
// ./cluster.out proxy start
// ```
//
// ## Step 3: Stop the cluster
//
// ```bash
// ./cluster.out proxy stop
// ./cluster.out broker stop 0
// ./cluster.out broker stop 1
// ./cluster.out bk stop
// ./cluster.out zk stop
// ```
//
#include <set>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
int main(int argc, char *argv[]) {
if (argc < 3) {
fprintf(
stderr,
"Usage: %s <component> [start|stop] <id>\n"
" component must be one of [\"zk\", \"bk\", \"broker\", \"proxy\"]\n",
argv[0]);
return 1;
}
std::set<std::string> kValidComponents{"zk", "bk", "broker", "proxy"};
std::string component{argv[1]};
if (kValidComponents.find(component) == kValidComponents.cend()) {
fprintf(
stderr,
R"(component %s is wrong, it should be one of ["zk", "bk", "broker", "proxy"]\n)",
component.c_str());
return 2;
}
if (component == "broker" && argc < 4) {
fprintf(stderr, "id must be provided when the component is broker\n");
return 3;
}
std::string operation{argv[2]};
if (operation != "start" && operation != "stop") {
fprintf(stderr, "operation %s is wrong, it should be start or stop\n",
operation.c_str());
return 3;
}
std::string command;
if (component == "broker") {
auto id = std::stoi(argv[3]);
char buf[1024];
snprintf(buf, sizeof(buf),
R"(PULSAR_BROKER_CONF=conf/broker-%d.conf \
PULSAR_LOG_DIR=./logs-%d \
PULSAR_PID_DIR=./pid-%d \
./bin/pulsar-daemon %s broker)",
id, id, id, operation.c_str());
command = buf;
} else {
command =
"bin/pulsar-daemon " + operation + " " +
((component == "zk") ? "zookeeper"
: ((component == "bk") ? "bookie" : "proxy"));
}
printf("# %s\n", command.c_str());
system(command.c_str());
if (component == "zk" && operation == "start") {
const char *command = R"(
bin/pulsar initialize-cluster-metadata \
--cluster test \
--metadata-store zk:localhost:2181 \
--configuration-metadata-store zk:localhost:2181 \
--web-service-url http://localhost:8080 \
--broker-service-url pulsar://localhost:6650)";
printf("# %s\n", command);
system(command);
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment