Files
cloudpods/pkg/hostman/guestman/pod_helper.go

352 lines
10 KiB
Go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package guestman
import (
"context"
"fmt"
"path"
"strconv"
"strings"
"time"
"github.com/shirou/gopsutil/v3/disk"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
"yunion.io/x/onecloud/pkg/hostman/options"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/util/cgrouputils"
"yunion.io/x/onecloud/pkg/util/fileutils2"
"yunion.io/x/onecloud/pkg/util/pod/image"
"yunion.io/x/onecloud/pkg/util/pod/nerdctl"
)
// IsContainerNotFoundError reports whether err indicates that a container
// could not be found, either because the wrapped cause is the
// errors.ErrNotFound sentinel or because the error message contains a
// "not found"-style substring.
func IsContainerNotFoundError(err error) bool {
	// BUG FIX: guard against nil — err.Error() below would panic otherwise.
	if err == nil {
		return false
	}
	if errors.Cause(err) == errors.ErrNotFound {
		return true
	}
	for _, errMsg := range []string{
		"NotFound",
		"not found",
	} {
		if strings.Contains(err.Error(), errMsg) {
			return true
		}
	}
	return false
}
// GetContainerdConnectionInfo returns the containerd socket address (the
// configured container runtime endpoint with any "unix://" scheme removed)
// and the CRI namespace used on this host ("k8s.io").
func GetContainerdConnectionInfo() (string, string) {
	const namespace = "k8s.io"
	endpoint := strings.TrimPrefix(options.HostOptions.ContainerRuntimeEndpoint, "unix://")
	return endpoint, namespace
}
// NewContainerdImageTool builds an image.ImageTool bound to the local
// containerd endpoint and namespace.
func NewContainerdImageTool() image.ImageTool {
	endpoint, ns := GetContainerdConnectionInfo()
	return image.NewImageTool(endpoint, ns)
}
// NewContainerdNerdctl builds a nerdctl.Nerdctl client bound to the local
// containerd endpoint and namespace.
func NewContainerdNerdctl() nerdctl.Nerdctl {
	endpoint, ns := GetContainerdConnectionInfo()
	return nerdctl.NewNerdctl(endpoint, ns)
}
// PullContainerdImage pulls input.Image through containerd, first over
// HTTPS and, if that fails, retrying over plain HTTP. Registry TLS
// verification is always skipped. If both attempts fail, the errors from
// both are aggregated and returned.
func PullContainerdImage(input *hostapi.ContainerPullImageInput) error {
	opt := &image.PullOptions{
		RepoCommonOptions: image.RepoCommonOptions{
			SkipVerify: true,
		},
	}
	if input.Auth != nil {
		opt.Username = input.Auth.Username
		opt.Password = input.Auth.Password
	}
	imgTool := NewContainerdImageTool()
	if _, httpsErr := imgTool.Pull(input.Image, opt); httpsErr != nil {
		errs := []error{errors.Errorf("pullImageByCtrCmd by https: %s", trimPullImageError(httpsErr.Error()))}
		// The registry may only speak plain HTTP; retry without TLS.
		opt.PlainHttp = true
		log.Infof("try pull image %s by http", input.Image)
		if _, httpErr := imgTool.Pull(input.Image, opt); httpErr != nil {
			errs = append(errs, errors.Errorf("pullImageByCtrCmd by http: %s", trimPullImageError(httpErr.Error())))
			return errors.NewAggregate(errs)
		}
	}
	return nil
}
// trimPullImageError strips containerd's interactive pull-progress output
// from an error message, keeping only the meaningful lines.
func trimPullImageError(err string) string {
	kept := []string{}
	for _, line := range strings.Split(err, "\n") {
		// Drop ANSI-colored progress lines emitted by containerd.
		if strings.Contains(line, "[0m") {
			continue
		}
		// Drop the progress summary line ("elapsed: ... total: ...").
		if strings.Contains(line, "elapsed:") && strings.Contains(line, "total:") {
			continue
		}
		kept = append(kept, line)
	}
	return strings.Join(kept, "\n")
}
// PushContainerdImage pushes input.Image through containerd, first over
// HTTPS and, if that fails, retrying over plain HTTP. Registry TLS
// verification is always skipped. If both attempts fail, the errors from
// both are aggregated and returned.
func PushContainerdImage(input *hostapi.ContainerPushImageInput) error {
	opt := &image.PushOptions{
		RepoCommonOptions: image.RepoCommonOptions{
			SkipVerify: true,
		},
	}
	if input.Auth != nil {
		opt.Username = input.Auth.Username
		opt.Password = input.Auth.Password
	}
	imgTool := NewContainerdImageTool()
	if err := imgTool.Push(input.Image, opt); err != nil {
		errs := make([]error, 0, 2)
		// BUG FIX: errors.Wrap does not interpret format verbs, so the old
		// message "pushImageByCtrCmd: %s" left a literal "%s" in the output.
		errs = append(errs, errors.Wrap(err, "pushImageByCtrCmd by https"))
		// The registry may only speak plain HTTP; retry without TLS.
		opt.PlainHttp = true
		log.Infof("try push image %s by http", input.Image)
		if err := imgTool.Push(input.Image, opt); err != nil {
			errs = append(errs, errors.Wrap(err, "pushImageByCtrCmd by http"))
			return errors.NewAggregate(errs)
		}
	}
	return nil
}
// ContainerVolumeKey uniquely identifies one mounted volume of one
// container: the container id plus the volume's path on the host.
type ContainerVolumeKey struct {
	// Id is the container id.
	Id string
	// HostPath is the volume mount's host-side path.
	HostPath string
}
// GetVolumeMountUsages collects disk usage for every usage-capable volume
// mount of every container in this pod, keyed by container id and host
// path. Per-volume failures are aggregated into the returned error while
// the successfully collected usages are still returned.
func (s *sPodGuestInstance) GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error) {
	usages := make(map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage)
	var errs []error
	for ctrId, mounts := range s.getContainerVolumeMounts() {
		for i := range mounts {
			mount := mounts[i]
			// Only volume drivers implementing IUsageVolumeMount can report usage.
			drv, ok := volume_mount.GetDriver(mount.Type).(volume_mount.IUsageVolumeMount)
			if !ok {
				continue
			}
			usage, err := s.getVolumeMountUsage(drv, ctrId, mount)
			if err != nil {
				errs = append(errs, errors.Wrapf(err, "get container %s %s volume usage: %s", ctrId, drv.GetType(), jsonutils.Marshal(mount)))
				continue
			}
			key := ContainerVolumeKey{Id: ctrId, HostPath: usage.HostPath}
			usages[key] = usage
		}
	}
	return usages, errors.NewAggregate(errs)
}
// getVolumeMountUsage resolves the runtime host path of a single container
// volume mount and returns its filesystem usage, with driver-provided tags
// attached.
func (s *sPodGuestInstance) getVolumeMountUsage(drv volume_mount.IUsageVolumeMount, ctrId string, vol *hostapi.ContainerVolumeMount) (*volume_mount.ContainerVolumeMountUsage, error) {
	hostPath, err := drv.GetRuntimeMountHostPath(s, ctrId, vol)
	if err != nil {
		return nil, errors.Wrap(err, "GetRuntimeMountHostPath")
	}
	stat, err := disk.Usage(hostPath)
	if err != nil {
		return nil, errors.Wrapf(err, "disk.Usage of %s", hostPath)
	}
	usage := &volume_mount.ContainerVolumeMountUsage{
		Id:         ctrId,
		MountPath:  vol.MountPath,
		HostPath:   hostPath,
		VolumeType: string(drv.GetType()),
		Usage:      stat,
		Tags:       map[string]string{},
	}
	// Let the driver attach any driver-specific metadata tags.
	drv.InjectUsageTags(usage, vol)
	return usage, nil
}
// RestartLocalPodAndContainers schedules an asynchronous local restart of
// this pod and all of its containers on the guest-start worker queue.
func (s *sPodGuestInstance) RestartLocalPodAndContainers(ctx context.Context, cred mcclient.TokenCredential) {
	s.manager.GuestStartWorker.Run(newLocalPodRestartTask(ctx, cred, s), nil, nil)
}
// localPodRestartTask restarts a pod and its containers locally; it is
// executed on the guest-start worker queue (see RestartLocalPodAndContainers).
type localPodRestartTask struct {
	ctx      context.Context
	userCred mcclient.TokenCredential
	pod      *sPodGuestInstance
}
// newLocalPodRestartTask constructs a restart task for the given pod with
// the supplied request context and credential.
func newLocalPodRestartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localPodRestartTask {
	t := new(localPodRestartTask)
	t.ctx = ctx
	t.userCred = userCred
	t.pod = pod
	return t
}
// Run stops every container of the pod, restarts the pod itself, starts
// the containers again, and finally syncs the pod status. Per-container
// stop/start failures are logged and skipped; a pod start failure aborts
// the remaining steps.
func (t *localPodRestartTask) Run() {
	pod := t.pod
	log.Infof("restart pod and containers locally (%s/%s)", pod.Id, pod.GetName())
	for _, c := range pod.GetContainers() {
		log.Infof("stop container locally (%s/%s/%s/%s)", pod.Id, pod.GetName(), c.Id, c.Name)
		input := &hostapi.ContainerStopInput{
			Timeout:       0,
			ShmSizeMB:     c.Spec.ShmSizeMB,
			ContainerName: c.Name,
		}
		if _, err := pod.StopContainer(t.ctx, t.userCred, c.Id, input); err != nil {
			log.Errorf("stop container %s error: %v", c.Name, err)
		}
	}
	if _, err := pod.startPod(t.ctx, t.userCred); err != nil {
		log.Errorf("start pod(%s/%s) err: %s", pod.GetId(), pod.GetName(), err.Error())
		return
	}
	for _, c := range pod.GetContainers() {
		log.Infof("start container locally (%s/%s/%s/%s)", pod.Id, pod.GetName(), c.Id, c.Name)
		if _, err := pod.StartLocalContainer(t.ctx, t.userCred, c.Id); err != nil {
			log.Errorf("start container %s err: %s", c.Id, err.Error())
		}
	}
	pod.SyncStatus("sync status after pod and containers restart locally", "")
}
// Dump returns a short human-readable description of this task for logging.
func (t *localPodRestartTask) Dump() string {
	return fmt.Sprintf("pod restart task %s/%s", t.pod.GetId(), t.pod.GetName())
}
// GetPodStatusByContainerStatus derives a pod status from one container's
// status. Only the primary container can override the pod status: a
// crash-looping primary marks the pod crash-looping, and an exited primary
// marks the pod ready. Non-primary containers leave the status unchanged.
func GetPodStatusByContainerStatus(status string, cStatus string, isPrimary bool) string {
	if !isPrimary {
		return status
	}
	switch cStatus {
	case computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF:
		status = computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF
	case computeapi.CONTAINER_STATUS_EXITED:
		status = computeapi.VM_READY
	}
	return status
}
// SCpuFreqRealTimeSimulateManager periodically reads each container's
// cgroup cpuacct usage and writes a simulated CPU frequency derived from
// the usage delta into the container's cpufreq directory.
type SCpuFreqRealTimeSimulateManager struct {
	// lastTimeCpuUsage maps container CRI id -> cpuacct.usage reading taken
	// on the previous tick.
	lastTimeCpuUsage map[string]int64
	// lastTime is the UnixNano timestamp of the previous tick; nil until
	// the first tick has completed.
	lastTime *int64
	// interval is the sampling period in seconds.
	interval int
	// stop terminates the simulation loop when closed.
	stop chan struct{}
	// cpufreqMax and cpufreqMin bound the simulated frequency range.
	cpufreqMax int64
	cpufreqMin int64
}
// newCpuFreqRealTimeSimulateManager returns a manager that samples every
// intervalSecond seconds and simulates frequencies within
// [cpufreqMin, cpufreqMax].
func newCpuFreqRealTimeSimulateManager(intervalSecond int, cpufreqMax, cpufreqMin int64) *SCpuFreqRealTimeSimulateManager {
	m := new(SCpuFreqRealTimeSimulateManager)
	m.interval = intervalSecond
	m.stop = make(chan struct{})
	m.cpufreqMax = cpufreqMax
	m.cpufreqMin = cpufreqMin
	return m
}
// StartSetCpuFreqSimulate runs the simulation loop, performing one sampling
// tick per interval until Stop is called. It blocks, so callers normally
// run it in its own goroutine.
func (m *SCpuFreqRealTimeSimulateManager) StartSetCpuFreqSimulate() {
	log.Infof("StartSetCpuFreqSimulate")
	ticker := time.NewTicker(time.Duration(m.interval) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-m.stop:
			return
		case <-ticker.C:
			m.startSetCpuFreqSimulate()
		}
	}
}
// Stop terminates the loop started by StartSetCpuFreqSimulate. It must be
// called at most once: closing an already-closed channel panics.
func (m *SCpuFreqRealTimeSimulateManager) Stop() {
	close(m.stop)
}
// startSetCpuFreqSimulate performs one sampling tick: for every pod on this
// host it reads each container's cumulative cpuacct.usage from cgroups,
// linearly maps the usage delta since the previous tick onto
// [cpufreqMin, cpufreqMax], and writes that estimated frequency into the
// container's simulated cpufreq sysfs directory. The first tick only
// records a baseline; frequencies are set from the second tick onward.
func (m *SCpuFreqRealTimeSimulateManager) startSetCpuFreqSimulate() {
	newCpuUsage := map[string]int64{}
	startTime := time.Now().UnixNano()
	guestManager.Servers.Range(func(k, v interface{}) bool {
		pod, ok := v.(*sPodGuestInstance)
		if !ok {
			log.Errorf("is not pod instance")
			// NOTE(review): returning false aborts the whole Range iteration
			// on the first non-pod server; if mixed server types can appear
			// here, this should probably be `return true` — confirm.
			return false
		}
		cgroupRoot := path.Join(cgrouputils.GetSubModulePath("cpuacct"), PodCgroupParent())
		criIds := pod.GetPodContainerCriIds()
		for i := range criIds {
			ctr, err := pod.GetContainerByCRIId(criIds[i])
			if err != nil {
				log.Errorf("failed get %s ctrid by criId %s", pod.GetName(), criIds[i])
				continue
			}
			if ctr == nil {
				log.Errorf("found nil container desc by criId %s in pod %s", criIds[i], pod.GetName())
				continue
			}
			// Skip containers without a simulated cpufreq directory.
			cpuDir := pod.getContainerSystemCpusDir(ctr.Id)
			if !fileutils2.Exists(cpuDir) {
				log.Debugf("%s %s has no cpuDir", pod.GetName(), criIds[i])
				continue
			}
			// Cumulative CPU time consumed by the container (cgroup v1
			// cpuacct.usage, reported in nanoseconds per kernel docs).
			usagePath := path.Join(cgroupRoot, criIds[i], "cpuacct.usage")
			criCpuUsageStr, err := fileutils2.FileGetContents(usagePath)
			if err != nil {
				// Best effort: the container may have just exited.
				continue
			}
			criCpuUsageStr = strings.TrimSpace(criCpuUsageStr)
			criCpuUsage, err := strconv.ParseInt(criCpuUsageStr, 10, 0)
			if err != nil {
				log.Errorf("failed parse %s %s: %s", usagePath, criCpuUsageStr, err)
				continue
			}
			newCpuUsage[criIds[i]] = criCpuUsage
			// A frequency can only be estimated once a previous sample exists
			// for this container.
			if m.lastTime != nil {
				if lastTimeUsage, ok := m.lastTimeCpuUsage[criIds[i]]; ok {
					// Both diffs are in nanoseconds, so the ratio is the
					// fraction of one CPU used over the interval.
					timeDiff := float64(startTime - *m.lastTime)
					usageDiff := float64(criCpuUsage - lastTimeUsage)
					cpuUsagePercent := (usageDiff / timeDiff) * 100
					// Linear map: 0% usage -> cpufreqMin, 100% -> cpufreqMax.
					freqRange := float64(m.cpufreqMax - m.cpufreqMin)
					estimatedFreq := m.cpufreqMin + int64(freqRange*cpuUsagePercent/100)
					err := pod.simulateContainerSystemCpuSetScalingCurFreq(ctr.Id, estimatedFreq)
					if err != nil {
						log.Errorf("failed set %s(%s) simulate cpufreq: %s", pod.GetId(), criIds[i], err)
					}
				}
			}
		}
		return true
	})
	// Save this tick's readings as the baseline for the next one.
	m.lastTime = &startTime
	m.lastTimeCpuUsage = newCpuUsage
}