How do I recreate a deleted topic in a self-hosted Kafka?

1. Inspect Kafka topics and consumer groups (allin1-01)

# List Kafka consumer groups
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group pb-trading-engine
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep engine
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep event

# List Kafka topics
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list|grep even

2. Create and distribute the recreate-kafka-topic.sh script (4 machines)

# allin1-01: write the script
vim recreate-kafka-topic.sh
chmod +x recreate-kafka-topic.sh

# allin1-201/301: create the same script
chmod +x recreate-kafka-topic.sh   # (201, 301)
vim recreate-kafka-topic.sh        # (201)

3. Run the Kafka topic rebuild (4 machines)

# Run on each machine
bash recreate-kafka-topic.sh       # allin1-01, 201, 301
bash -x recreate-kafka-topic.sh    # allin1-101 (debug mode)

# Delete the topic (run on all 3 machines)
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic de...

4. Verify service status

# Check service processes with monit (3 machines)
monit summary                      # allin1-101, 201, 301

# Confirm Java processes (allin1-301)
ps -fe|grep java
ps -fe|grep java|grep mark         # filter for the pb-trading-market process

The recreate-kafka-topic.sh script:

#!/usr/bin/env bash
set -euo pipefail

BOOTSTRAP="127.0.0.1:9092"
TOPIC="default.trading.event"

echo "==== Start Kafka Topic Reset: $TOPIC ===="

# Check whether the topic exists.
# -F treats the name as a literal string (the dots are not regex wildcards);
# -x requires the whole line to match, so longer names with the same prefix do not count.
function topic_exists() {
  /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list | grep -Fx -- "$TOPIC" >/dev/null 2>&1
}

# Wait for the topic deletion to finish (Kafka deletes topics asynchronously)
function wait_for_delete() {
  local retries=10
  local i=0
  while topic_exists; do
    if [[ $i -ge $retries ]]; then
      echo "❌ [DELETE FAILED] Topic still exists after retries: $TOPIC"
      return 1
    fi
    echo "⏳ Waiting for topic deletion... ($i)"
    sleep 2
    i=$((i + 1))
  done
  echo "✅ [DELETE SUCCESS] Topic deleted: $TOPIC"
}

# Verify the topic after creation
function verify_create() {
  if topic_exists; then
    echo "✅ [CREATE SUCCESS] Topic created: $TOPIC"
  else
    echo "❌ [CREATE FAILED] Topic not found after creation: $TOPIC"
    return 1
  fi
}

# =========================
# Step 1: Delete
# =========================
if topic_exists; then
  echo "🗑 Deleting topic: $TOPIC"
  if /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --delete --bootstrap-server "$BOOTSTRAP" --topic "$TOPIC"; then
    wait_for_delete
  else
    echo "❌ [DELETE FAILED] Command execution failed"
    exit 1
  fi
else
  echo "ℹ️ Topic does not exist, skipping delete"
fi

# =========================
# Step 2: Create
# =========================
echo "🚀 Creating topic: $TOPIC"
if /opt/kafka-docker/kafka-cli/bin/kafka-topics.sh \
  --create \
  --bootstrap-server "$BOOTSTRAP" \
  --replication-factor 1 \
  --config min.insync.replicas=1 \
  --partitions 32 \
  --topic "$TOPIC"; then
  verify_create
else
  echo "❌ [CREATE FAILED] Command execution failed"
  exit 1
fi

echo "==== Done ===="
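
Because the delete is asynchronous, the script has to poll, and wait_for_delete hard-codes that loop for one condition. As a rough sketch only (the helper name poll_until and its argument order are hypothetical and not part of the original script), the same bounded-retry pattern could be pulled out into a reusable function that takes any command as its condition:

```shell
#!/usr/bin/env bash
set -euo pipefail

# poll_until MAX_TRIES DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper (not in the original script): runs COMMAND repeatedly
# until it succeeds. Returns 0 as soon as COMMAND succeeds, or 1 once it has
# failed MAX_TRIES times. Sleeps DELAY_SECONDS between attempts.
poll_until() {
  local max_tries=$1
  local delay=$2
  shift 2
  local attempt=1
  while ! "$@"; do
    if (( attempt >= max_tries )); then
      return 1
    fi
    sleep "$delay"
    attempt=$((attempt + 1))
  done
}
```

Under these assumptions, waiting for the deletion might look like defining `topic_gone() { ! topic_exists; }` and then calling `poll_until 10 2 topic_gone`, which gives up after 10 checks spaced 2 seconds apart. Passing the condition as a command keeps the retry limit and delay in one place if other steps, such as the post-create check, also need to wait.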