fix: stop triggering a channel election every time when the server runs in standalone mode

This commit is contained in:
tt
2025-04-16 20:24:51 +08:00
parent 62bca42777
commit 56b0bc3d2f
11 changed files with 213 additions and 41 deletions

View File

@@ -10,10 +10,12 @@ import (
"net"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/WuKongIM/WuKongIM/internal/options"
@@ -120,32 +122,60 @@ func cmdRun() error {
// 等待集群准备好
s.MustWaitAllSlotsReady(time.Minute)
// 处理 pingback (如果提供了)
if pingback != "" {
confirmationBytes, err := io.ReadAll(os.Stdin)
if err != nil {
wklog.Error("read confirmation error from stdin", zap.Error(err))
if err := handlePingback(); err != nil {
s.Stop() // 如果 pingback 失败,也尝试停止服务器
return err
}
conn, err := net.Dial("tcp", pingback)
if err != nil {
wklog.Error("dialing confirmation address", zap.Error(err))
if err := writePIDFile(); err != nil {
s.Stop() // 如果写 PID 文件失败,也尝试停止服务器
return err
}
defer conn.Close()
_, err = conn.Write(confirmationBytes)
if err != nil {
wklog.Error("write confirmation error", zap.Error(err))
return err
}
err = os.WriteFile(path.Join(".", pidfile), []byte(strconv.Itoa(os.Getpid())), 0o600)
if err != nil {
log.Fatal(err)
}
}
select {}
// 设置信号监听,等待退出信号
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
wklog.Info("WuKongIM server started successfully. Press Ctrl+C to exit.")
// 阻塞直到收到退出信号
<-quit
s.Stop() // <-- 调用 Stop() 进行清理
wklog.Info("WuKongIM server stopped.")
}
return nil
}
// handlePingback reads the whole confirmation payload from stdin and forwards
// it over a TCP connection to the address held in pingback.
//
// Each failing step (read, dial, write) is logged and its error is returned
// unchanged; nil is returned once the payload has been written. The
// connection is closed when the function returns.
func handlePingback() error {
	payload, readErr := io.ReadAll(os.Stdin)
	if readErr != nil {
		wklog.Error("read confirmation error from stdin", zap.Error(readErr))
		return readErr
	}

	peer, dialErr := net.Dial("tcp", pingback)
	if dialErr != nil {
		wklog.Error("dialing confirmation address", zap.Error(dialErr))
		return dialErr
	}
	defer peer.Close()

	if _, writeErr := peer.Write(payload); writeErr != nil {
		wklog.Error("write confirmation error", zap.Error(writeErr))
		return writeErr
	}
	return nil
}
// writePIDFile writes the current process ID, formatted as decimal text, to
// the path obtained by joining "." with pidfile, using file mode 0600.
//
// A write failure is logged and returned; nil is returned on success.
func writePIDFile() error {
	target := path.Join(".", pidfile)
	contents := []byte(strconv.Itoa(os.Getpid()))

	if writeErr := os.WriteFile(target, contents, 0o600); writeErr != nil {
		wklog.Error("write pid file error", zap.Error(writeErr))
		return writeErr
	}
	return nil
}

View File

@@ -64,12 +64,16 @@ func newSyncUserConversationResp(conversation wkdb.Conversation) *syncUserConver
realChannelId = from
}
}
var version int64
if conversation.UpdatedAt != nil {
version = conversation.UpdatedAt.UnixNano()
}
return &syncUserConversationResp{
ChannelId: realChannelId,
ChannelType: conversation.ChannelType,
Unread: int(conversation.UnreadCount),
ReadedToMsgSeq: uint32(conversation.ReadToMsgSeq),
Version: conversation.UpdatedAt.UnixNano(),
Version: version,
}
}

View File

