如何将自建Kafka中已删除的topic重新创建?

摘要:1. Kafka Topic 查看与消费组检查(allin1-01) # 查看 Kafka consumer groups .kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:90
1. Kafka Topic 查看与消费组检查(allin1-01) # 查看 Kafka consumer groups ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group pb-trading-engine ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep engine ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep event # 列出 Kafka topics /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list|grep even 2. 创建并分发 recreate-kafka-topic.sh 脚本(4 台机器) # allin1-01: 编写脚本 vim re-create-kafka-topic.sh chmod +x recreate-kafka-topic.sh # allin1-201/301: 同样创建脚本chmod +x recreate-kafka-topic.sh # (201, 301) vim recreate-kafka-topic.sh # (201) 3. 执行 Kafka Topic 重建(4 台机器) # 各机器上执行 bash recreate-kafka-topic.sh # allin1-01, 201, 301 bash -x recreate-kafka-topic.sh # allin1-101 (debug 模式) # 删除 topic(3 台机器均执行) /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic de... 4. 服务状态验证 # monit 检查服务进程(3 台机器) monit summary # allin1-101, 201, 301 # Java 进程确认(allin1-301) ps -fe|grep java ps -fe|grep java|grep mark # 过滤 pb-trading-market 进程 #!/usr/bin/env bash set -euo pipefail BOOTSTRAP="127.0.0.1:9092" TOPIC="default.trading.event" echo "==== Start Kafka Topic Reset: $TOPIC ====" # 判断 topic 是否存在 function topic_exists() { /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list | grep -w "$TOPIC" >/dev/null 2>&1 } # 等待 topic 删除完成(Kafka delete 是异步的) function wait_for_delete() { local retries=10 local i=0 while topic_exists; do if [[ $i -ge $retries ]]; then echo "❌ [DELETE FAILED] Topic still exists after retries: $TOPIC" return 1 fi echo "⏳ Waiting for topi
阅读全文