mirror of
https://gitee.com/WuKongDev/WuKongIM.git
synced 2026-05-06 16:50:14 +08:00
fix: #509 节点重启一直panic
This commit is contained in:
4
Makefile
4
Makefile
@@ -17,8 +17,8 @@ deploy-arm:
|
||||
docker push wukongim/wukongim:latest-arm64
|
||||
deploy-v2-dev:
|
||||
docker build -t wukongim . --platform linux/amd64
|
||||
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.3-dev
|
||||
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.3-dev
|
||||
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.4-dev
|
||||
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.4-dev
|
||||
deploy-v2:
|
||||
docker buildx build -t wukongim . --platform linux/amd64,linux/arm64
|
||||
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.2.1-20250624
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/WuKongIM/WuKongIM/pkg/cluster/slot/key"
|
||||
@@ -28,6 +29,9 @@ type PebbleShardLogStorage struct {
|
||||
|
||||
stopper syncutil.Stopper
|
||||
s *Server
|
||||
|
||||
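// Per-shard operation lock: serializes AppendLogs, Apply and TruncateLogTo on the same shard so that concurrent execution cannot leave the data inconsistent (e.g. appliedIndex > lastLogIndex)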
|
||||
shardLocks []sync.Mutex
|
||||
}
|
||||
|
||||
func NewPebbleShardLogStorage(s *Server, path string, shardNum uint32) *PebbleShardLogStorage {
|
||||
@@ -81,6 +85,10 @@ func (p *PebbleShardLogStorage) defaultPebbleOptions() *pebble.Options {
|
||||
func (p *PebbleShardLogStorage) Open() error {
|
||||
|
||||
opts := p.defaultPebbleOptions()
|
||||
|
||||
// 初始化分片锁,每个 shard 有独立的锁,避免全局锁影响性能
|
||||
p.shardLocks = make([]sync.Mutex, p.shardNum)
|
||||
|
||||
for i := 0; i < int(p.shardNum); i++ {
|
||||
db, err := pebble.Open(fmt.Sprintf("%s/shard%03d", p.path, i), opts)
|
||||
if err != nil {
|
||||
@@ -153,6 +161,25 @@ func (p *PebbleShardLogStorage) GetState(shardNo string) (types.RaftState, error
|
||||
return types.RaftState{}, err
|
||||
}
|
||||
|
||||
// 检测并修复不一致状态:appliedIndex > lastLogIndex
|
||||
// 这种情况可能由 Apply 和 Truncate 并发执行导致
|
||||
if applied > lastLogIndex {
|
||||
p.Warn("detected inconsistent state: appliedIndex > lastLogIndex, auto-fixing",
|
||||
zap.String("shardNo", shardNo),
|
||||
zap.Uint64("appliedIndex", applied),
|
||||
zap.Uint64("lastLogIndex", lastLogIndex))
|
||||
|
||||
// 将 appliedIndex 回退到 lastLogIndex
|
||||
applied = lastLogIndex
|
||||
if err := p.SetAppliedIndex(shardNo, lastLogIndex); err != nil {
|
||||
p.Error("failed to fix appliedIndex", zap.Error(err))
|
||||
return types.RaftState{}, err
|
||||
}
|
||||
p.Info("appliedIndex fixed successfully",
|
||||
zap.String("shardNo", shardNo),
|
||||
zap.Uint64("newAppliedIndex", applied))
|
||||
}
|
||||
|
||||
return types.RaftState{
|
||||
LastLogIndex: lastLogIndex,
|
||||
LastTerm: lastLogTerm,
|
||||
@@ -161,6 +188,11 @@ func (p *PebbleShardLogStorage) GetState(shardNo string) (types.RaftState, error
|
||||
}
|
||||
|
||||
func (p *PebbleShardLogStorage) AppendLogs(shardNo string, logs []types.Log, termStartIndexInfo *types.TermStartIndexInfo) error {
|
||||
// 加锁保护,防止与 TruncateLogTo 并发执行
|
||||
shardId := p.shardId(shardNo)
|
||||
p.shardLocks[shardId].Lock()
|
||||
defer p.shardLocks[shardId].Unlock()
|
||||
|
||||
batch := p.shardDB(shardNo).NewBatch()
|
||||
defer batch.Close()
|
||||
|
||||
@@ -201,6 +233,11 @@ func (p *PebbleShardLogStorage) TruncateLogTo(shardNo string, index uint64) erro
|
||||
return errors.New("index must be greater than 0")
|
||||
}
|
||||
|
||||
// 加锁保护,防止与 Apply 并发执行导致 appliedIndex > lastLogIndex
|
||||
shardId := p.shardId(shardNo)
|
||||
p.shardLocks[shardId].Lock()
|
||||
defer p.shardLocks[shardId].Unlock()
|
||||
|
||||
lastLog, err := p.lastLog(shardNo)
|
||||
if err != nil {
|
||||
p.Error("TruncateLogTo: getMaxIndex error", zap.Error(err))
|
||||
@@ -217,7 +254,7 @@ func (p *PebbleShardLogStorage) TruncateLogTo(shardNo string, index uint64) erro
|
||||
return err
|
||||
}
|
||||
if index < appliedIdx {
|
||||
p.Foucs(" applied must be less than index", zap.Uint64("index", index), zap.Uint64("appliedIdx", appliedIdx), zap.Uint64("lastIndex", lastIndex), zap.String("shardNo", shardNo))
|
||||
p.Foucs("applied must be less than index", zap.Uint64("index", index), zap.Uint64("appliedIdx", appliedIdx), zap.Uint64("lastIndex", lastIndex), zap.String("shardNo", shardNo))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -297,6 +334,11 @@ func (p *PebbleShardLogStorage) GetLogs(shardNo string, startLogIndex uint64, en
|
||||
}
|
||||
|
||||
func (p *PebbleShardLogStorage) Apply(shardNo string, logs []types.Log) error {
|
||||
// 加锁保护,防止与 TruncateLogTo 并发执行导致 appliedIndex > lastLogIndex
|
||||
shardId := p.shardId(shardNo)
|
||||
p.shardLocks[shardId].Lock()
|
||||
defer p.shardLocks[shardId].Unlock()
|
||||
|
||||
if p.s.opts.OnApply != nil {
|
||||
slotId := KeyToSlotId(shardNo)
|
||||
err := p.s.opts.OnApply(slotId, logs)
|
||||
|
||||
Reference in New Issue
Block a user