mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-05-06 21:52:54 +08:00
fix(host-health): check etcd client is reconnecting (#22420)
Co-authored-by: wanyaoqi <d3lx.yq@gmail.com>
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"yunion.io/x/log"
|
||||
@@ -43,11 +44,38 @@ type SHostHealthManager struct {
|
||||
requestExpend int
|
||||
|
||||
hostId string
|
||||
status string
|
||||
status StatusManager
|
||||
|
||||
masterNodesIps []string
|
||||
}
|
||||
|
||||
type StatusManager struct {
|
||||
status string
|
||||
statusLock sync.Mutex
|
||||
}
|
||||
|
||||
func (m *StatusManager) GetStatus() string {
|
||||
m.statusLock.Lock()
|
||||
defer m.statusLock.Unlock()
|
||||
return m.status
|
||||
}
|
||||
|
||||
func (m *StatusManager) CheckAndSetStatus(status string) bool {
|
||||
m.statusLock.Lock()
|
||||
defer m.statusLock.Unlock()
|
||||
if status == m.status {
|
||||
return false
|
||||
}
|
||||
m.status = status
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *StatusManager) SetStatus(status string) {
|
||||
m.statusLock.Lock()
|
||||
defer m.statusLock.Unlock()
|
||||
m.status = status
|
||||
}
|
||||
|
||||
var (
|
||||
manager *SHostHealthManager
|
||||
)
|
||||
@@ -89,7 +117,7 @@ func InitHostHealthManager(hostId string) (*SHostHealthManager, error) {
|
||||
return nil, err
|
||||
}
|
||||
log.Infof("put key %s success", m.GetKey())
|
||||
m.status = api.HOST_HEALTH_STATUS_RUNNING
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_RUNNING)
|
||||
manager = &m
|
||||
return manager, nil
|
||||
}
|
||||
@@ -122,7 +150,12 @@ func (m *SHostHealthManager) GetKey() string {
|
||||
}
|
||||
|
||||
func (m *SHostHealthManager) OnKeepaliveFailure() {
|
||||
m.status = api.HOST_HEALTH_STATUS_RECONNECTING
|
||||
if !m.status.CheckAndSetStatus(api.HOST_HEALTH_STATUS_RECONNECTING) {
|
||||
log.Warningf("OnKeepaliveFailure status already %s", api.HOST_HEALTH_STATUS_RECONNECTING)
|
||||
return
|
||||
}
|
||||
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_RECONNECTING)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(m.timeout))
|
||||
defer cancel()
|
||||
err := m.cli.RestartSessionWithContext(ctx)
|
||||
@@ -132,7 +165,7 @@ func (m *SHostHealthManager) OnKeepaliveFailure() {
|
||||
); err != nil {
|
||||
log.Errorf("put host key failed %s", err)
|
||||
} else {
|
||||
m.status = api.HOST_HEALTH_STATUS_RUNNING
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_RUNNING)
|
||||
log.Infof("etcd client restart session put %s success", m.GetKey())
|
||||
return
|
||||
}
|
||||
@@ -145,7 +178,7 @@ func (m *SHostHealthManager) OnKeepaliveFailure() {
|
||||
m.Reconnect()
|
||||
} else {
|
||||
log.Errorf("netwrok is unavailable, going to shutdown servers")
|
||||
m.status = api.HOST_HEALTH_STATUS_UNKNOWN
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_UNKNOWN)
|
||||
m.OnUnhealth()
|
||||
}
|
||||
}
|
||||
@@ -193,28 +226,40 @@ func (m *SHostHealthManager) OnUnhealth() {
|
||||
|
||||
func (m *SHostHealthManager) Reconnect() {
|
||||
if m.cli.SessionLiving() {
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_RUNNING)
|
||||
return
|
||||
}
|
||||
|
||||
idx := 0
|
||||
for {
|
||||
if err := m.doReconnect(); err != nil {
|
||||
log.Errorf("failed do_reconnect %s, reconnect after %d seconds", err, idx)
|
||||
time.Sleep(time.Duration(idx) * time.Second)
|
||||
if idx < 5 {
|
||||
idx += 1
|
||||
}
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
m.status.SetStatus(api.HOST_HEALTH_STATUS_RUNNING)
|
||||
}
|
||||
|
||||
func (m *SHostHealthManager) doReconnect() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
if err := m.cli.RestartSessionWithContext(ctx); err != nil && !m.cli.SessionLiving() {
|
||||
log.Errorf("restart session failed %s", err)
|
||||
go m.Reconnect()
|
||||
return
|
||||
return errors.Wrap(err, "RestartSessionWithContext")
|
||||
}
|
||||
log.Infof("restart ression success")
|
||||
|
||||
if err := m.cli.PutSession(
|
||||
context.Background(), m.GetKey(), api.HOST_HEALTH_STATUS_RUNNING,
|
||||
); err != nil {
|
||||
log.Errorf("put host key failed %s", err)
|
||||
go m.Reconnect()
|
||||
return
|
||||
// put session use client default timeout
|
||||
if err := m.cli.PutSession(context.Background(), m.GetKey(), api.HOST_HEALTH_STATUS_RUNNING); err != nil {
|
||||
return errors.Wrap(err, "PutSession")
|
||||
}
|
||||
log.Infof("put key %s success", m.GetKey())
|
||||
m.status = api.HOST_HEALTH_STATUS_RUNNING
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *SHostHealthManager) shutdownServers() {
|
||||
@@ -241,5 +286,5 @@ func GetHealthStatus() string {
|
||||
if manager == nil {
|
||||
return ""
|
||||
}
|
||||
return manager.status
|
||||
return manager.status.GetStatus()
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
execlient "yunion.io/x/executor/client"
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/errors"
|
||||
"yunion.io/x/pkg/util/signalutils"
|
||||
|
||||
app_common "yunion.io/x/onecloud/pkg/cloudcommon/app"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/service"
|
||||
@@ -77,6 +78,8 @@ func (host *SHostHealthService) RunService() {
|
||||
}
|
||||
}
|
||||
})
|
||||
signalutils.SetDumpStackSignal()
|
||||
signalutils.StartTrap()
|
||||
select {}
|
||||
}
|
||||
|
||||
|
||||
@@ -261,6 +261,9 @@ show_update_cmd() {
|
||||
'host-deployer')
|
||||
spec='hostdeployer'
|
||||
;;
|
||||
'host-health')
|
||||
spec='hostagent/HostHealth'
|
||||
;;
|
||||
'region')
|
||||
spec='regionServer'
|
||||
;;
|
||||
|
||||
Reference in New Issue
Block a user