feat(region,host): set pod cgroup for lxcfs

This commit is contained in:
Zexi Li
2024-03-24 20:47:06 +08:00
parent 62137c2c68
commit 559df5f161
6 changed files with 217 additions and 67 deletions

View File

@@ -16,6 +16,7 @@ package main
import (
"context"
"fmt"
"time"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@@ -26,6 +27,32 @@ import (
"yunion.io/x/onecloud/pkg/util/pod"
)
func getLxcfsMounts() []*runtimeapi.Mount {
lxcfsPath := "/var/lib/lxcfs"
const (
procCpuinfo = "/proc/cpuinfo"
procDiskstats = "/proc/diskstats"
procMeminfo = "/proc/meminfo"
procStat = "/proc/stat"
procSwaps = "/proc/swaps"
procUptime = "/proc/uptime"
)
newLxcfsMount := func(fp string) *runtimeapi.Mount {
return &runtimeapi.Mount{
ContainerPath: fp,
HostPath: fmt.Sprintf("%s%s", lxcfsPath, fp),
}
}
return []*runtimeapi.Mount{
newLxcfsMount(procUptime),
newLxcfsMount(procMeminfo),
newLxcfsMount(procStat),
newLxcfsMount(procCpuinfo),
newLxcfsMount(procSwaps),
newLxcfsMount(procDiskstats),
}
}
func main() {
ctl, err := pod.NewCRI("unix:///var/run/onecloud/containerd/containerd.sock", 3*time.Second)
if err != nil {
@@ -50,7 +77,7 @@ func main() {
podCfg := &runtimeapi.PodSandboxConfig{
Metadata: &runtimeapi.PodSandboxMetadata{
Name: "test-gpu",
Uid: "e25e38ef-fe98-4993-8641-699cd0530fc0",
Uid: "6659d5d0-9187-4b4f-8143-dbe0453229af",
Namespace: "27c9464ab54947328a29298761895be3",
Attempt: 1,
},
@@ -63,19 +90,32 @@ func main() {
Linux: nil,
Windows: nil,
}
var defaultCPUPeriod int64 = 100000
ctrCfgs := []*runtimeapi.ContainerConfig{
{
Metadata: &runtimeapi.ContainerMetadata{
Name: "nvidia-smi",
},
Image: &runtimeapi.ImageSpec{
Image: "ubuntu",
Image: "ubuntu:18.04",
},
Command: []string{"sleep", "100d"},
Linux: &runtimeapi.LinuxContainerConfig{
Linux: &runtimeapi.LinuxContainerConfig{
//SecurityContext: &runtimeapi.LinuxContainerSecurityContext{
// Privileged: true,
//},
Resources: &runtimeapi.LinuxContainerResources{
CpuPeriod: defaultCPUPeriod,
CpuQuota: 2 * defaultCPUPeriod,
CpuShares: 0,
MemoryLimitInBytes: 512 * 1024 * 1024,
OomScoreAdj: 0,
CpusetCpus: "",
CpusetMems: "",
HugepageLimits: nil,
Unified: nil,
MemorySwapLimitInBytes: 0,
},
},
Envs: []*runtimeapi.KeyValue{
{
@@ -87,6 +127,7 @@ func main() {
Value: "compute,utility",
},
},
// Mounts: getLxcfsMounts(),
/*Devices: []*runtimeapi.Device{
{
HostPath: "/dev/nvidia0",

View File

@@ -297,15 +297,15 @@ func (p *SPodDriver) StartGuestStopTask(guest *models.SGuest, ctx context.Contex
return task.ScheduleRun(nil)
}
func (p *SPodDriver) RequestUndeployGuestOnHost(ctx context.Context, guest *models.SGuest, host *models.SHost, task taskman.ITask) error {
task, err := taskman.TaskManager.NewTask(ctx, "PodDeleteTask", guest, task.GetUserCred(), nil, task.GetTaskId(), "", nil)
func (p *SPodDriver) StartDeleteGuestTask(ctx context.Context, userCred mcclient.TokenCredential, guest *models.SGuest, params *jsonutils.JSONDict, parentTaskId string) error {
task, err := taskman.TaskManager.NewTask(ctx, "PodDeleteTask", guest, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "New PodDeleteTask")
}
return task.ScheduleRun(nil)
}
func (p *SPodDriver) RequestUndeployPod(ctx context.Context, guest *models.SGuest, host *models.SHost, task taskman.ITask) error {
func (p *SPodDriver) RequestUndeployGuestOnHost(ctx context.Context, guest *models.SGuest, host *models.SHost, task taskman.ITask) error {
url := fmt.Sprintf("%s/servers/%s", host.ManagerUri, guest.Id)
header := p.getTaskRequestHeader(task)
_, _, err := httputils.JSONRequest(httputils.GetDefaultClient(), ctx, "DELETE", url, header, nil, false)

View File

@@ -16,6 +16,7 @@ package tasks
import (
"context"
"strings"
"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"
@@ -62,23 +63,46 @@ func (t *PodDeleteTask) OnWaitContainerDeletedFailed(ctx context.Context, pod *m
}
func (t *PodDeleteTask) OnContainerDeleted(ctx context.Context, pod *models.SGuest) {
t.SetStage("OnPodUndeploy", nil)
host, _ := pod.GetHost()
if err := pod.GetDriver().(*guestdrivers.SPodDriver).RequestUndeployPod(ctx, pod, host, t); err != nil {
t.SetStage("OnPodStopped", nil)
if pod.HostId == "" {
t.OnPodStopped(ctx, pod, nil)
return
}
// call stop task to umount volumes
if err := pod.GetDriver().(*guestdrivers.SPodDriver).StartGuestStopTask(pod, ctx, t.GetUserCred(), nil, t.GetTaskId()); err != nil {
if errors.Cause(err) == httperrors.ErrNotFound {
t.OnPodUndeploy(ctx, pod, nil)
t.OnPodStopped(ctx, pod, nil)
return
}
t.OnPodUndeployFailed(ctx, pod, jsonutils.NewString(err.Error()))
t.OnPodStoppedFailed(ctx, pod, jsonutils.NewString(err.Error()))
return
}
}
func (t *PodDeleteTask) OnPodUndeploy(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
func (t *PodDeleteTask) OnPodStopped(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
t.SetStage("OnPodDeleted", nil)
task, err := taskman.TaskManager.NewTask(ctx, "GuestDeleteTask", pod, t.GetUserCred(), t.GetParams(), t.GetTaskId(), "", nil)
if err != nil {
t.OnPodDeletedFailed(ctx, pod, jsonutils.NewString(err.Error()))
return
}
task.ScheduleRun(nil)
}
func (t *PodDeleteTask) OnPodStoppedFailed(ctx context.Context, pod *models.SGuest, reason jsonutils.JSONObject) {
if strings.Contains(reason.String(), "NotFoundError") {
t.OnPodStopped(ctx, pod, jsonutils.NewDict())
return
} else {
pod.SetStatus(ctx, t.GetUserCred(), api.VM_STOP_FAILED, reason.String())
t.SetStageFailed(ctx, reason)
}
}
func (t *PodDeleteTask) OnPodDeleted(ctx context.Context, pod *models.SGuest, data jsonutils.JSONObject) {
t.SetStageComplete(ctx, nil)
}
func (t *PodDeleteTask) OnPodUndeployFailed(ctx context.Context, pod *models.SGuest, reason jsonutils.JSONObject) {
pod.SetStatus(ctx, t.GetUserCred(), api.VM_DELETE_FAIL, reason.String())
func (t *PodDeleteTask) OnPodDeletedFailed(ctx context.Context, pod *models.SGuest, reason jsonutils.JSONObject) {
t.SetStageFailed(ctx, reason)
}

View File

@@ -49,7 +49,6 @@ import (
"yunion.io/x/onecloud/pkg/util/fileutils2"
"yunion.io/x/onecloud/pkg/util/netutils2/getport"
"yunion.io/x/onecloud/pkg/util/pod"
"yunion.io/x/onecloud/pkg/util/procutils"
)
type PodInstance interface {
@@ -431,12 +430,26 @@ func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.Toke
if err := s.setCRIInfo(ctx, userCred, criId, podCfg); err != nil {
return nil, errors.Wrap(err, "setCRIId")
}
// set pod cgroup resources
if err := s.setPodCgroupResources(criId, s.GetDesc().Mem, s.GetDesc().Cpu); err != nil {
return nil, errors.Wrapf(err, "set pod %s cgroup memMB %d, cpu %d", criId, s.GetDesc().Mem, s.GetDesc().Cpu)
}
return &computeapi.PodStartResponse{
CRIId: criId,
IsRunning: false,
}, nil
}
func (s *sPodGuestInstance) setPodCgroupResources(criId string, memMB int64, cpuCnt int64) error {
if err := s.getCGUtil().SetMemoryLimitBytes(criId, memMB*1024*1024); err != nil {
return errors.Wrap(err, "set cgroup memory limit")
}
if err := s.getCGUtil().SetCPUCfs(criId, cpuCnt*s.getDefaultCPUPeriod(), s.getDefaultCPUPeriod()); err != nil {
return errors.Wrap(err, "set cgroup cfs")
}
return nil
}
func (s *sPodGuestInstance) stopPod(ctx context.Context, timeout int64) error {
if err := s.umountPodVolumes(); err != nil {
return errors.Wrapf(err, "umount pod volumes")
@@ -452,8 +465,11 @@ func (s *sPodGuestInstance) stopPod(ctx context.Context, timeout int64) error {
}); err != nil {
return errors.Wrapf(err, "stop cri pod: %s", s.getCRIId())
}*/
if err := s.getCRI().RemovePod(ctx, s.getCRIId()); err != nil {
return errors.Wrapf(err, "remove cri pod: %s", s.getCRIId())
criId := s.getCRIId()
if criId != "" {
if err := s.getCRI().RemovePod(ctx, s.getCRIId()); err != nil {
return errors.Wrapf(err, "remove cri pod: %s", s.getCRIId())
}
}
return nil
}
@@ -659,7 +675,7 @@ func (s *sPodGuestInstance) GetPodMetadataPortMappings() ([]*computeapi.PodMetad
return nil, errors.Wrapf(err, "ParseString to json object: %s", cfgStr)
}
pms := make([]*computeapi.PodMetadataPortMapping, 0)
if err := obj.Unmarshal(pms); err != nil {
if err := obj.Unmarshal(&pms); err != nil {
return nil, errors.Wrap(err, "Unmarshal to PodMetadataPortMappings")
}
return pms, nil
@@ -713,37 +729,27 @@ func (s *sPodGuestInstance) getContainerLogPath(ctrId string) string {
func (s *sPodGuestInstance) getLxcfsMounts() []*runtimeapi.Mount {
// lxcfsPath := "/var/lib/lxc/lxcfs"
lxcfsPath := options.HostOptions.LxcfsPath
const (
procCpuinfo = "/proc/cpuinfo"
procDiskstats = "/proc/diskstats"
procMeminfo = "/proc/meminfo"
procStat = "/proc/stat"
procSwaps = "/proc/swaps"
procUptime = "/proc/uptime"
)
newLxcfsMount := func(fp string) *runtimeapi.Mount {
return &runtimeapi.Mount{
ContainerPath: fp,
HostPath: fmt.Sprintf("%s%s", lxcfsPath, fp),
}
}
return []*runtimeapi.Mount{
{
ContainerPath: "/proc/uptime",
HostPath: fmt.Sprintf("%s/proc/uptime", lxcfsPath),
Readonly: true,
},
{
ContainerPath: "/proc/meminfo",
HostPath: fmt.Sprintf("%s/proc/meminfo", lxcfsPath),
Readonly: true,
},
{
ContainerPath: "/proc/stat",
HostPath: fmt.Sprintf("%s/proc/stat", lxcfsPath),
Readonly: true,
},
{
ContainerPath: "/proc/cpuinfo",
HostPath: fmt.Sprintf("%s/proc/cpuinfo", lxcfsPath),
Readonly: true,
},
{
ContainerPath: "/proc/swaps",
HostPath: fmt.Sprintf("%s/proc/swaps", lxcfsPath),
Readonly: true,
},
{
ContainerPath: "/proc/diskstats",
HostPath: fmt.Sprintf("%s/proc/diskstats", lxcfsPath),
Readonly: true,
},
newLxcfsMount(procUptime),
newLxcfsMount(procMeminfo),
newLxcfsMount(procStat),
newLxcfsMount(procCpuinfo),
newLxcfsMount(procSwaps),
newLxcfsMount(procDiskstats),
}
}
@@ -771,24 +777,16 @@ func (s *sPodGuestInstance) getContainerMounts(input *hostapi.ContainerCreateInp
return mounts, nil
}
func (s *sPodGuestInstance) getContainerCgroupDir(dirType string, ctrId string) string {
cgroupDir := "/sys/fs/cgroup"
return filepath.Join(cgroupDir, dirType, s.getCgroupParent(), ctrId)
}
func (s *sPodGuestInstance) getContainerCgroupDevicesDir(ctrId string) string {
return s.getContainerCgroupDir("devices", ctrId)
func (s *sPodGuestInstance) getCGUtil() pod.CgroupUtil {
return pod.NewPodCgroupV1Util(s.getCgroupParent())
}
func (s *sPodGuestInstance) setContainerCgroupDevicesAllow(ctrId string, allowStrs []string) error {
for _, allowStr := range allowStrs {
deviceAllowFile := filepath.Join(s.getContainerCgroupDevicesDir(ctrId), "devices.allow")
out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", fmt.Sprintf("echo '%s' > %s", allowStr, deviceAllowFile)).Output()
if err != nil {
return errors.Wrapf(err, "echo %s to %s: %s", deviceAllowFile, allowStr, out)
}
}
return nil
return s.getCGUtil().SetDevicesAllow(ctrId, allowStrs)
}
func (s *sPodGuestInstance) getDefaultCPUPeriod() int64 {
return 100000
}
func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (string, error) {
@@ -803,7 +801,6 @@ func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclie
}
// REF: https://docs.docker.com/config/containers/resource_constraints/#configure-the-default-cfs-scheduler
var defaultCPUPeriod int64 = 1000
spec := input.Spec
ctrCfg := &runtimeapi.ContainerConfig{
Metadata: &runtimeapi.ContainerMetadata{
@@ -814,8 +811,8 @@ func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclie
},
Linux: &runtimeapi.LinuxContainerConfig{
Resources: &runtimeapi.LinuxContainerResources{
CpuPeriod: defaultCPUPeriod,
//CpuQuota: s.GetDesc().Cpu * defaultCPUPeriod,
CpuPeriod: s.getDefaultCPUPeriod(),
CpuQuota: s.GetDesc().Cpu * s.getDefaultCPUPeriod(),
//CpuShares: defaultCPUPeriod,
MemoryLimitInBytes: s.GetDesc().Mem * 1024 * 1024,
OomScoreAdj: 0,

View File

@@ -663,7 +663,7 @@ func GetApiResourceData(dev IDevice) *jsonutils.JSONDict {
if numaNode, err := dev.GetNumaNode(); err == nil {
data["numa_node"] = numaNode
} else {
log.Errorf("failed get dev %s numa node %s", dev.GetAddr(), err)
log.Warningf("failed get dev %s numa node %s", dev.GetAddr(), err)
}
if dev.GetMdevId() != "" {

88
pkg/util/pod/cgroup.go Normal file
View File

@@ -0,0 +1,88 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pod
import (
"fmt"
"path/filepath"
"yunion.io/x/pkg/errors"
"yunion.io/x/onecloud/pkg/util/procutils"
)
const (
CGROUP_PATH_SYSFS = "/sys/fs/cgroup"
)
type CgroupUtil interface {
SetMemoryLimitBytes(ctrId string, bytes int64) error
SetCPUCfs(ctrId string, quota int64, period int64) error
SetDevicesAllow(ctrId string, allows []string) error
}
type podCgroupV1Util struct {
parentPath string
}
func NewPodCgroupV1Util(parentPath string) CgroupUtil {
return &podCgroupV1Util{
parentPath: parentPath,
}
}
func (p podCgroupV1Util) getContainerControllerPath(controller string, ctrId string) string {
return filepath.Join(CGROUP_PATH_SYSFS, controller, p.parentPath, ctrId)
}
func (p podCgroupV1Util) getContainerCGFilePath(controller string, ctrId string, filename string) string {
return filepath.Join(p.getContainerControllerPath(controller, ctrId), filename)
}
func (p podCgroupV1Util) write(fp string, content string) error {
cmd := fmt.Sprintf("echo %q > %s", content, fp)
out, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmd).Output()
if err != nil {
return errors.Wrapf(err, "%s: %s", cmd, out)
}
return nil
}
func (p podCgroupV1Util) SetMemoryLimitBytes(ctrId string, bytes int64) error {
memFp := p.getContainerCGFilePath("memory", ctrId, "memory.limit_in_bytes")
return p.write(memFp, fmt.Sprintf("%d", bytes))
}
func (p podCgroupV1Util) SetCPUCfs(ctrId string, quota int64, period int64) error {
quotaFp := p.getContainerCGFilePath("cpu,cpuacct", ctrId, "cpu.cfs_quota_us")
periodFp := p.getContainerCGFilePath("cpu,cpuacct", ctrId, "cpu.cfs_period_us")
if err := p.write(quotaFp, fmt.Sprintf("%d", quota)); err != nil {
return errors.Wrapf(err, "write quota: %d", quota)
}
if err := p.write(periodFp, fmt.Sprintf("%d", period)); err != nil {
return errors.Wrapf(err, "write period: %d", period)
}
return nil
}
func (p podCgroupV1Util) SetDevicesAllow(ctrId string, allows []string) error {
devicesFp := p.getContainerCGFilePath("devices", ctrId, "devices.allow")
for _, allowStr := range allows {
if err := p.write(devicesFp, allowStr); err != nil {
return errors.Wrapf(err, "write: %s", allowStr)
}
}
return nil
}