Last active
November 29, 2023 10:37
-
-
Save BewareMyPower/b49472b8fbadd6612440a04437a62125 to your computer and use it in GitHub Desktop.
The tool to set up and tear down an Apache Pulsar cluster locally
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // cluster.cc: The tool to set up and tear down an Apache Pulsar cluster locally. | |
| // | |
| // ## Preparation | |
| // | |
| // Build the CLI tool. | |
| // | |
| // ```bash | |
| // g++ cluster.cc -std=c++11 -o cluster.out | |
| // ``` | |
| // | |
| // Remove the data directory via `rm -rf ./data`. | |
| // | |
| // ## Step 1: Configurations | |
| // | |
| // Modify conf/bookkeeper.conf | |
| // | |
| // ```properties | |
| // metadataServiceUri=zk://localhost:2181/ledgers | |
| // advertisedAddress=127.0.0.1 | |
| // prometheusStatsHttpPort=8001 | |
| // ``` | |
| // | |
| // Add conf/broker-0.conf | |
| // | |
| // ```properties | |
| // metadataStoreUrl=zk:localhost:2181 | |
| // configurationMetadataStoreUrl=zk:localhost:2181 | |
| // brokerServicePort=6650 | |
| // webServicePort=8080 | |
| // clusterName=test | |
| // brokerDeleteInactiveTopicsEnabled=false | |
| // managedLedgerDefaultEnsembleSize=1 | |
| // managedLedgerDefaultWriteQuorum=1 | |
| // managedLedgerDefaultAckQuorum=1 | |
| // | |
| // messagingProtocols=kafka | |
| // kafkaListeners=PLAINTEXT://127.0.0.1:9092 | |
| // brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor | |
| // kafkaTransactionCoordinatorEnabled=true | |
| // brokerDeduplicationEnabled=true | |
| // ``` | |
| // | |
| // Add conf/broker-1.conf | |
| // | |
| // ```properties | |
| // metadataStoreUrl=zk:localhost:2181 | |
| // configurationMetadataStoreUrl=zk:localhost:2181 | |
| // brokerServicePort=6651 | |
| // webServicePort=8081 | |
| // clusterName=test | |
| // brokerDeleteInactiveTopicsEnabled=false | |
| // managedLedgerDefaultEnsembleSize=1 | |
| // managedLedgerDefaultWriteQuorum=1 | |
| // managedLedgerDefaultAckQuorum=1 | |
| // | |
| // messagingProtocols=kafka | |
| // kafkaListeners=PLAINTEXT://127.0.0.1:9093 | |
| // brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor | |
| // kafkaTransactionCoordinatorEnabled=true | |
| // brokerDeduplicationEnabled=true | |
| // ``` | |
| // | |
| // Modify conf/proxy.conf | |
| // | |
| // ```properties | |
| // servicePort=6652 | |
| // WebServicePort=8082 | |
| // BrokerServiceURL=pulsar://localhost:6650 | |
| // BrokerWebServiceURL=http://localhost:8080 | |
| // MetadataStoreUrl=zk:localhost:2181 | |
| // | |
| // ProxyExtensions=kafka | |
| // KafkaListeners=PLAINTEXT://localhost:19092 | |
| // KafkaBootstrapServers=localhost:9092,localhost:9093 | |
| // ``` | |
| // | |
| // ## Step 2: Start the cluster | |
| // | |
| // ```bash | |
| // ./cluster.out zk start | |
| // ./cluster.out bk start | |
| // ./cluster.out broker start 0 | |
| // ./cluster.out broker start 1 | |
| // ./cluster.out proxy start | |
| // ``` | |
| // | |
| // ## Step 3: Stop the cluster | |
| // | |
| // ```bash | |
| // ./cluster.out proxy stop | |
| // ./cluster.out broker stop 0 | |
| // ./cluster.out broker stop 1 | |
| // ./cluster.out bk stop | |
| // ./cluster.out zk stop | |
| // ``` | |
| // | |
| #include <set> | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <string.h> | |
| #include <string> | |
| int main(int argc, char *argv[]) { | |
| if (argc < 3) { | |
| fprintf( | |
| stderr, | |
| "Usage: %s <component> [start|stop] <id>\n" | |
| " component must be one of [\"zk\", \"bk\", \"broker\", \"proxy\"]\n", | |
| argv[0]); | |
| return 1; | |
| } | |
| std::set<std::string> kValidComponents{"zk", "bk", "broker", "proxy"}; | |
| std::string component{argv[1]}; | |
| if (kValidComponents.find(component) == kValidComponents.cend()) { | |
| fprintf( | |
| stderr, | |
| R"(component %s is wrong, it should be one of ["zk", "bk", "broker", "proxy"]\n)", | |
| component.c_str()); | |
| return 2; | |
| } | |
| if (component == "broker" && argc < 4) { | |
| fprintf(stderr, "id must be provided when the component is broker\n"); | |
| return 3; | |
| } | |
| std::string operation{argv[2]}; | |
| if (operation != "start" && operation != "stop") { | |
| fprintf(stderr, "operation %s is wrong, it should be start or stop\n", | |
| operation.c_str()); | |
| return 3; | |
| } | |
| std::string command; | |
| if (component == "broker") { | |
| auto id = std::stoi(argv[3]); | |
| char buf[1024]; | |
| snprintf(buf, sizeof(buf), | |
| R"(PULSAR_BROKER_CONF=conf/broker-%d.conf \ | |
| PULSAR_LOG_DIR=./logs-%d \ | |
| PULSAR_PID_DIR=./pid-%d \ | |
| ./bin/pulsar-daemon %s broker)", | |
| id, id, id, operation.c_str()); | |
| command = buf; | |
| } else { | |
| command = | |
| "bin/pulsar-daemon " + operation + " " + | |
| ((component == "zk") ? "zookeeper" | |
| : ((component == "bk") ? "bookie" : "proxy")); | |
| } | |
| printf("# %s\n", command.c_str()); | |
| system(command.c_str()); | |
| if (component == "zk" && operation == "start") { | |
| const char *command = R"( | |
| bin/pulsar initialize-cluster-metadata \ | |
| --cluster test \ | |
| --metadata-store zk:localhost:2181 \ | |
| --configuration-metadata-store zk:localhost:2181 \ | |
| --web-service-url http://localhost:8080 \ | |
| --broker-service-url pulsar://localhost:6650)"; | |
| printf("# %s\n", command); | |
| system(command); | |
| } | |
| return 0; | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment