如何将自建Kafka中已删除的topic重新创建?
摘要:1. Kafka Topic 查看与消费组检查(allin1-01) # 查看 Kafka consumer groups .kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:90
1. Kafka Topic 查看与消费组检查(allin1-01)
# 查看 Kafka consumer groups
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group pb-trading-engine
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep engine
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups|grep event
# 列出 Kafka topics
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list|grep even
2. 创建并分发 recreate-kafka-topic.sh 脚本(4 台机器)
# allin1-01: 编写脚本
vim re-create-kafka-topic.sh
chmod +x recreate-kafka-topic.sh
# allin1-201/301: 同样创建脚本chmod +x recreate-kafka-topic.sh # (201, 301)
vim recreate-kafka-topic.sh # (201)
3. 执行 Kafka Topic 重建(4 台机器)
# 各机器上执行
bash recreate-kafka-topic.sh # allin1-01, 201, 301
bash -x recreate-kafka-topic.sh # allin1-101 (debug 模式)
# 删除 topic(3 台机器均执行)
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic de...
4. 服务状态验证
# monit 检查服务进程(3 台机器)
monit summary # allin1-101, 201, 301
# Java 进程确认(allin1-301)
ps -fe|grep java
ps -fe|grep java|grep mark # 过滤 pb-trading-market 进程
#!/usr/bin/env bash
set -euo pipefail
BOOTSTRAP="127.0.0.1:9092"
TOPIC="default.trading.event"
echo "==== Start Kafka Topic Reset: $TOPIC ===="
# 判断 topic 是否存在
function topic_exists() {
/opt/kafka-docker/kafka-cli/bin/kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list | grep -w "$TOPIC" >/dev/null 2>&1
}
# 等待 topic 删除完成(Kafka delete 是异步的)
function wait_for_delete() {
local retries=10
local i=0
while topic_exists; do
if [[ $i -ge $retries ]]; then
echo "❌ [DELETE FAILED] Topic still exists after retries: $TOPIC"
return 1
fi
echo "⏳ Waiting for topi