@@ -2,6 +2,7 @@ package manager
import (
"encoding/json"
"fmt"
"os"
"path"
"slices"
@@ -55,6 +56,7 @@ func (c *ConversationManager) Push(fakeChannelId string, channelType uint8, tagK
if event.Frame.GetNoPersist() {
continue
}
if event.MessageSeq > lastMsgSeq {
lastMsgSeq = event.MessageSeq
}
@@ -79,7 +81,7 @@ func (c *ConversationManager) Push(fakeChannelId string, channelType uint8, tagK
return
}
// 如果是个人频道并且不是第一条消息,则不需要更新最近会话
if firstMsgSeq > 1 {
if firstMsgSeq == 1 {
index := c.getUpdaterIndex(fakeChannelId)
c.updaters[index].push(fakeChannelId, channelType, tagKey, lastMsgSeq)
}
@@ -119,6 +121,8 @@ func (c *ConversationManager) saveToFile() {
c.Error("save conversations to file failed", zap.Error(err))
return
}
fmt.Println("save conversations to file....", allUpdates)
err = os.WriteFile(path.Join(conversationDir, "conversations.json"), data, 0644)
if err != nil {
c.Error("save conversations to file failed", zap.Error(err))
@@ -149,7 +153,7 @@ func (c *ConversationManager) loadFromFile() {
}
for _, update := range allUpdates {
c.updaters[c.getUpdaterIndex(update.channelId)].setChannelUpdate(update.channelId, update.channelType, update.tagKey, update.uids, update.lastMsgSeq)
c.updaters[c.getUpdaterIndex(update.ChannelId)].setChannelUpdate(update.ChannelId, update.ChannelType, update.TagKey, update.Uids, update.LastMsgSeq)
}
}
@@ -187,18 +191,15 @@ func (c *ConversationManager) storeConversations() {
updates := updater.getChannelUpdates()
for _, update := range updates {
conversationType := wkdb.ConversationTypeChat
if options.G.IsCmdChannel(update.channelId) {
if options.G.IsCmdChannel(update.ChannelId) {
conversationType = wkdb.ConversationTypeCMD
}
for _, uid := range update.uids {
for _, uid := range update.Uids {
createdAt := time.Now()
updatedAt := time.Now()
if options.G.IsCmdChannel(update.channelId) {
continue
}
conversations = append(conversations, wkdb.Conversation{
ChannelId: update.channelId,
ChannelType: update.channelType,
ChannelId: update.ChannelId,
ChannelType: update.ChannelType,
Uid: uid,
Type: conversationType,
CreatedAt: &createdAt,
@@ -230,11 +231,11 @@ func (c *ConversationManager) getUpdaterIndex(fakeChannelId string) int {
}
// channelUpdate is one pending recent-conversation update for a channel.
type channelUpdate struct {
channelId string // channel ID
channelType uint8 // channel type
uids []string // users being updated
tagKey string // tag key
lastMsgSeq uint64 // sequence number of the last message
ChannelId string `json:"channel_id"` // channel ID
ChannelType uint8 `json:"channel_type"` // channel type
Uids []string `json:"uids"` // users being updated
TagKey string `json:"tag_key"` // tag key
LastMsgSeq uint64 `json:"last_msg_seq"` // sequence number of the last message
}
type conversationUpdater struct {
@@ -258,7 +259,7 @@ func (c *conversationUpdater) push(fakeChannelId string, channelType uint8, tagK
c.RLock()
update := c.waitUpdates[key]
c.RUnlock()
if update != nil && (update.lastMsgSeq >= lastMsgSeq || tagKey == update.tagKey) {
if update != nil && (update.LastMsgSeq >= lastMsgSeq || tagKey == update.TagKey) {
return
}
@@ -273,14 +274,14 @@ func (c *conversationUpdater) push(fakeChannelId string, channelType uint8, tagK
return
}
c.Lock()
c.waitUpdates[key] = &channelUpdate{channelId: fakeChannelId, channelType: channelType, uids: nodeUsers, tagKey: tagKey, lastMsgSeq: lastMsgSeq}
c.waitUpdates[key] = &channelUpdate{ChannelId: fakeChannelId, ChannelType: channelType, Uids: nodeUsers, TagKey: tagKey, LastMsgSeq: lastMsgSeq}
c.Unlock()
}
// setChannelUpdate stores a channelUpdate built from the arguments in
// c.waitUpdates, replacing any entry already present under the same key.
// The key is derived from fakeChannelId and channelType via
// wkutil.ChannelToKey, and the map write happens between c.Lock() and
// c.Unlock().
func (c *conversationUpdater) setChannelUpdate(fakeChannelId string, channelType uint8, tagKey string, uids []string, lastMsgSeq uint64) {
key := wkutil.ChannelToKey(fakeChannelId, channelType)
c.Lock()
c.waitUpdates[key] = &channelUpdate{channelId: fakeChannelId, channelType: channelType, uids: uids, tagKey: tagKey, lastMsgSeq: lastMsgSeq}
c.waitUpdates[key] = &channelUpdate{ChannelId: fakeChannelId, ChannelType: channelType, Uids: uids, TagKey: tagKey, LastMsgSeq: lastMsgSeq}
c.Unlock()
}
@@ -291,13 +292,13 @@ func (c *conversationUpdater) getUserChannels(uid string, conversationType wkdb.
var channels []wkproto.Channel
for _, channelUpdate := range c.waitUpdates {
if conversationType == wkdb.ConversationTypeCMD && !options.G.IsCmdChannel(channelUpdate.channelId) {
if conversationType == wkdb.ConversationTypeCMD && !options.G.IsCmdChannel(channelUpdate.ChannelId) {
continue
}
if slices.Contains(channelUpdate.uids, uid) {
if slices.Contains(channelUpdate.Uids, uid) {
channels = append(channels, wkproto.Channel{
ChannelID: channelUpdate.channelId,
ChannelType: channelUpdate.channelType,
ChannelID: channelUpdate.ChannelId,
ChannelType: channelUpdate.ChannelType,
})
}
}

View File

@@ -383,6 +383,7 @@ func (s *Server) Stop() error {
s.commonService.Stop()
fmt.Println("stop conversation manager....")
if s.opts.Conversation.On {
s.conversationManager.Stop()
}

View File

@@ -90,6 +90,10 @@ func (s *Server) joinNewRepliceIfNeed(cfg *wkdb.ChannelClusterConfig) (bool, err
return false, errors.New("no allow vote nodes")
}
if len(cfg.Replicas) >= len(allowVoteNodes) { // 如果当前已集群的副本数大于等于允许投票的节点数,则不需要加入新的副本
return false, nil
}
newReplicaIds := make([]uint64, 0, len(allowVoteNodes)-len(cfg.Replicas))
for _, node := range allowVoteNodes {
if !wkutil.ArrayContainsUint64(cfg.Replicas, node.Id) {

View File

@@ -69,5 +69,31 @@ func (s *Server) ProposeUntilAppliedTimeout(ctx context.Context, slotId uint32,
}
// MustWaitAllSlotsReady blocks until s.opts.Node.Slots() reports at least one
// slot and every reported slot has a non-zero Leader.
//
// The slot list is re-read every 10 milliseconds. If timeout elapses before
// the condition holds, s.Panic is called with a timeout message and the
// function returns.
func (s *Server) MustWaitAllSlotsReady(timeout time.Duration) {
	poll := time.NewTicker(time.Millisecond * 10)
	defer poll.Stop()

	deadline, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	for {
		select {
		case <-deadline.Done():
			s.Panic("wait all slots ready timeout")
			return
		case <-poll.C:
		}

		slots := s.opts.Node.Slots()
		if len(slots) == 0 {
			// Nothing reported yet; keep polling.
			continue
		}

		allHaveLeader := true
		for _, slot := range slots {
			if slot.Leader == 0 {
				allHaveLeader = false
				break
			}
		}
		if allHaveLeader {
			return
		}
	}
}

View File

@@ -81,6 +81,47 @@ func (s *Store) AddOrUpdateUserConversations(uid string, conversations []wkdb.Co
// AddConversationsIfNotExist adds recent conversations, leaving out any that
// already exist.
//
// Each conversation is first checked with s.wdb.ExistConversation; existing
// ones are dropped. Remaining conversations with a zero Id receive one from
// s.wdb.NextPrimaryKey and are grouped by the slot ID of their Uid. Every
// group is encoded into a CMDAddOrUpdateConversationsBatchIfNotExist command
// and proposed to its slot. All proposals share a single 10-second timeout
// context. The first error encountered is returned; nil is returned when
// nothing needs to be added or every proposal succeeds.
func (s *Store) AddConversationsIfNotExist(conversations []wkdb.Conversation) error {
	// Group the conversations that still have to be created by slot ID.
	pendingBySlot := make(map[uint32][]wkdb.Conversation)
	for _, conv := range conversations {
		found, err := s.wdb.ExistConversation(conv.Uid, conv.ChannelId, conv.ChannelType)
		if err != nil {
			return err
		}
		if found {
			continue
		}

		if conv.Id == 0 {
			conv.Id = s.wdb.NextPrimaryKey()
		}
		sid := s.opts.Slot.GetSlotId(conv.Uid)
		pendingBySlot[sid] = append(pendingBySlot[sid], conv)
	}

	if len(pendingBySlot) == 0 {
		return nil
	}

	proposeCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	for sid, group := range pendingBySlot {
		payload, err := EncodeCMDAddOrUpdateConversations(group)
		if err != nil {
			return err
		}

		command := NewCMD(CMDAddOrUpdateConversationsBatchIfNotExist, payload)
		raw, err := command.Marshal()
		if err != nil {
			return err
		}

		if _, err = s.opts.Slot.ProposeUntilAppliedTimeout(proposeCtx, sid, raw); err != nil {
			return err
		}
	}
	return nil
}

View File

@@ -99,6 +99,8 @@ const (
CMDUpdatePluginConfig
// 移除插件用户
CMDRemovePluginUser
// 批量添加最近会话如果存在则不添加
CMDAddOrUpdateConversationsBatchIfNotExist
)
func (c CMDType) Uint16() uint16 {
@@ -189,6 +191,8 @@ func (c CMDType) String() string {
return "CMDUpdatePluginConfig"
case CMDRemovePluginUser:
return "CMDRemovePluginUser"
case CMDAddOrUpdateConversationsBatchIfNotExist:
return "CMDAddOrUpdateConversationsBatchIfNotExist"
default:
return fmt.Sprintf("CMDUnknown[%d]", c)
}
@@ -493,6 +497,12 @@ func (c *CMD) CMDContent() (string, error) {
"pluginNo": pluginNo,
"config": config,
}), nil
case CMDAddOrUpdateConversationsBatchIfNotExist:
conversations, err := c.DecodeCMDAddOrUpdateConversations()
if err != nil {
return "", err
}
return wkutil.ToJSON(conversations), nil
}

View File

@@ -97,7 +97,8 @@ func (s *Store) applyLog(_ uint32, log types.Log) error {
// return s.handleAddOrUpdatePlugin(cmd)
// case CMDUpdatePluginConfig: // 更新插件配置
// return s.handleUpdatePluginConfig(cmd)
case CMDAddOrUpdateConversationsBatchIfNotExist: // 批量添加或更新最近会话,如果存在则不添加
return s.handleAddOrUpdateConversationsBatchIfNotExist(cmd)
}
return nil
}
@@ -446,6 +447,14 @@ func (s *Store) handleRemovePluginUser(cmd *CMD) error {
return s.wdb.RemovePluginUser(pluginNo, uid)
}
// handleAddOrUpdateConversationsBatchIfNotExist decodes the conversation list
// carried by cmd and passes it to s.wdb.AddOrUpdateConversationsBatchIfNotExist.
//
// A decode error is returned as-is; otherwise the result of the database call
// is returned.
func (s *Store) handleAddOrUpdateConversationsBatchIfNotExist(cmd *CMD) error {
	decoded, decodeErr := cmd.DecodeCMDAddOrUpdateConversations()
	if decodeErr != nil {
		return decodeErr
	}

	return s.wdb.AddOrUpdateConversationsBatchIfNotExist(decoded)
}
// func (s *Store) handleAddOrUpdatePlugin(cmd *CMD) error {
// plugin, err := cmd.DecodeCMDPlugin()
// if err != nil {

View File

@@ -73,6 +73,48 @@ func (wk *wukongDB) AddOrUpdateConversations(conversations []Conversation) error
}
// AddOrUpdateConversationsBatchIfNotExist writes every conversation that does
// not exist yet and leaves existing ones untouched.
//
// Each conversation is checked with wk.ExistConversation. Those that need to
// be written are added to one batch per shard (the shard is chosen by
// wk.shardId of the conversation's Uid), and all batches are handed to
// Commits together. A batch is only created for a shard once a conversation
// actually has to be written to it, so when every conversation already exists
// no batch is opened and nothing is committed.
//
// Returns nil for empty input or when nothing needs writing; otherwise the
// first error from the existence check, the write, or Commits.
//
// NOTE(review): on an early error return the batches opened so far are not
// committed or otherwise released here — confirm whether Batch needs explicit
// cleanup.
func (wk *wukongDB) AddOrUpdateConversationsBatchIfNotExist(conversations []Conversation) error {
	if len(conversations) == 0 {
		return nil
	}

	shardBatches := make(map[uint32]*Batch) // shard ID (derived from uid) -> batch for that shard

	for _, conversation := range conversations {
		exist, err := wk.ExistConversation(conversation.Uid, conversation.ChannelId, conversation.ChannelType)
		if err != nil {
			return err
		}
		if exist {
			continue
		}

		// Open the shard's batch lazily, only when there is something to write.
		shardId := wk.shardId(conversation.Uid)
		batch := shardBatches[shardId]
		if batch == nil {
			batch = wk.shardBatchDBById(shardId).NewBatch()
			shardBatches[shardId] = batch
		}

		if err := wk.writeConversation(conversation, batch); err != nil {
			return err
		}
	}

	if len(shardBatches) == 0 {
		return nil
	}

	batchs := make([]*Batch, 0, len(shardBatches))
	for _, batch := range shardBatches {
		batchs = append(batchs, batch)
	}
	return Commits(batchs)
}
func (wk *wukongDB) AddOrUpdateConversationsWithUser(uid string, conversations []Conversation) error {
wk.metrics.AddOrUpdateConversationsAdd(1)
// wk.dblock.conversationLock.lock(uid)

View File

@@ -199,8 +199,12 @@ type ChannelDB interface {
}
type ConversationDB interface {
// AddOrUpdateConversations 添加或更新最近会话
AddOrUpdateConversations(conversations []Conversation) error
// AddOrUpdateConversationsBatchIfNotExist 批量添加或更新最近会话,如果存在则不添加
AddOrUpdateConversationsBatchIfNotExist(conversations []Conversation) error
// AddOrUpdateConversationsWithUser 添加或更新最近会话
AddOrUpdateConversationsWithUser(uid string, conversations []Conversation) error