From 9fc693d5068b20ec6be3e3bc35925b38a277feac Mon Sep 17 00:00:00 2001 From: tangtaoit Date: Tue, 3 Feb 2026 12:44:03 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20#509=20=E8=8A=82=E7=82=B9=E9=87=8D?= =?UTF-8?q?=E5=90=AF=E4=B8=80=E7=9B=B4panic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 4 +-- pkg/cluster/slot/storage_pebble.go | 44 +++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 03485b5b..fcbf999f 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,8 @@ deploy-arm: docker push wukongim/wukongim:latest-arm64 deploy-v2-dev: docker build -t wukongim . --platform linux/amd64 - docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.3-dev - docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.3-dev + docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.4-dev + docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.4-dev deploy-v2: docker buildx build -t wukongim . --platform linux/amd64,linux/arm64 docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.1-20250624 diff --git a/pkg/cluster/slot/storage_pebble.go b/pkg/cluster/slot/storage_pebble.go index 80e76b36..93bb5daf 100644 --- a/pkg/cluster/slot/storage_pebble.go +++ b/pkg/cluster/slot/storage_pebble.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/fnv" "math" + "sync" "time" "github.com/WuKongIM/WuKongIM/pkg/cluster/slot/key" @@ -28,6 +29,9 @@ type PebbleShardLogStorage struct { stopper syncutil.Stopper s *Server + + // 每个 shard 的操作锁,防止 Apply 和 Truncate 并发导致数据不一致 + shardLocks []sync.Mutex } func NewPebbleShardLogStorage(s *Server, path string, shardNum uint32) *PebbleShardLogStorage { @@ -81,6 +85,10 @@ func (p *PebbleShardLogStorage) defaultPebbleOptions() *pebble.Options { func (p *PebbleShardLogStorage) Open() error { opts := p.defaultPebbleOptions() + + // 初始化分片锁,每个 shard 有独立的锁,避免全局锁影响性能 + p.shardLocks = make([]sync.Mutex, p.shardNum) + for i := 0; i < int(p.shardNum); i++ { db, err := pebble.Open(fmt.Sprintf("%s/shard%03d", p.path, i), opts) if err != nil { @@ -153,6 +161,25 @@ func (p *PebbleShardLogStorage) GetState(shardNo string) (types.RaftState, error return types.RaftState{}, err } + // 检测并修复不一致状态:appliedIndex > lastLogIndex + // 这种情况可能由 Apply 和 Truncate 并发执行导致 + if applied > lastLogIndex { + p.Warn("detected inconsistent state: appliedIndex > lastLogIndex, auto-fixing", + zap.String("shardNo", shardNo), + zap.Uint64("appliedIndex", applied), + zap.Uint64("lastLogIndex", lastLogIndex)) + + // 将 appliedIndex 回退到 lastLogIndex + applied = lastLogIndex + if err := p.SetAppliedIndex(shardNo, lastLogIndex); err != nil { + p.Error("failed to fix appliedIndex", zap.Error(err)) + return types.RaftState{}, err + } + p.Info("appliedIndex fixed successfully", + zap.String("shardNo", shardNo), + zap.Uint64("newAppliedIndex", applied)) + } + return types.RaftState{ LastLogIndex: lastLogIndex, LastTerm: lastLogTerm, @@ -161,6 +188,11 @@ func (p *PebbleShardLogStorage) GetState(shardNo string) (types.RaftState, error } func (p *PebbleShardLogStorage) AppendLogs(shardNo string, logs []types.Log, termStartIndexInfo *types.TermStartIndexInfo) error { + // 加锁保护,防止与 TruncateLogTo 并发执行 + shardId := p.shardId(shardNo) + p.shardLocks[shardId].Lock() + defer p.shardLocks[shardId].Unlock() + batch := p.shardDB(shardNo).NewBatch() defer batch.Close() @@ -201,6 +233,11 @@ func (p *PebbleShardLogStorage) TruncateLogTo(shardNo string, index uint64) erro return errors.New("index must be greater than 0") } + // 加锁保护,防止与 Apply 并发执行导致 appliedIndex > lastLogIndex + shardId := p.shardId(shardNo) + p.shardLocks[shardId].Lock() + defer p.shardLocks[shardId].Unlock() + lastLog, err := p.lastLog(shardNo) if err != nil { p.Error("TruncateLogTo: getMaxIndex error", zap.Error(err)) @@ -217,7 +254,7 @@ func (p *PebbleShardLogStorage) TruncateLogTo(shardNo string, index uint64) erro return err } if index < appliedIdx { - p.Foucs(" applied must be less than index", zap.Uint64("index", index), zap.Uint64("appliedIdx", appliedIdx), zap.Uint64("lastIndex", lastIndex), zap.String("shardNo", shardNo)) + p.Foucs("applied must be less than index", zap.Uint64("index", index), zap.Uint64("appliedIdx", appliedIdx), zap.Uint64("lastIndex", lastIndex), zap.String("shardNo", shardNo)) return nil } @@ -297,6 +334,11 @@ func (p *PebbleShardLogStorage) GetLogs(shardNo string, startLogIndex uint64, en } func (p *PebbleShardLogStorage) Apply(shardNo string, logs []types.Log) error { + // 加锁保护,防止与 TruncateLogTo 并发执行导致 appliedIndex > lastLogIndex + shardId := p.shardId(shardNo) + p.shardLocks[shardId].Lock() + defer p.shardLocks[shardId].Unlock() + if p.s.opts.OnApply != nil { slotId := KeyToSlotId(shardNo) err := p.s.opts.OnApply(slotId, logs)