From 56b0bc3d2fc34bfe047573e3ee9e72a23bdca285 Mon Sep 17 00:00:00 2001 From: tt Date: Wed, 16 Apr 2025 20:24:51 +0800 Subject: [PATCH] fix: Fixed the issue of triggering channel election each time when the machine is in standalone mode --- cmd/root.go | 66 +++++++++++++++++------- internal/api/conversation_model.go | 6 ++- internal/manager/manager_conversation.go | 43 +++++++-------- internal/server/server.go | 1 + pkg/cluster/cluster/election_channel.go | 4 ++ pkg/cluster/slot/iserver.go | 26 ++++++++++ pkg/cluster/store/conversation.go | 41 +++++++++++++++ pkg/cluster/store/model.go | 10 ++++ pkg/cluster/store/store_apply.go | 11 +++- pkg/wkdb/conversation.go | 42 +++++++++++++++ pkg/wkdb/db.go | 4 ++ 11 files changed, 213 insertions(+), 41 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index a76a6404..ab5b3ea6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -10,10 +10,12 @@ import ( "net" "os" "os/exec" + "os/signal" "path" "path/filepath" "strconv" "strings" + "syscall" "time" "github.com/WuKongIM/WuKongIM/internal/options" @@ -120,32 +122,60 @@ func cmdRun() error { // 等待集群准备好 s.MustWaitAllSlotsReady(time.Minute) + // 处理 pingback (如果提供了) if pingback != "" { - confirmationBytes, err := io.ReadAll(os.Stdin) - if err != nil { - wklog.Error("read confirmation error from stdin", zap.Error(err)) + if err := handlePingback(); err != nil { + s.Stop() // 如果 pingback 失败,也尝试停止服务器 return err } - conn, err := net.Dial("tcp", pingback) - if err != nil { - wklog.Error("dialing confirmation address", zap.Error(err)) + if err := writePIDFile(); err != nil { + s.Stop() // 如果写 PID 文件失败,也尝试停止服务器 return err } - defer conn.Close() - - _, err = conn.Write(confirmationBytes) - if err != nil { - wklog.Error("write confirmation error", zap.Error(err)) - return err - } - err = os.WriteFile(path.Join(".", pidfile), []byte(strconv.Itoa(os.Getpid())), 0o600) - if err != nil { - log.Fatal(err) - } } - select {} + // 设置信号监听,等待退出信号 + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) + wklog.Info("WuKongIM server started successfully. Press Ctrl+C to exit.") + // 阻塞直到收到退出信号 + <-quit + + s.Stop() // <-- 调用 Stop() 进行清理 + wklog.Info("WuKongIM server stopped.") + } + return nil +} + +// 将 pingback 处理逻辑提取到一个单独的函数中 +func handlePingback() error { + confirmationBytes, err := io.ReadAll(os.Stdin) + if err != nil { + wklog.Error("read confirmation error from stdin", zap.Error(err)) + return err + } + conn, err := net.Dial("tcp", pingback) + if err != nil { + wklog.Error("dialing confirmation address", zap.Error(err)) + return err + } + defer conn.Close() + + _, err = conn.Write(confirmationBytes) + if err != nil { + wklog.Error("write confirmation error", zap.Error(err)) + return err + } + return nil +} + +// 将写 PID 文件逻辑提取到一个单独的函数中 +func writePIDFile() error { + err := os.WriteFile(path.Join(".", pidfile), []byte(strconv.Itoa(os.Getpid())), 0o600) + if err != nil { + wklog.Error("write pid file error", zap.Error(err)) + return err } return nil } diff --git a/internal/api/conversation_model.go b/internal/api/conversation_model.go index 7f1134f3..0d064012 100644 --- a/internal/api/conversation_model.go +++ b/internal/api/conversation_model.go @@ -64,12 +64,16 @@ func newSyncUserConversationResp(conversation wkdb.Conversation) *syncUserConver realChannelId = from } } + var version int64 + if conversation.UpdatedAt != nil { + version = conversation.UpdatedAt.UnixNano() + } return &syncUserConversationResp{ ChannelId: realChannelId, ChannelType: conversation.ChannelType, Unread: int(conversation.UnreadCount), ReadedToMsgSeq: uint32(conversation.ReadToMsgSeq), - Version: conversation.UpdatedAt.UnixNano(), + Version: version, } } diff --git a/internal/manager/manager_conversation.go b/internal/manager/manager_conversation.go index c73a23f5..2b27e3a7 100644 --- a/internal/manager/manager_conversation.go +++ b/internal/manager/manager_conversation.go @@ -2,6 +2,7 @@ package manager import ( "encoding/json" + "fmt" "os" "path" "slices" @@ -55,6 +56,7 @@ func (c *ConversationManager) Push(fakeChannelId string, channelType uint8, tagK if event.Frame.GetNoPersist() { continue } + if event.MessageSeq > lastMsgSeq { lastMsgSeq = event.MessageSeq } @@ -79,7 +81,7 @@ func (c *ConversationManager) Push(fakeChannelId string, channelType uint8, tagK return } // 如果是个人频道并且不是第一条消息,则不需要更新最近会话 - if firstMsgSeq > 1 { + if firstMsgSeq == 1 { index := c.getUpdaterIndex(fakeChannelId) c.updaters[index].push(fakeChannelId, channelType, tagKey, lastMsgSeq) } @@ -119,6 +121,8 @@ func (c *ConversationManager) saveToFile() { c.Error("save conversations to file failed", zap.Error(err)) return } + fmt.Println("save conversations to file....", allUpdates) + err = os.WriteFile(path.Join(conversationDir, "conversations.json"), data, 0644) if err != nil { c.Error("save conversations to file failed", zap.Error(err)) @@ -149,7 +153,7 @@ func (c *ConversationManager) loadFromFile() { } for _, update := range allUpdates { - c.updaters[c.getUpdaterIndex(update.channelId)].setChannelUpdate(update.channelId, update.channelType, update.tagKey, update.uids, update.lastMsgSeq) + c.updaters[c.getUpdaterIndex(update.ChannelId)].setChannelUpdate(update.ChannelId, update.ChannelType, update.TagKey, update.Uids, update.LastMsgSeq) } } @@ -187,18 +191,15 @@ func (c *ConversationManager) storeConversations() { updates := updater.getChannelUpdates() for _, update := range updates { conversationType := wkdb.ConversationTypeChat - if options.G.IsCmdChannel(update.channelId) { + if options.G.IsCmdChannel(update.ChannelId) { conversationType = wkdb.ConversationTypeCMD } - for _, uid := range update.uids { + for _, uid := range update.Uids { createdAt := time.Now() updatedAt := time.Now() - if options.G.IsCmdChannel(update.channelId) { - continue - } conversations = append(conversations, wkdb.Conversation{ - ChannelId: update.channelId, - ChannelType: update.channelType, + ChannelId: update.ChannelId, + ChannelType: update.ChannelType, Uid: uid, Type: conversationType, CreatedAt: &createdAt, @@ -230,11 +231,11 @@ func (c *ConversationManager) getUpdaterIndex(fakeChannelId string) int { } type channelUpdate struct { - channelId string // 频道ID - channelType uint8 // 频道类型 - uids []string // 更新的用户 - tagKey string // 标签Key - lastMsgSeq uint64 // 最后一条消息的序号 + ChannelId string `json:"channel_id"` // 频道ID + ChannelType uint8 `json:"channel_type"` // 频道类型 + Uids []string `json:"uids"` // 更新的用户 + TagKey string `json:"tag_key"` // 标签Key + LastMsgSeq uint64 `json:"last_msg_seq"` // 最后一条消息的序号 } type conversationUpdater struct { @@ -258,7 +259,7 @@ func (c *conversationUpdater) push(fakeChannelId string, channelType uint8, tagK c.RLock() update := c.waitUpdates[key] c.RUnlock() - if update != nil && (update.lastMsgSeq >= lastMsgSeq || tagKey == update.tagKey) { + if update != nil && (update.LastMsgSeq >= lastMsgSeq || tagKey == update.TagKey) { return } @@ -273,14 +274,14 @@ func (c *conversationUpdater) push(fakeChannelId string, channelType uint8, tagK return } c.Lock() - c.waitUpdates[key] = &channelUpdate{channelId: fakeChannelId, channelType: channelType, uids: nodeUsers, tagKey: tagKey, lastMsgSeq: lastMsgSeq} + c.waitUpdates[key] = &channelUpdate{ChannelId: fakeChannelId, ChannelType: channelType, Uids: nodeUsers, TagKey: tagKey, LastMsgSeq: lastMsgSeq} c.Unlock() } func (c *conversationUpdater) setChannelUpdate(fakeChannelId string, channelType uint8, tagKey string, uids []string, lastMsgSeq uint64) { key := wkutil.ChannelToKey(fakeChannelId, channelType) c.Lock() - c.waitUpdates[key] = &channelUpdate{channelId: fakeChannelId, channelType: channelType, uids: uids, tagKey: tagKey, lastMsgSeq: lastMsgSeq} + c.waitUpdates[key] = &channelUpdate{ChannelId: fakeChannelId, ChannelType: channelType, Uids: uids, TagKey: tagKey, LastMsgSeq: lastMsgSeq} c.Unlock() } @@ -291,13 +292,13 @@ func (c *conversationUpdater) getUserChannels(uid string, conversationType wkdb. var channels []wkproto.Channel for _, channelUpdate := range c.waitUpdates { - if conversationType == wkdb.ConversationTypeCMD && !options.G.IsCmdChannel(channelUpdate.channelId) { + if conversationType == wkdb.ConversationTypeCMD && !options.G.IsCmdChannel(channelUpdate.ChannelId) { continue } - if slices.Contains(channelUpdate.uids, uid) { + if slices.Contains(channelUpdate.Uids, uid) { channels = append(channels, wkproto.Channel{ - ChannelID: channelUpdate.channelId, - ChannelType: channelUpdate.channelType, + ChannelID: channelUpdate.ChannelId, + ChannelType: channelUpdate.ChannelType, }) } } diff --git a/internal/server/server.go b/internal/server/server.go index 4132134d..dadd1b3c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -383,6 +383,7 @@ func (s *Server) Stop() error { s.commonService.Stop() + fmt.Println("stop conversation manager....") if s.opts.Conversation.On { s.conversationManager.Stop() } diff --git a/pkg/cluster/cluster/election_channel.go b/pkg/cluster/cluster/election_channel.go index 0b4478c7..84f9608f 100644 --- a/pkg/cluster/cluster/election_channel.go +++ b/pkg/cluster/cluster/election_channel.go @@ -90,6 +90,10 @@ func (s *Server) joinNewRepliceIfNeed(cfg *wkdb.ChannelClusterConfig) (bool, err return false, errors.New("no allow vote nodes") } + if len(cfg.Replicas) >= len(allowVoteNodes) { // 如果当前已集群的副本数大于等于允许投票的节点数,则不需要加入新的副本 + return false, nil + } + newReplicaIds := make([]uint64, 0, len(allowVoteNodes)-len(cfg.Replicas)) for _, node := range allowVoteNodes { if !wkutil.ArrayContainsUint64(cfg.Replicas, node.Id) { diff --git a/pkg/cluster/slot/iserver.go b/pkg/cluster/slot/iserver.go index 330e24f8..43746f53 100644 --- a/pkg/cluster/slot/iserver.go +++ b/pkg/cluster/slot/iserver.go @@ -69,5 +69,31 @@ func (s *Server) ProposeUntilAppliedTimeout(ctx context.Context, slotId uint32, } func (s *Server) MustWaitAllSlotsReady(timeout time.Duration) { + tk := time.NewTicker(time.Millisecond * 10) + defer tk.Stop() + timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + select { + case <-tk.C: + slots := s.opts.Node.Slots() + if len(slots) > 0 { + notReady := false + for _, st := range slots { + if st.Leader == 0 { + notReady = true + break + } + } + if !notReady { + return + } + + } + case <-timeoutCtx.Done(): + s.Panic("wait all slots ready timeout") + return + } + } } diff --git a/pkg/cluster/store/conversation.go b/pkg/cluster/store/conversation.go index aad3ba73..a0215d98 100644 --- a/pkg/cluster/store/conversation.go +++ b/pkg/cluster/store/conversation.go @@ -81,6 +81,47 @@ func (s *Store) AddOrUpdateUserConversations(uid string, conversations []wkdb.Co // AddConversationsIfNotExist 添加最近会话,如果存在则不添加 func (s *Store) AddConversationsIfNotExist(conversations []wkdb.Conversation) error { + // 将会话按照slotId来分组 + slotConversationsMap := make(map[uint32][]wkdb.Conversation) + + for _, c := range conversations { + exist, err := s.wdb.ExistConversation(c.Uid, c.ChannelId, c.ChannelType) + if err != nil { + return err + } + if exist { + continue + } + if c.Id == 0 { + c.Id = s.wdb.NextPrimaryKey() + } + slotId := s.opts.Slot.GetSlotId(c.Uid) + slotConversationsMap[slotId] = append(slotConversationsMap[slotId], c) + } + + if len(slotConversationsMap) == 0 { + return nil + } + + timeoutctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + for slotId, conversations := range slotConversationsMap { + data, err := EncodeCMDAddOrUpdateConversations(conversations) + if err != nil { + return err + } + cmd := NewCMD(CMDAddOrUpdateConversationsBatchIfNotExist, data) + cmdData, err := cmd.Marshal() + if err != nil { + return err + } + _, err = s.opts.Slot.ProposeUntilAppliedTimeout(timeoutctx, slotId, cmdData) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/cluster/store/model.go b/pkg/cluster/store/model.go index 596ae416..39a2f2c1 100644 --- a/pkg/cluster/store/model.go +++ b/pkg/cluster/store/model.go @@ -99,6 +99,8 @@ const ( CMDUpdatePluginConfig // 移除插件用户 CMDRemovePluginUser + // 批量添加最近会话如果存在则不添加 + CMDAddOrUpdateConversationsBatchIfNotExist ) func (c CMDType) Uint16() uint16 { @@ -189,6 +191,8 @@ func (c CMDType) String() string { return "CMDUpdatePluginConfig" case CMDRemovePluginUser: return "CMDRemovePluginUser" + case CMDAddOrUpdateConversationsBatchIfNotExist: + return "CMDAddOrUpdateConversationsBatchIfNotExist" default: return fmt.Sprintf("CMDUnknown[%d]", c) } @@ -493,6 +497,12 @@ func (c *CMD) CMDContent() (string, error) { "pluginNo": pluginNo, "config": config, }), nil + case CMDAddOrUpdateConversationsBatchIfNotExist: + conversations, err := c.DecodeCMDAddOrUpdateConversations() + if err != nil { + return "", err + } + return wkutil.ToJSON(conversations), nil } diff --git a/pkg/cluster/store/store_apply.go b/pkg/cluster/store/store_apply.go index a71c1540..f5945ac5 100644 --- a/pkg/cluster/store/store_apply.go +++ b/pkg/cluster/store/store_apply.go @@ -97,7 +97,8 @@ func (s *Store) applyLog(_ uint32, log types.Log) error { // return s.handleAddOrUpdatePlugin(cmd) // case CMDUpdatePluginConfig: // 更新插件配置 // return s.handleUpdatePluginConfig(cmd) - + case CMDAddOrUpdateConversationsBatchIfNotExist: // 批量添加或更新最近会话,如果存在则不添加 + return s.handleAddOrUpdateConversationsBatchIfNotExist(cmd) } return nil } @@ -446,6 +447,14 @@ func (s *Store) handleRemovePluginUser(cmd *CMD) error { return s.wdb.RemovePluginUser(pluginNo, uid) } +func (s *Store) handleAddOrUpdateConversationsBatchIfNotExist(cmd *CMD) error { + conversations, err := cmd.DecodeCMDAddOrUpdateConversations() + if err != nil { + return err + } + return s.wdb.AddOrUpdateConversationsBatchIfNotExist(conversations) +} + // func (s *Store) handleAddOrUpdatePlugin(cmd *CMD) error { // plugin, err := cmd.DecodeCMDPlugin() // if err != nil { diff --git a/pkg/wkdb/conversation.go b/pkg/wkdb/conversation.go index 797466db..af7ad674 100644 --- a/pkg/wkdb/conversation.go +++ b/pkg/wkdb/conversation.go @@ -73,6 +73,48 @@ func (wk *wukongDB) AddOrUpdateConversations(conversations []Conversation) error } +func (wk *wukongDB) AddOrUpdateConversationsBatchIfNotExist(conversations []Conversation) error { + if len(conversations) == 0 { + return nil + } + + userBatchMap := make(map[uint32]*Batch) // 用户uid分区对应的db + + for _, conversation := range conversations { + + shardId := wk.shardId(conversation.Uid) + batch := userBatchMap[shardId] + if batch == nil { + batch = wk.shardBatchDBById(shardId).NewBatch() + userBatchMap[shardId] = batch + } + + exist, err := wk.ExistConversation(conversation.Uid, conversation.ChannelId, conversation.ChannelType) + if err != nil { + return err + } + if exist { + continue + } + + // 如果会话不存在 则写入 + if err := wk.writeConversation(conversation, batch); err != nil { + return err + } + } + + if len(userBatchMap) == 0 { + return nil + } + + batchs := make([]*Batch, 0, len(userBatchMap)) + for _, batch := range userBatchMap { + batchs = append(batchs, batch) + } + + return Commits(batchs) +} + func (wk *wukongDB) AddOrUpdateConversationsWithUser(uid string, conversations []Conversation) error { wk.metrics.AddOrUpdateConversationsAdd(1) // wk.dblock.conversationLock.lock(uid) diff --git a/pkg/wkdb/db.go b/pkg/wkdb/db.go index 452030a5..878b4ce9 100644 --- a/pkg/wkdb/db.go +++ b/pkg/wkdb/db.go @@ -199,8 +199,12 @@ type ChannelDB interface { } type ConversationDB interface { + // AddOrUpdateConversations 添加或更新最近会话 AddOrUpdateConversations(conversations []Conversation) error + // AddOrUpdateConversationsBatchIfNotExist 批量添加或更新最近会话,如果存在则不添加 + AddOrUpdateConversationsBatchIfNotExist(conversations []Conversation) error + // AddOrUpdateConversationsWithUser 添加或更新最近会话 AddOrUpdateConversationsWithUser(uid string, conversations []Conversation) error