feat(region,host): startup probe of container (#21011)

This commit is contained in:
Zexi Li
2024-08-15 10:46:39 +08:00
committed by GitHub
parent 42e210861c
commit addbcbe0d1
34 changed files with 2068 additions and 19 deletions

View File

@@ -163,6 +163,11 @@ func init() {
return doComputeEventList(s, &nargs)
})
R(&TypeEventListOptions{}, "container-event", "Show operation event logs of container", func(s *mcclient.ClientSession, args *TypeEventListOptions) error {
nargs := EventListOptions{BaseEventListOptions: args.BaseEventListOptions, Id: args.ID, Type: []string{"container"}}
return doComputeEventList(s, &nargs)
})
R(&TypeEventListOptions{}, "disk-event", "Show operation event logs of disk", func(s *mcclient.ClientSession, args *TypeEventListOptions) error {
nargs := EventListOptions{BaseEventListOptions: args.BaseEventListOptions, Id: args.ID, Type: []string{"disk"}}
return doComputeEventList(s, &nargs)

View File

@@ -73,6 +73,9 @@ const (
CONTAINER_STATUS_RUNNING = "running"
CONTAINER_STATUS_DELETING = "deleting"
CONTAINER_STATUS_DELETE_FAILED = "delete_failed"
// for health check
CONTAINER_STATUS_PROBING = "probing"
CONTAINER_STATUS_PROBE_FAILED = "probe_failed"
)
const (

View File

@@ -282,9 +282,10 @@ type BackupInfo struct {
}
// PodContainerDesc is a short description of one container of a pod guest:
// its id, name, image and status.
// NOTE(review): Id/Name/Image appear twice below; this looks like the removed and the
// added side of a field re-alignment rendered together — confirm against the real file.
type PodContainerDesc struct {
Id string `json:"id"`
Name string `json:"name"`
Image string `json:"image"`
Id string `json:"id"`
Name string `json:"name"`
Image string `json:"image"`
// Status is the status string of the container.
Status string `json:"status"`
}
type Floppy struct {

View File

@@ -70,6 +70,23 @@ type ContainerSpec struct {
SimulateCpu bool `json:"simulate_cpu"`
ShmSizeMB int `json:"shm_size_mb"`
SecurityContext *ContainerSecurityContext `json:"security_context,omitempty"`
// Periodic probe of container liveness.
// Container will be restarted if the probe fails.
// Cannot be updated.
//LivenessProbe *ContainerProbe `json:"liveness_probe,omitempty"`
// StartupProbe indicates that the Pod has successfully initialized.
// If specified, no other probes are executed until this completes successfully.
StartupProbe *ContainerProbe `json:"startup_probe,omitempty"`
}
// NeedProbe reports whether the spec configures a probe that has to be run
// against the container. Only the startup probe is taken into account; the
// liveness probe is still disabled.
func (c *ContainerSpec) NeedProbe() bool {
	// TODO: also consider c.LivenessProbe once liveness probing is enabled.
	return c.StartupProbe != nil
}
type ContainerCapability struct {

116
pkg/apis/container_probe.go Normal file
View File

@@ -0,0 +1,116 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apis
// ContainerProbeHandlerExecAction describes a "run in container" probe action.
type ContainerProbeHandlerExecAction struct {
// Command is the command line to execute inside the container, the working directory for the
// command is root ('/') in the container's filesystem. The command is simply exec'd, it is
// not run inside a shell, so traditional shell instructions ('|', etc) won't work. To use
// a shell, you need to explicitly call out to that shell.
// Exit status of 0 is treated as live/healthy and non-zero is unhealthy.
// Although the JSON tag is omitempty, the region-side validateSpecProbeHandler rejects an
// exec action whose command list is empty.
// +optional
Command []string `json:"command,omitempty"`
}
// URIScheme identifies the scheme used for connection to a host for Get actions.
// It is the type of ContainerProbeHTTPGetAction.Scheme.
type URIScheme string
const (
// URISchemeHTTP means that the scheme used will be http://
URISchemeHTTP URIScheme = "HTTP"
// URISchemeHTTPS means that the scheme used will be https://
URISchemeHTTPS URIScheme = "HTTPS"
)
// HTTPHeader describes a custom header to be used in HTTP probes.
// It is the element type of ContainerProbeHTTPGetAction.HTTPHeaders.
type HTTPHeader struct {
// Name is the header field name.
Name string `json:"name"`
// Value is the header field value.
Value string `json:"value"`
}
// ContainerProbeHTTPGetAction describes an action based on HTTP Get requests.
// NOTE(review): the host-side prober.runProbe only dispatches exec and tcp_socket actions;
// an http_get-only probe ends in its "Failed to find probe builder" error — confirm whether
// HTTP probing is implemented elsewhere.
type ContainerProbeHTTPGetAction struct {
// Path to access on the HTTP server.
// +optional
Path string `json:"path,omitempty"`
// Number of the port to access on the container.
// The field is an int, so only numeric ports can be expressed; validation requires
// the number to be in the range 1 to 65535.
Port int `json:"port"`
// Host name to connect to, defaults to the pod IP. You probably want to set
// "Host" in httpHeaders instead.
// +optional
Host string `json:"host,omitempty"`
// Scheme to use for connecting to the host.
// Defaults to HTTP.
// +optional
Scheme URIScheme `json:"scheme,omitempty"`
// Custom headers to set in the request. HTTP allows repeated headers.
// Note that the JSON key is camelCase ("httpHeaders"), unlike the snake_case keys
// used by the other probe fields.
// +optional
HTTPHeaders []HTTPHeader `json:"httpHeaders,omitempty"`
}
// ContainerProbeTCPSocketAction describes an action based on opening a socket.
type ContainerProbeTCPSocketAction struct {
// Number of the port to access on the container.
// The field is an int, so only numeric ports can be expressed; validation requires
// the number to be in the range 1 to 65535.
Port int `json:"port"`
// Optional: Host name to connect to. When empty, the host-side prober uses the first
// non-empty IP found on the pod's nics and fails with "not found guest ip" if none exists.
// +optional
Host string `json:"host,omitempty"`
}
// ContainerProbeType names the kind of probe being run against a container.
type ContainerProbeType string
const (
// ContainerProbeTypeLiveness is the liveness probe type. Its handling in the host
// prober is currently commented out, so it is reported as an unknown probe type there.
ContainerProbeTypeLiveness ContainerProbeType = "Liveness"
// ContainerProbeTypeReadiness is the readiness probe type. The host prober has no
// case for it, so it is reported as an unknown probe type there.
ContainerProbeTypeReadiness ContainerProbeType = "Readiness"
// ContainerProbeTypeStartup is the startup probe type, the only type the host prober runs.
ContainerProbeTypeStartup ContainerProbeType = "Startup"
)
// ContainerProbeHandler defines a specific action that should be taken to probe a container.
type ContainerProbeHandler struct {
// One and only one of the following should be specified.
// Validation requires at least one of them to be set.
// Exec specifies the action to take.
Exec *ContainerProbeHandlerExecAction `json:"exec,omitempty"`
// HTTPGet specifies the http request to perform.
HTTPGet *ContainerProbeHTTPGetAction `json:"http_get,omitempty"`
// TCPSocket specifies an action involving a TCP port.
TCPSocket *ContainerProbeTCPSocketAction `json:"tcp_socket,omitempty"`
}
// ContainerProbe describes a health check to be performed against a container to determine whether it is
// alive or ready to receive traffic.
// The region-side validateSpecProbe rejects negative values for the numeric fields below and
// replaces zero values with the defaults noted on each field.
type ContainerProbe struct {
// The action taken to determine the health of a container
ContainerProbeHandler `json:",inline"`
// Number of seconds after the container has started before liveness probes are initiated.
// (Currently disabled.)
// InitialDelaySeconds int32 `json:"initial_delay_seconds,omitempty"`
// Number of seconds after which the probe times out.
// Defaults to 3 seconds.
TimeoutSeconds int32 `json:"timeout_seconds,omitempty"`
// How often (in seconds) to perform the probe.
// Default to 10 seconds. Minimum value is 1.
PeriodSeconds int32 `json:"period_seconds,omitempty"`
// Minimum consecutive successes for the probe to be considered successful after having failed.
// Defaults to 1. Must be 1 for liveness and startup. Minimum value is 1.
// NOTE(review): the "must be 1" rule is not enforced by validateSpecProbe — confirm intent.
SuccessThreshold int32 `json:"success_threshold,omitempty"`
// Minimum consecutive failures for the probe to be considered failed after having succeeded.
// Defaults to 3. Minimum value is 1.
FailureThreshold int32 `json:"failure_threshold,omitempty"`
}

View File

@@ -164,6 +164,10 @@ func (m *SContainerManager) ValidateSpec(ctx context.Context, userCred mcclient.
return httperrors.NewInputParameterError("/dev/shm size is small than 64MB")
}
if err := m.ValidateSpecProbe(ctx, userCred, spec); err != nil {
return errors.Wrap(err, "validate probe configuration")
}
return nil
}
@@ -248,6 +252,81 @@ func (c *SContainer) CustomizeCreate(ctx context.Context, userCred mcclient.Toke
return nil
}*/
// ValidateSpecProbe validates the probes configured on spec. Only the startup
// probe is checked for now; a failure is returned wrapped with the probe kind.
func (m *SContainerManager) ValidateSpecProbe(ctx context.Context, userCred mcclient.TokenCredential, spec *api.ContainerSpec) error {
	// TODO: validate spec.LivenessProbe the same way once liveness probing is enabled.
	//if err := m.validateSpecProbe(ctx, userCred, spec.LivenessProbe); err != nil {
	//	return errors.Wrap(err, "validate liveness probe")
	//}
	startupErr := m.validateSpecProbe(ctx, userCred, spec.StartupProbe)
	if startupErr == nil {
		return nil
	}
	return errors.Wrap(startupErr, "validate startup probe")
}
// validateSpecProbe validates a single probe definition and normalizes it in place.
//
// A nil probe is valid and is left untouched. Otherwise the handler is validated,
// every numeric setting must be non-negative, and settings left at zero are
// replaced by their defaults: timeout 3s, period 10s, success threshold 1 and
// failure threshold 3. No default is applied when validation fails.
func (m *SContainerManager) validateSpecProbe(ctx context.Context, userCred mcclient.TokenCredential, probe *apis.ContainerProbe) error {
	if probe == nil {
		return nil
	}
	if err := m.validateSpecProbeHandler(probe.ContainerProbeHandler); err != nil {
		return errors.Wrap(err, "validate container probe handler")
	}

	// An ordered table is used instead of a map: Go randomizes map iteration, so
	// with a map the field named in the error was not deterministic when more
	// than one value was negative.
	settings := []struct {
		key      string
		value    *int32
		fallback int32
	}{
		//{"initial_delay_seconds", &probe.InitialDelaySeconds, 5},
		{"timeout_seconds", &probe.TimeoutSeconds, 3},
		{"period_seconds", &probe.PeriodSeconds, 10},
		{"success_threshold", &probe.SuccessThreshold, 1},
		{"failure_threshold", &probe.FailureThreshold, 3},
	}

	// Check everything first so that a rejected probe is never partially defaulted.
	for _, s := range settings {
		if *s.value < 0 {
			return httperrors.NewInputParameterError("%s is negative", s.key)
		}
	}
	for _, s := range settings {
		if *s.value == 0 {
			*s.value = s.fallback
		}
	}
	return nil
}
// validateSpecProbeHandler checks the action of a probe.
//
// Exactly one of exec, http_get and tcp_socket must be set, as documented on
// apis.ContainerProbeHandler. An exec action needs a non-empty command and the
// http_get / tcp_socket ports must be within [1,65535]. Previously several
// actions could be given at once, in which case the host prober silently used
// only the first one it recognized.
//
// NOTE(review): the host-side prober.runProbe currently dispatches only exec and
// tcp_socket; an http_get probe passes this validation but cannot be executed
// there — confirm whether it should be rejected until it is supported.
func (m *SContainerManager) validateSpecProbeHandler(probe apis.ContainerProbeHandler) error {
	configured := 0
	if probe.Exec != nil {
		configured++
		if len(probe.Exec.Command) == 0 {
			return httperrors.NewInputParameterError("exec command is required")
		}
	}
	if probe.TCPSocket != nil {
		configured++
		port := probe.TCPSocket.Port
		if port < 1 || port > 65535 {
			return httperrors.NewInputParameterError("invalid tcp socket port: %d, must between [1,65535]", port)
		}
	}
	if probe.HTTPGet != nil {
		configured++
		port := probe.HTTPGet.Port
		if port < 1 || port > 65535 {
			return httperrors.NewInputParameterError("invalid http port: %d, must between [1,65535]", port)
		}
	}
	switch {
	case configured == 0:
		return httperrors.NewInputParameterError("one of [exec, http_get, tcp_socket] is required")
	case configured > 1:
		return httperrors.NewInputParameterError("only one of [exec, http_get, tcp_socket] can be specified")
	}
	return nil
}
func (c *SContainer) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
c.SVirtualResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
if !jsonutils.QueryBoolean(data, "skip_task", false) {
@@ -581,16 +660,16 @@ func NewContainerReleasedDevice(device *api.ContainerDevice, devType, devModel s
}
}
// SaveReleasedDevices stores devs in the container's metadata under
// api.CONTAINER_METADATA_RELEASED_DEVICES and returns the error from SetMetadata.
// NOTE(review): two signatures appear below (receiver `s` and receiver `c`); this looks like
// the removed and the added side of a receiver rename rendered together — confirm against
// the real file.
func (s *SContainer) SaveReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential, devs map[string]ContainerReleasedDevice) error {
return s.SetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, devs, userCred)
func (c *SContainer) SaveReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential, devs map[string]ContainerReleasedDevice) error {
return c.SetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, devs, userCred)
}
func (s *SContainer) GetReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential) (map[string]ContainerReleasedDevice, error) {
func (c *SContainer) GetReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential) (map[string]ContainerReleasedDevice, error) {
out := make(map[string]ContainerReleasedDevice, 0)
if ret := s.GetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred); ret == "" {
if ret := c.GetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred); ret == "" {
return out, nil
}
obj := s.GetMetadataJson(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred)
obj := c.GetMetadataJson(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred)
if obj == nil {
return nil, errors.Error("get metadata released devices")
}

View File

@@ -794,7 +794,11 @@ func fetchContainers(guestIds []string) (map[string][]*api.PodContainerDesc, err
if !ok {
ret[container.GuestId] = []*api.PodContainerDesc{}
}
desc := &api.PodContainerDesc{Id: container.GetId(), Name: container.GetName()}
desc := &api.PodContainerDesc{
Id: container.GetId(),
Name: container.GetName(),
Status: container.Status,
}
if container.Spec != nil {
desc.Image = container.Spec.Image
}

View File

@@ -0,0 +1 @@
package prober // import "yunion.io/x/onecloud/pkg/hostman/container/prober"

View File

@@ -0,0 +1,224 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"fmt"
"io"
"time"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/onecloud/pkg/apis"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/guestman/container"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
"yunion.io/x/onecloud/pkg/util/exec"
"yunion.io/x/onecloud/pkg/util/probe"
execprobe "yunion.io/x/onecloud/pkg/util/probe/exec"
tcpprobe "yunion.io/x/onecloud/pkg/util/probe/tcp"
)
// maxProbeRetries is the number of attempts passed to runProbeWithRetries for one probe.
const maxProbeRetries = 3
// prober runs probe actions against containers.
type prober struct {
// exec runs exec-type probes.
exec execprobe.Prober
// tcp runs TCP-socket probes.
tcp tcpprobe.Prober
// runner executes commands inside a container on behalf of exec probes.
runner container.CommandRunner
}
// newProber builds a prober that uses fresh exec and TCP probe implementations
// and runs in-container commands through runner.
func newProber(runner container.CommandRunner) *prober {
	pb := new(prober)
	pb.exec = execprobe.New()
	pb.tcp = tcpprobe.New()
	pb.runner = runner
	return pb
}
// probe runs the probe of the given type against container and converts the
// outcome into a results.ProbeResult.
//
// Only the startup probe type is handled; any other type yields a failure
// result together with an "unknown probe type" error. A container without a
// probe definition is reported as success. When running the probe errors, a
// failure result and the error are returned; a probe that ran but did not end
// in probe.Success or probe.Warning yields a failure result with a nil error.
//
// The human readable message is logged with an explicit "%s" format: it embeds
// probe output and names, so using it as the format string itself would
// misrender any '%' it contains.
func (pb *prober) probe(probeType apis.ContainerProbeType, pod *desc.SGuestDesc, container *hostapi.ContainerDesc) (results.ProbeResult, error) {
	var probeSpec *apis.ContainerProbe
	switch probeType {
	//case apis.ContainerProbeTypeLiveness:
	//	probeSpec = container.Spec.LivenessProbe
	case apis.ContainerProbeTypeStartup:
		probeSpec = container.Spec.StartupProbe
	default:
		err := errors.Errorf("unknown probe type: %q", probeType)
		return results.NewFailure(err.Error()), err
	}

	ctrName := fmt.Sprintf("%s:%s", pod.Name, container.Name)
	if probeSpec == nil {
		log.Warningf("%s probe for %s is nil", probeType, ctrName)
		return results.NewSuccess("probe is not defined"), nil
	}

	result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, container, maxProbeRetries)
	if err != nil {
		// The probe could not be executed at all.
		msg := fmt.Sprintf("%s probe for %q errored: %v", probeType, ctrName, err)
		log.Debugf("%s", msg)
		return results.NewFailure(msg), err
	}

	switch result {
	case probe.Warning:
		msg := fmt.Sprintf("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
		log.Infof("%s", msg)
		return results.NewSuccess(msg), nil
	case probe.Success:
		msg := fmt.Sprintf("%s probe for %q succeeded", probeType, ctrName)
		log.Debugf("%s", msg)
		return results.NewSuccess(msg), nil
	default:
		// The probe ran but reported something other than success.
		msg := fmt.Sprintf("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
		log.Debugf("%s", msg)
		return results.NewFailure(msg), nil
	}
}
// runProbeWithRetries calls runProbe up to retries times and stops at the first
// attempt that returns no error. If every attempt errors, the result, output
// and error of the last attempt are returned.
func (pb *prober) runProbeWithRetries(probeType apis.ContainerProbeType, p *apis.ContainerProbe, pod *desc.SGuestDesc, container *hostapi.ContainerDesc, retries int) (probe.Result, string, error) {
	var (
		lastResult probe.Result
		lastOutput string
		lastErr    error
	)
	for attempt := 0; attempt < retries; attempt++ {
		lastResult, lastOutput, lastErr = pb.runProbe(probeType, p, pod, container)
		if lastErr != nil {
			continue
		}
		return lastResult, lastOutput, nil
	}
	return lastResult, lastOutput, lastErr
}
// runProbe executes the action configured in p once.
//
// An exec action is run inside the container through the command runner. A
// tcp_socket action connects to the configured host, or to the first non-empty
// nic IP of the pod when no host is given. Any other configuration (including
// an http_get-only probe) is reported as probe.Unknown with an error.
//
// The probe timeout is p.TimeoutSeconds seconds.
func (pb *prober) runProbe(probeType apis.ContainerProbeType, p *apis.ContainerProbe, pod *desc.SGuestDesc, container *hostapi.ContainerDesc) (probe.Result, string, error) {
	timeout := time.Duration(p.TimeoutSeconds) * time.Second
	if p.Exec != nil {
		log.Debugf("Exec-Probe Pod: %v, Container: %v, Command: %v", pod.Name, container.Name, p.Exec.Command)
		return pb.exec.Probe(pb.newExecInContainer(pod, container, p.Exec.Command, timeout))
	}
	if p.TCPSocket != nil {
		port := p.TCPSocket.Port
		host := p.TCPSocket.Host
		if host == "" {
			// Fall back to the first address assigned to the pod.
			for _, nic := range pod.Nics {
				if nic.Ip != "" {
					host = nic.Ip
					break
				}
			}
			if host == "" {
				return probe.Unknown, "", errors.Errorf("not found guest ip")
			}
		}
		log.Debugf("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
		return pb.tcp.Probe(host, port, timeout)
	}
	errMsg := fmt.Sprintf("Failed to find probe builder for pod %v, container: %v", pod.Name, container.Name)
	// Log through an explicit "%s": errMsg contains user-chosen names and must
	// not be interpreted as a format string.
	log.Warningf("%s", errMsg)
	return probe.Unknown, "", errors.Error(errMsg)
}
// execInContainer wraps a function that runs a command inside a container; its methods
// are what newExecInContainer hands out as an exec.Cmd.
type execInContainer struct {
// run executes a command in a container. Combined stdout and stderr output is always returned. An
// error is returned if one occurred.
run func() ([]byte, error)
// writer receives the output of run when Start is used; it is set by SetStdout/SetStderr.
writer io.Writer
}
// newExecInContainer returns an exec.Cmd whose run function executes cmd in the
// given container of pod through the prober's command runner, bounded by timeout.
func (pb *prober) newExecInContainer(pod *desc.SGuestDesc, container *hostapi.ContainerDesc, cmd []string, timeout time.Duration) exec.Cmd {
	runInContainer := func() ([]byte, error) {
		return pb.runner.RunInContainer(pod, container.Id, cmd, timeout)
	}
	eic := &execInContainer{run: runInContainer}
	return eic
}
// Run is a no-op that always returns nil; it does not execute the command.
func (eic *execInContainer) Run() error {
return nil
}
// CombinedOutput runs the command in the container and returns what run returns.
func (eic *execInContainer) CombinedOutput() ([]byte, error) {
return eic.run()
}
// Output is not implemented and always returns an error.
func (eic *execInContainer) Output() ([]byte, error) {
return nil, fmt.Errorf("unimplemented")
}
// SetDir is not implemented; the argument is ignored.
func (eic *execInContainer) SetDir(dir string) {
//unimplemented
}
// SetStdin is not implemented; the argument is ignored.
func (eic *execInContainer) SetStdin(in io.Reader) {
//unimplemented
}
// SetStdout sets the writer that Start copies the command output to.
// It shares a single writer field with SetStderr, so the last call wins.
func (eic *execInContainer) SetStdout(out io.Writer) {
eic.writer = out
}
// SetStderr sets the writer that Start copies the command output to.
// It shares a single writer field with SetStdout, so the last call wins.
func (eic *execInContainer) SetStderr(out io.Writer) {
eic.writer = out
}
// SetEnv is not implemented; the argument is ignored.
func (eic *execInContainer) SetEnv(env []string) {
//unimplemented
}
// Stop is not implemented and does nothing.
func (eic *execInContainer) Stop() {
//unimplemented
}
// Start runs the command to completion, writes its output to the configured writer (if any)
// and returns the error from run. The result of the Write call is ignored.
func (eic *execInContainer) Start() error {
data, err := eic.run()
if eic.writer != nil {
eic.writer.Write(data)
}
return err
}
// Wait is a no-op that always returns nil.
func (eic *execInContainer) Wait() error {
return nil
}
// StdoutPipe is not implemented and always returns an error.
func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}
// StderrPipe is not implemented and always returns an error.
func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
return nil, fmt.Errorf("unimplemented")
}

View File

@@ -0,0 +1,217 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"sync"
"yunion.io/x/log"
"yunion.io/x/pkg/util/sets"
"yunion.io/x/pkg/util/wait"
"yunion.io/x/onecloud/pkg/apis"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/container/status"
"yunion.io/x/onecloud/pkg/hostman/guestman/container"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
)
// probeKey uniquely identifies a container probe worker in manager.workers.
type probeKey struct {
// podUid is the Uuid of the pod the container belongs to.
podUid string
// containerName is the name of the probed container.
containerName string
// probeType is the kind of probe the worker runs.
probeType apis.ContainerProbeType
}
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results.
// Updating probe parameters is not currently supported.
// TODO: Move liveness probing out of the runtime, to here.
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every
// pod created.
AddPod(pod *desc.SGuestDesc)
// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *desc.SGuestDesc)
// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a map of "desired pods" which should not be cleaned up.
CleanupPods(desiredPods map[string]sets.Empty)
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
// NOTE(review): the manager implementation in this file is an empty method — confirm
// whether this is still to be implemented.
UpdatePodStatus(podId string)
// Start starts the Manager sync loops.
Start()
}
// manager is the Manager implementation. It keeps one worker per probeKey and forwards
// startup probe updates to the status manager.
type manager struct {
// Map of active workers for probes
workers map[probeKey]*worker
// Lock for accessing & mutating workers
workerLock sync.RWMutex
// statusManager receives container startup results (see updateStartup).
statusManager status.Manager
// readinessManager manages the results of readiness probes
// readinessManager results.Manager
// livenessManager manages the results of liveness probes.
// It is stored but no liveness worker is created yet (that code is commented out).
livenessManager results.Manager
// startupManager manages the results of startup probes
startupManager results.Manager
// prober executes the probe actions
prober *prober
}
// NewManager creates a probe Manager with an empty worker set. Probe results
// are stored in the given liveness and startup result managers, status changes
// are reported to statusManager, and exec probes run through runner.
func NewManager(
	statusManager status.Manager,
	livenessManager results.Manager,
	startupManager results.Manager,
	runner container.CommandRunner) Manager {
	m := &manager{
		workers: make(map[probeKey]*worker),
	}
	m.statusManager = statusManager
	m.livenessManager = livenessManager
	m.startupManager = startupManager
	m.prober = newProber(runner)
	// workerLock is ready to use in its zero value.
	return m
}
// Start syncing probe status. This should only be called once.
// It launches a goroutine that calls updateStartup repeatedly via wait.Forever with a
// period of 0; nothing in this method stops that goroutine.
func (m *manager) Start() {
// start syncing readiness. (disabled)
//go wait.Forever(m.updateReadiness, 0)
// start syncing startup.
go wait.Forever(m.updateStartup, 0)
}
// AddPod creates and starts a probe worker for every container of pod that
// defines a startup probe.
//
// A container that already has a worker is logged and skipped. Previously the
// whole method returned at that point, so the remaining containers of the pod
// never got a worker; now only the offending container is skipped.
func (m *manager) AddPod(pod *desc.SGuestDesc) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	key := probeKey{podUid: pod.Uuid}
	for _, c := range pod.Containers {
		key.containerName = c.Name

		if c.Spec.StartupProbe != nil {
			key.probeType = apis.ContainerProbeTypeStartup
			if _, ok := m.workers[key]; ok {
				log.Errorf("Startup probe already exists: %s:%s", pod.Name, c.Name)
			} else {
				w := newWorker(m, key.probeType, pod, c)
				m.workers[key] = w
				go w.run()
			}
		}

		/*if c.Spec.LivenessProbe != nil {
			key.probeType = apis.ContainerProbeTypeLiveness
			if _, ok := m.workers[key]; ok {
				log.Errorf("Liveness probe already exists: %s:%s", pod.Name, c.Name)
			} else {
				w := newWorker(m, key.probeType, pod, c)
				m.workers[key] = w
				go w.run()
			}
		}*/
	}
}
// RemovePod asks every worker that belongs to a container of pod to stop,
// whatever its probe type. The workers remove themselves from the manager when
// they exit, so only a read lock is taken here.
func (m *manager) RemovePod(pod *desc.SGuestDesc) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	probeTypes := []apis.ContainerProbeType{
		apis.ContainerProbeTypeLiveness,
		apis.ContainerProbeTypeReadiness,
		apis.ContainerProbeTypeStartup,
	}
	for _, c := range pod.Containers {
		for _, pt := range probeTypes {
			w, found := m.workers[probeKey{podUid: pod.Uuid, containerName: c.Name, probeType: pt}]
			if !found {
				continue
			}
			w.stop()
		}
	}
}
// CleanupPods asks every worker whose pod is not listed in desiredPods to stop.
func (m *manager) CleanupPods(desiredPods map[string]sets.Empty) {
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	for key, w := range m.workers {
		if _, desired := desiredPods[key.podUid]; desired {
			continue
		}
		w.stop()
	}
}
// UpdatePodStatus is currently a no-op that only exists to satisfy the Manager interface.
// NOTE(review): the parameter is named `status` here but `podId` in the interface, and the
// name shadows the imported status package inside this method — consider renaming.
func (m *manager) UpdatePodStatus(status string) {}
// getWorker looks up the worker registered for the given pod, container and
// probe type. The boolean reports whether such a worker exists.
func (m *manager) getWorker(podId string, containerName string, probeType apis.ContainerProbeType) (*worker, bool) {
	key := probeKey{podUid: podId, containerName: containerName, probeType: probeType}

	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	w, found := m.workers[key]
	return w, found
}
// removeWorker drops the worker registered for the given pod, container and
// probe type. It is called by the worker itself once its loop has exited.
func (m *manager) removeWorker(podId string, containerName string, probeType apis.ContainerProbeType) {
	key := probeKey{podId, containerName, probeType}

	m.workerLock.Lock()
	delete(m.workers, key)
	m.workerLock.Unlock()
}
// workerCount returns the number of workers currently registered.
func (m *manager) workerCount() int {
	m.workerLock.RLock()
	count := len(m.workers)
	m.workerLock.RUnlock()
	return count
}
/*func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}*/
// updateStartup waits for the next startup probe update and reports it to the
// status manager; the container counts as started when the result is Success.
func (m *manager) updateStartup() {
	upd := <-m.startupManager.Updates()
	isStarted := upd.Result.Result == results.Success
	m.statusManager.SetContainerStartup(upd.PodUID, upd.ContainerID, isStarted, upd.Result)
}

View File

@@ -0,0 +1 @@
package results // import "yunion.io/x/onecloud/pkg/hostman/container/prober/results"

View File

@@ -0,0 +1,166 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package results
import (
"fmt"
"sync"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
)
// NewFailure returns a ProbeResult with result Failure and the given reason.
func NewFailure(reason string) ProbeResult {
	return ProbeResult{Result: Failure, Reason: reason}
}

// NewSuccess returns a ProbeResult with result Success and the given reason.
func NewSuccess(reason string) ProbeResult {
	return ProbeResult{Result: Success, Reason: reason}
}

// NewUnknown returns a ProbeResult with result Unknown and the given reason.
func NewUnknown(reason string) ProbeResult {
	return ProbeResult{Result: Unknown, Reason: reason}
}

// newProbeResult pairs a result with the reason that explains it.
func newProbeResult(r Result, reason string) ProbeResult {
	var pr ProbeResult
	pr.Result = r
	pr.Reason = reason
	return pr
}
// ProbeResult is a probe Result together with a human readable reason.
type ProbeResult struct {
	Result
	// Reason explains how the result came about.
	Reason string
}

// String renders the result as "<result>: <reason>".
func (pr ProbeResult) String() string {
	// %s on the embedded Result goes through Result.String.
	return fmt.Sprintf("%s: %s", pr.Result, pr.Reason)
}
// Result is the outcome of a probe.
type Result int

const (
	// Unknown is encoded as -1 (type Result)
	Unknown Result = -1
	// Success is encoded as 0 (type Result)
	Success Result = 0
	// Failure is encoded as 1 (type Result)
	Failure Result = 1
)

// String returns "Success" or "Failure" for those two results and "UNKNOWN"
// for every other value.
func (r Result) String() string {
	if r == Success {
		return "Success"
	}
	if r == Failure {
		return "Failure"
	}
	return "UNKNOWN"
}
// Update is the message sent over the Updates channel when the cached result of a
// container changes.
type Update struct {
// ContainerID is the ID of the container the result belongs to.
ContainerID string
// Result is the new probe result, including its reason.
Result ProbeResult
// PodUID is the Uuid of the pod passed to Set.
PodUID string
}
// Manager provides a probe results cache and channel of updates
type Manager interface {
// Get returns the cached result for the container with the given ID.
// The boolean reports whether a result is cached for that ID.
Get(containerId string) (Result, bool)
// Set sets the cached result for the container with the given ID.
// The pod is only included to be sent with the update.
Set(containerId string, result ProbeResult, pod *desc.SGuestDesc)
// Remove clears the cached result for the container with the given ID.
Remove(containerId string)
// Updates creates a channel that receives an Update whenever its result changes (but not
// removed).
// NOTE: The current implementation only supports a single updates channel.
Updates() <-chan Update
}
// Compile-time check that *manager implements Manager.
var _ Manager = &manager{}
// manager is the Manager implementation: a mutex-guarded map plus an updates channel.
type manager struct {
// guards the cache
sync.RWMutex
// map of container ID -> probe Result
cache map[string]Result
// channel of updates
updates chan Update
}
// NewManager creates a results Manager with an empty cache and an updates
// channel that buffers up to 20 updates.
func NewManager() Manager {
	m := new(manager)
	m.cache = map[string]Result{}
	m.updates = make(chan Update, 20)
	return m
}
// Get returns the cached result for the container with the given id and
// whether a result is cached at all.
func (m *manager) Get(id string) (Result, bool) {
	m.RLock()
	cached, ok := m.cache[id]
	m.RUnlock()
	return cached, ok
}
// Set caches result for the container with the given id. When the cached value
// changed, an Update carrying the pod's Uuid is sent on the updates channel;
// the send happens outside the cache lock and blocks while the channel is full.
func (m *manager) Set(id string, result ProbeResult, pod *desc.SGuestDesc) {
	if changed := m.setInternal(id, result); !changed {
		return
	}
	m.updates <- Update{ContainerID: id, Result: result, PodUID: pod.Uuid}
}
// setInternal is the locked part of Set. It stores the result when nothing is
// cached for id yet or the cached value differs, and reports whether it did so,
// i.e. whether an update should be sent.
func (m *manager) setInternal(id string, result ProbeResult) bool {
	m.Lock()
	defer m.Unlock()

	if prev, exists := m.cache[id]; exists && prev == result.Result {
		return false
	}
	m.cache[id] = result.Result
	return true
}
// Remove drops the cached result for the container with the given id.
// No update is sent for a removal.
func (m *manager) Remove(id string) {
	m.Lock()
	delete(m.cache, id)
	m.Unlock()
}
// Updates returns the receive side of the manager's single updates channel; every call
// returns the same channel.
func (m *manager) Updates() <-chan Update {
return m.updates
}

View File

@@ -0,0 +1,191 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"math/rand"
"time"
"yunion.io/x/pkg/util/runtime"
"yunion.io/x/onecloud/pkg/apis"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
)
// worker handles the periodic probing of its assigned container. Each worker has a go-routine
// associated with it which runs the probe loop until doProbe asks it to stop or a stop
// signal is received on stopCh.
type worker struct {
// Channel for stopping the probe. It is buffered (size 1) and signalled by stop().
stopCh chan struct{}
// The pod containing this probe (read-only)
pod *desc.SGuestDesc
// The container to probe (read-only)
container *hostapi.ContainerDesc
// Describes the probe configuration (read-only)
spec *apis.ContainerProbe
// The type of the worker.
probeType apis.ContainerProbeType
// The probe value during the initial delay.
// newWorker sets it to results.Unknown for startup probes.
initialValue results.Result
// Where to store this workers results.
resultsManager results.Manager
// The probe manager that owns this worker; used to reach the prober and to
// unregister the worker on exit.
probeManager *manager
// The last known container ID for this worker.
containerId string
// The last probe result for this worker.
lastResult results.Result
// How many times in a row the probe has returned the same result.
resultRun int
// If set, skip probing
onHold bool
}
// newWorker creates a probe worker for the given container; the caller is
// responsible for starting it with run. For a startup probe the worker uses the
// container's startup probe spec, the manager's startup results manager and an
// initial value of Unknown. For any other probe type those fields stay unset.
func newWorker(
	m *manager,
	probeType apis.ContainerProbeType,
	pod *desc.SGuestDesc,
	container *hostapi.ContainerDesc) *worker {
	w := new(worker)
	// Buffer of one so that stop() never blocks.
	w.stopCh = make(chan struct{}, 1)
	w.pod = pod
	w.container = container
	w.probeType = probeType
	w.probeManager = m
	w.containerId = container.Id

	// Liveness wiring is disabled for now:
	//	w.spec = container.Spec.LivenessProbe
	//	w.resultsManager = m.livenessManager
	//	w.initialValue = results.Success
	if probeType == apis.ContainerProbeTypeStartup {
		w.spec = container.Spec.StartupProbe
		w.resultsManager = m.startupManager
		w.initialValue = results.Unknown
	}
	return w
}
// run periodically probes the container until doProbe returns false or the
// worker is stopped. On exit it stops the ticker, removes the stored result
// for the container (if an ID is known) and deregisters the worker from its
// manager.
func (w *worker) run() {
	// time.NewTicker panics on a non-positive duration, so a spec with a
	// missing/zero/negative PeriodSeconds must not reach it as-is.
	// NOTE(review): 10s mirrors the Kubernetes default probe period; specs are
	// presumably validated upstream, this is only a safety net.
	const fallbackProbePeriod = 10 * time.Second

	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
	if probeTickerPeriod <= 0 {
		probeTickerPeriod = fallbackProbePeriod
	}

	// If host restarted the probes could be started in rapid succession.
	// Let the worker wait for a random portion of tickerPeriod before probing.
	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
		if len(w.containerId) != 0 {
			w.resultsManager.Remove(w.containerId)
		}
		w.probeManager.removeWorker(w.pod.Uuid, w.container.Name, w.probeType)
	}()

probeLoop:
	for w.doProbe() {
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// continue
		}
	}
}
// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
// It is safe to call stop multiple times: the send is non-blocking, so once the
// single buffer slot of stopCh is full further calls are no-ops.
func (w *worker) stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}
// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
//
// A result is only stored in the results manager once it has repeated enough
// times in a row: Failure needs spec.FailureThreshold consecutive runs and
// Success needs spec.SuccessThreshold; any other result value is stored
// immediately. In the code visible here every path returns true.
func (w *worker) doProbe() (keepGoing bool) {
	// Actually eat panics (HandleCrash takes care of logging)
	// Deferred calls run last-in-first-out, so the HandleCrash below runs
	// before this recover.
	defer func() { recover() }()
	defer runtime.HandleCrash(func(_ interface{}) {
		// Keep the worker alive after a panic inside a single probe.
		keepGoing = true
	})
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, w.container)
	if err != nil {
		// prober error, throw away the result.
		return true
	}
	// Track how many consecutive times the same result has been seen.
	if w.lastResult == result.Result {
		w.resultRun++
	} else {
		w.lastResult = result.Result
		w.resultRun = 1
	}
	// Below the configured threshold: do not publish the result yet.
	if (result.Result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result.Result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		return true
	}
	w.resultsManager.Set(w.containerId, result, w.pod)
	if (w.probeType == apis.ContainerProbeTypeLiveness || w.probeType == apis.ContainerProbeTypeStartup) && result.Result == results.Failure {
		// The container fails a liveness/startup check, it will need to be restarted.
		// Stop probing until we see a new container ID. This is to reduce the
		// chance of hitting #21751 (presumably an upstream Kubernetes issue
		// number — TODO confirm), where running `docker exec` when a
		// container is being stopped may lead to corrupted container state.
		// NOTE(review): onHold is set here but never consulted in this
		// function, so probing is not actually paused by this flag — confirm
		// whether that is intended.
		w.onHold = true
		w.resultRun = 0
	}
	return true
}

View File

@@ -0,0 +1 @@
package status // import "yunion.io/x/onecloud/pkg/hostman/container/status"

View File

@@ -0,0 +1,55 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package status
import (
"context"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/onecloud/pkg/apis"
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/hostutils"
computemodules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
)
// Manager reports probe outcomes as container status changes.
type Manager interface {
	// SetContainerStartup records the outcome of a startup probe for the
	// container identified by containerId (belonging to pod podId) and
	// triggers a status update. started tells whether the probe succeeded;
	// result carries the probe details.
	SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult)
}
// manager is the stateless default implementation of Manager.
type manager struct{}

// NewManager returns a ready-to-use status Manager.
func NewManager() Manager {
	return new(manager)
}
// SetContainerStartup pushes the startup probe outcome to the compute service
// through the container "status" action: the status is running when started
// is true and probe_failed otherwise, with the probe's reason attached.
// A failure of the remote call is logged and otherwise ignored.
func (m *manager) SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult) {
	session := hostutils.GetComputeSession(context.Background())

	newStatus := computeapi.CONTAINER_STATUS_RUNNING
	if !started {
		newStatus = computeapi.CONTAINER_STATUS_PROBE_FAILED
	}

	params := apis.PerformStatusInput{
		Status: newStatus,
		Reason: result.Reason,
	}
	_, err := computemodules.Containers.PerformAction(session, containerId, "status", jsonutils.Marshal(params))
	if err != nil {
		log.Errorf("set container(%s/%s) status failed: %s", podId, containerId, err)
	}
}

View File

@@ -0,0 +1 @@
package container // import "yunion.io/x/onecloud/pkg/hostman/guestman/container"

View File

@@ -0,0 +1,27 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package container
import (
"time"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
)
// CommandRunner allows running a command inside a container.
type CommandRunner interface {
	// RunInContainer synchronously executes cmd in the container identified by
	// containerId of the given pod, bounded by timeout, and returns the
	// command output.
	RunInContainer(pod *desc.SGuestDesc, containerId string, cmd []string, timeout time.Duration) ([]byte, error)
}

View File

@@ -39,6 +39,7 @@ import (
"yunion.io/x/onecloud/pkg/apis/compute"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/hostman/container/prober"
"yunion.io/x/onecloud/pkg/hostman/guestman/arch"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
fwd "yunion.io/x/onecloud/pkg/hostman/guestman/forwarder"
@@ -106,6 +107,9 @@ type SGuestManager struct {
numaAllocate bool
cpuSet *CpuSetCounter
pythonPath string
// container related members
containerProbeManager prober.Manager
}
func NewGuestManager(host hostutils.IHost, serversPath string) (*SGuestManager, error) {
@@ -128,6 +132,9 @@ func NewGuestManager(host hostutils.IHost, serversPath string) (*SGuestManager,
if err != nil {
return nil, errors.Wrap(err, "mkdir qemu log dir")
}
if manager.host.IsContainerHost() {
manager.startContainerProbeManager()
}
return manager, nil
}

View File

@@ -38,6 +38,9 @@ import (
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/container/device"
"yunion.io/x/onecloud/pkg/hostman/container/lifecycle"
"yunion.io/x/onecloud/pkg/hostman/container/prober"
proberesults "yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/container/status"
"yunion.io/x/onecloud/pkg/hostman/container/volume_mount"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis"
@@ -59,6 +62,48 @@ import (
"yunion.io/x/onecloud/pkg/util/procutils"
)
// startContainerProbeManager creates the container probe manager (with fresh
// liveness and startup result stores, a status manager and a command runner
// bound to this guest manager), stores it on m and starts it.
func (m *SGuestManager) startContainerProbeManager() {
	livenessResults := proberesults.NewManager()
	startupResults := proberesults.NewManager()

	probeMan := prober.NewManager(
		status.NewManager(),
		livenessResults,
		startupResults,
		newContainerRunner(m),
	)
	m.containerProbeManager = probeMan
	probeMan.Start()
}
// GetContainerProbeManager returns the container probe manager stored on m.
// It is only assigned by startContainerProbeManager, so it is nil until that
// has been called.
func (m *SGuestManager) GetContainerProbeManager() prober.Manager {
	return m.containerProbeManager
}
// containerRunner executes commands inside containers of pods that are
// managed by the wrapped guest manager.
type containerRunner struct {
	manager *SGuestManager
}

// newContainerRunner returns a containerRunner bound to man.
func newContainerRunner(man *SGuestManager) *containerRunner {
	return &containerRunner{manager: man}
}
// RunInContainer synchronously executes cmd inside the container identified
// by containerId of the given pod through the CRI ExecSync call.
//
// It returns the command's stdout followed by its stderr. An error is
// returned when the pod is unknown to the guest manager (or is not a pod
// instance), when the container has no CRI id, or when the ExecSync call
// itself fails.
//
// NOTE(review): the exit code carried by the ExecSync response is not
// inspected here, so a command that runs but exits non-zero is reported
// without error — confirm that callers do not rely on the error for that.
func (cr *containerRunner) RunInContainer(pod *desc.SGuestDesc, containerId string, cmd []string, timeout time.Duration) ([]byte, error) {
	srv, ok := cr.manager.GetServer(pod.Uuid)
	if !ok {
		return nil, errors.Wrapf(httperrors.ErrNotFound, "server %s not found", pod.Uuid)
	}
	// Checked assertion: a server of another kind must not panic the prober.
	s, ok := srv.(*sPodGuestInstance)
	if !ok {
		return nil, errors.Wrapf(httperrors.ErrNotFound, "server %s is not a pod instance", pod.Uuid)
	}
	ctrCriId, err := s.getContainerCRIId(containerId)
	if err != nil {
		return nil, errors.Wrap(err, "get container cri id")
	}

	// ExecSyncRequest.Timeout is expressed in seconds (0 means no timeout),
	// while timeout is a time.Duration (nanoseconds). Convert, rounding any
	// positive sub-second remainder up so a short timeout never degrades to
	// "run forever".
	var timeoutSec int64
	if timeout > 0 {
		timeoutSec = int64((timeout + time.Second - 1) / time.Second)
	}

	cli := s.getCRI().GetRuntimeClient()
	resp, err := cli.ExecSync(context.Background(), &runtimeapi.ExecSyncRequest{
		ContainerId: ctrCriId,
		Cmd:         cmd,
		Timeout:     timeoutSec,
	})
	if err != nil {
		return nil, errors.Wrapf(err, "exec sync %#v to %s", cmd, ctrCriId)
	}
	return append(resp.Stdout, resp.Stderr...), nil
}
type PodInstance interface {
GuestRuntimeInstance
@@ -122,6 +167,7 @@ func (s *sPodGuestInstance) ImportServer(pendingDelete bool) {
s.manager.SaveServer(s.Id, s)
s.manager.RemoveCandidateServer(s)
s.SyncStatus("sync status after host started")
s.getProbeManager().AddPod(s.Desc)
}
func (s *sPodGuestInstance) SyncStatus(reason string) {
@@ -196,6 +242,10 @@ func (s *sPodGuestInstance) getCRI() pod.CRI {
return s.manager.GetCRI()
}
// getProbeManager returns the container probe manager of the guest manager
// that owns this pod instance.
func (s *sPodGuestInstance) getProbeManager() prober.Manager {
	return s.manager.GetContainerProbeManager()
}
// getHostCPUMap returns the host container CPU map exposed by the guest
// manager that owns this pod instance.
func (s *sPodGuestInstance) getHostCPUMap() *pod.HostContainerCPUMap {
	return s.manager.GetContainerCPUMap()
}
@@ -317,6 +367,21 @@ func (s *sPodGuestInstance) umountPodVolumes() error {
return nil
}
// GetContainers returns the container descriptions recorded in the pod's
// guest description.
func (s *sPodGuestInstance) GetContainers() []*hostapi.ContainerDesc {
	return s.GetDesc().Containers
}
// GetContainerById looks up the description of the container whose Id equals
// ctrId among the pod's containers. It returns nil when there is no match.
func (s *sPodGuestInstance) GetContainerById(ctrId string) *hostapi.ContainerDesc {
	for _, candidate := range s.GetContainers() {
		if candidate.Id == ctrId {
			return candidate
		}
	}
	return nil
}
func (s *sPodGuestInstance) getContainerVolumeMounts() map[string][]*hostapi.ContainerVolumeMount {
result := make(map[string][]*hostapi.ContainerVolumeMount, 0)
for _, ctr := range s.GetDesc().Containers {
@@ -503,6 +568,8 @@ func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.Toke
if err := s.setPodCgroupResources(criId, s.GetDesc().Mem, s.GetDesc().Cpu); err != nil {
return nil, errors.Wrapf(err, "set pod %s cgroup memMB %d, cpu %d", criId, s.GetDesc().Mem, s.GetDesc().Cpu)
}
s.getProbeManager().AddPod(s.Desc)
return &computeapi.PodStartResponse{
CRIId: criId,
IsRunning: false,
@@ -543,6 +610,8 @@ func (s *sPodGuestInstance) ensurePodRemoved(ctx context.Context, timeout int64)
return errors.Wrapf(err, "remove cri pod: %s", p.GetId())
}
}
s.getProbeManager().RemovePod(s.Desc)
return nil
}
@@ -605,8 +674,12 @@ func (s *sPodGuestInstance) SyncConfig(ctx context.Context, guestDesc *desc.SGue
return nil, nil
}
// getContainerMeta returns the entry stored under id in s.containers; the
// result is nil when no entry exists for that id.
func (s *sPodGuestInstance) getContainerMeta(id string) *sContainer {
	return s.containers[id]
}
func (s *sPodGuestInstance) getContainerCRIId(ctrId string) (string, error) {
ctr := s.getContainer(ctrId)
ctr := s.getContainerMeta(ctrId)
if ctr == nil {
return "", errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
}
@@ -852,10 +925,6 @@ func (s *sPodGuestInstance) getContainersFilePath() string {
return path.Join(s.HomeDir(), "containers")
}
func (s *sPodGuestInstance) getContainer(id string) *sContainer {
return s.containers[id]
}
func (s *sPodGuestInstance) CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
ctrCriId, err := s.createContainer(ctx, userCred, id, input)
if err != nil {
@@ -1263,6 +1332,15 @@ func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string
case runtimeapi.ContainerState_CONTAINER_UNKNOWN:
status = computeapi.CONTAINER_STATUS_UNKNOWN
}
if status == computeapi.CONTAINER_STATUS_RUNNING {
ctr := s.GetContainerById(ctrId)
if ctr == nil {
return "", errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId)
}
if ctr.Spec.NeedProbe() {
status = computeapi.CONTAINER_STATUS_PROBING
}
}
return status, nil
}

View File

@@ -61,7 +61,7 @@ func (h *SHostInfo) startContainerStatsProvider(cri pod.CRI) error {
if err := ca.Start(); err != nil {
return errors.Wrap(err, "start cadvisor")
}
h.containerStatsProvier = stats.NewCRIContainerStatsProvider(ca, cri.GetRuntimeClient(), cri.GetImageClient())
h.containerStatsProvider = stats.NewCRIContainerStatsProvider(ca, cri.GetRuntimeClient(), cri.GetImageClient())
return nil
}
@@ -74,5 +74,5 @@ func (h *SHostInfo) GetContainerCPUMap() *pod.HostContainerCPUMap {
}
func (h *SHostInfo) GetContainerStatsProvider() stats.ContainerStatsProvider {
return h.containerStatsProvier
return h.containerStatsProvider
}

View File

@@ -120,9 +120,10 @@ type SHostInfo struct {
IoScheduler string
cri pod.CRI
containerCPUMap *pod.HostContainerCPUMap
containerStatsProvier stats.ContainerStatsProvider
// container related members
cri pod.CRI
containerCPUMap *pod.HostContainerCPUMap
containerStatsProvider stats.ContainerStatsProvider
}
func (h *SHostInfo) GetContainerDeviceConfigurationFilePath() string {

1
pkg/util/exec/doc.go Normal file
View File

@@ -0,0 +1 @@
package exec // import "yunion.io/x/onecloud/pkg/util/exec"

266
pkg/util/exec/exec.go Normal file
View File

@@ -0,0 +1,266 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"context"
"io"
osexec "os/exec"
"syscall"
"time"
)
// ErrExecutableNotFound is returned if the executable is not found.
// It is the same value as os/exec's ErrNotFound.
var ErrExecutableNotFound = osexec.ErrNotFound
// Interface is an interface that presents a subset of the os/exec API. Use this
// when you want to inject fakeable/mockable exec behavior.
type Interface interface {
	// Command returns a Cmd instance which can be used to run a single command.
	// This follows the pattern of package os/exec.
	Command(cmd string, args ...string) Cmd
	// CommandContext returns a Cmd instance which can be used to run a single command.
	//
	// The provided context is used to kill the process if the context becomes done
	// before the command completes on its own. For example, a timeout can be set in
	// the context.
	CommandContext(ctx context.Context, cmd string, args ...string) Cmd
	// LookPath wraps os/exec.LookPath.
	LookPath(file string) (string, error)
}
// Cmd is an interface that presents an API that is very similar to Cmd from os/exec.
// As more functionality is needed, this can grow. Since Cmd is a struct, we will have
// to replace fields with get/set method pairs.
type Cmd interface {
	// Run runs the command to completion.
	Run() error
	// CombinedOutput runs the command and returns its combined standard output
	// and standard error. This follows the pattern of package os/exec.
	CombinedOutput() ([]byte, error)
	// Output runs the command and returns standard output, but not standard error.
	Output() ([]byte, error)
	// The Set* methods stand in for the corresponding fields of os/exec.Cmd
	// (Dir, Stdin, Stdout, Stderr, Env).
	SetDir(dir string)
	SetStdin(in io.Reader)
	SetStdout(out io.Writer)
	SetStderr(out io.Writer)
	SetEnv(env []string)
	// StdoutPipe and StderrPipe for getting the process' Stdout and Stderr as
	// Readers
	StdoutPipe() (io.ReadCloser, error)
	StderrPipe() (io.ReadCloser, error)
	// Start and Wait are for running a process non-blocking
	Start() error
	Wait() error
	// Stop stops the command by sending SIGTERM. It is not guaranteed the
	// process will stop before this function returns. If the process is not
	// responding, an internal timer function will send a SIGKILL to force
	// terminate after 10 seconds.
	Stop()
}
// ExitError is an interface that presents an API similar to os.ProcessState, which is
// what ExitError from os/exec is. This is designed to make testing a bit easier and
// probably loses some of the cross-platform properties of the underlying library.
type ExitError interface {
	// String and Error describe the failure.
	String() string
	Error() string
	// Exited reports whether the process has exited.
	Exited() bool
	// ExitStatus returns the exit code of the process.
	ExitStatus() int
}
// executor implements Interface by really exec()ing through os/exec.
type executor struct{}

// New returns a new Interface which will use os/exec to run commands.
func New() Interface {
	return new(executor)
}

// Command is part of the Interface interface.
func (e *executor) Command(cmd string, args ...string) Cmd {
	c := osexec.Command(cmd, args...)
	return (*cmdWrapper)(c)
}

// CommandContext is part of the Interface interface.
func (e *executor) CommandContext(ctx context.Context, cmd string, args ...string) Cmd {
	c := osexec.CommandContext(ctx, cmd, args...)
	return (*cmdWrapper)(c)
}

// LookPath is part of the Interface interface.
func (e *executor) LookPath(file string) (string, error) {
	return osexec.LookPath(file)
}
// cmdWrapper wraps os/exec.Cmd so that the errors it returns can be
// translated by handleError.
type cmdWrapper osexec.Cmd

var _ Cmd = &cmdWrapper{}

// SetDir is part of the Cmd interface; it sets the Dir field.
func (cmd *cmdWrapper) SetDir(dir string) { cmd.Dir = dir }

// SetStdin is part of the Cmd interface; it sets the Stdin field.
func (cmd *cmdWrapper) SetStdin(in io.Reader) { cmd.Stdin = in }

// SetStdout is part of the Cmd interface; it sets the Stdout field.
func (cmd *cmdWrapper) SetStdout(out io.Writer) { cmd.Stdout = out }

// SetStderr is part of the Cmd interface; it sets the Stderr field.
func (cmd *cmdWrapper) SetStderr(out io.Writer) { cmd.Stderr = out }

// SetEnv is part of the Cmd interface; it sets the Env field.
func (cmd *cmdWrapper) SetEnv(env []string) { cmd.Env = env }

// StdoutPipe is part of the Cmd interface.
func (cmd *cmdWrapper) StdoutPipe() (io.ReadCloser, error) {
	pipe, pipeErr := (*osexec.Cmd)(cmd).StdoutPipe()
	return pipe, handleError(pipeErr)
}

// StderrPipe is part of the Cmd interface.
func (cmd *cmdWrapper) StderrPipe() (io.ReadCloser, error) {
	pipe, pipeErr := (*osexec.Cmd)(cmd).StderrPipe()
	return pipe, handleError(pipeErr)
}

// Start is part of the Cmd interface.
func (cmd *cmdWrapper) Start() error {
	return handleError((*osexec.Cmd)(cmd).Start())
}

// Wait is part of the Cmd interface.
func (cmd *cmdWrapper) Wait() error {
	return handleError((*osexec.Cmd)(cmd).Wait())
}

// Run is part of the Cmd interface.
func (cmd *cmdWrapper) Run() error {
	return handleError((*osexec.Cmd)(cmd).Run())
}

// CombinedOutput is part of the Cmd interface.
func (cmd *cmdWrapper) CombinedOutput() ([]byte, error) {
	combined, runErr := (*osexec.Cmd)(cmd).CombinedOutput()
	return combined, handleError(runErr)
}

// Output is part of the Cmd interface.
func (cmd *cmdWrapper) Output() ([]byte, error) {
	stdout, runErr := (*osexec.Cmd)(cmd).Output()
	return stdout, handleError(runErr)
}
// Stop is part of the Cmd interface.
//
// It sends SIGTERM to the process (if one was started) and returns
// immediately. Ten seconds later a timer checks whether the process is known
// to have exited and, if not, sends SIGKILL. Signal errors are deliberately
// ignored: the process may already be gone, which is the desired outcome.
func (cmd *cmdWrapper) Stop() {
	c := (*osexec.Cmd)(cmd)

	if c.Process == nil {
		return
	}

	_ = c.Process.Signal(syscall.SIGTERM)

	time.AfterFunc(10*time.Second, func() {
		// ProcessState is only populated once Wait has returned, so it is nil
		// precisely when the process is still running (or was never waited
		// for). Treat nil as "not exited" instead of dereferencing it.
		if c.ProcessState == nil || !c.ProcessState.Exited() {
			_ = c.Process.Signal(syscall.SIGKILL)
		}
	})
}
// handleError translates errors coming from os/exec into this package's
// equivalents: an *os/exec.ExitError is wrapped in *ExitErrorWrapper, an
// *os/exec.Error caused by ErrNotFound becomes ErrExecutableNotFound, and
// anything else (including nil) is returned unchanged.
func handleError(err error) error {
	if err == nil {
		return nil
	}
	if exitErr, isExit := err.(*osexec.ExitError); isExit {
		return &ExitErrorWrapper{exitErr}
	}
	if execErr, isExec := err.(*osexec.Error); isExec && execErr.Err == osexec.ErrNotFound {
		return ErrExecutableNotFound
	}
	return err
}
// ExitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError.
// Note: standard exec.ExitError embeds *os.ProcessState, which already implements Exited().
type ExitErrorWrapper struct {
	*osexec.ExitError
}

var _ ExitError = &ExitErrorWrapper{}

// ExitStatus is part of the ExitError interface. It panics when the
// underlying process state does not carry a syscall.WaitStatus.
func (eew ExitErrorWrapper) ExitStatus() int {
	if waitStatus, isWaitStatus := eew.Sys().(syscall.WaitStatus); isWaitStatus {
		return waitStatus.ExitStatus()
	}
	panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper")
}
// CodeExitError is an implementation of ExitError consisting of an error object
// and an exit code (the upper bits of os.exec.ExitStatus).
type CodeExitError struct {
	Err  error
	Code int
}

var _ ExitError = CodeExitError{}

// Error returns the message of the wrapped error.
func (e CodeExitError) Error() string {
	return e.Err.Error()
}

// String returns the same text as Error.
func (e CodeExitError) String() string {
	return e.Error()
}

// Exited always reports true: a CodeExitError describes a finished process.
func (e CodeExitError) Exited() bool {
	return true
}

// ExitStatus returns the recorded exit code.
func (e CodeExitError) ExitStatus() int {
	return e.Code
}

1
pkg/util/ioutils/doc.go Normal file
View File

@@ -0,0 +1 @@
package ioutils // import "yunion.io/x/onecloud/pkg/util/ioutils"

View File

@@ -0,0 +1,66 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutils
import "io"
// LimitWriter is the writer-side counterpart of the standard library's
// io.LimitReader.
// LimitWriter returns a Writer that writes to w but accepts at most n bytes
// in total; once the budget is used up, writes fail with io.ErrShortWrite.
// The underlying implementation is a *LimitedWriter.
func LimitWriter(w io.Writer, n int64) io.Writer {
	return &LimitedWriter{W: w, N: n}
}

// A LimitedWriter writes to W but limits the amount of
// data written to just N bytes. Each call to Write
// updates N to reflect the new amount remaining.
// Write returns io.ErrShortWrite when N <= 0 or when the data had to be
// truncated to fit; errors from the underlying W are passed through.
type LimitedWriter struct {
	W io.Writer // underlying writer
	N int64     // max bytes remaining
}

// Write forwards at most N bytes of p to W and reports how many bytes W
// accepted. A truncated write that W otherwise handled successfully is
// reported as io.ErrShortWrite.
func (l *LimitedWriter) Write(p []byte) (n int, err error) {
	if l.N <= 0 {
		return 0, io.ErrShortWrite
	}

	overflow := int64(len(p)) > l.N
	if overflow {
		p = p[:l.N]
	}

	n, err = l.W.Write(p)
	l.N -= int64(n)

	if overflow && err == nil {
		return n, io.ErrShortWrite
	}
	return n, err
}

1
pkg/util/probe/doc.go Normal file
View File

@@ -0,0 +1 @@
package probe // import "yunion.io/x/onecloud/pkg/util/probe"

View File

@@ -0,0 +1 @@
package exec // import "yunion.io/x/onecloud/pkg/util/probe/exec"

View File

@@ -0,0 +1,59 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import "time"
// TimeoutError is an error returned on exec probe timeouts. It should be returned by CRI implementations
// in order for the exec prober to interpret exec timeouts as failed probes.
// TODO: this error type can likely be removed when we support CRI errors.
type TimeoutError struct {
	err     error
	timeout time.Duration
}

// NewTimeoutError returns a new TimeoutError wrapping err and recording the
// timeout that was exceeded.
func NewTimeoutError(err error, timeout time.Duration) *TimeoutError {
	te := new(TimeoutError)
	te.err = err
	te.timeout = timeout
	return te
}

// Error returns the message of the wrapped error.
func (t *TimeoutError) Error() string {
	return t.err.Error()
}

// Timeout returns the timeout duration of the exec probe.
func (t *TimeoutError) Timeout() time.Duration {
	return t.timeout
}

View File

@@ -0,0 +1,93 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"bytes"
"yunion.io/x/log"
"yunion.io/x/onecloud/pkg/util/exec"
"yunion.io/x/onecloud/pkg/util/ioutils"
"yunion.io/x/onecloud/pkg/util/probe"
)
// maxReadLength caps how many bytes of probe command output are retained.
const maxReadLength = 10 << 10 // 10KB
// Prober is an interface defining the Probe object for container readiness/liveness checks.
type Prober interface {
	// Probe runs the given command and reports the probe result, the captured
	// command output and an error if the outcome could not be determined.
	Probe(e exec.Cmd) (probe.Result, string, error)
}

// New creates a Prober.
func New() Prober {
	return execProber{}
}

// execProber is the stateless Prober implementation returned by New.
type execProber struct{}
// Probe checks the liveness/readiness of a container by running the given
// command. Stdout and stderr are captured together, limited to maxReadLength
// bytes.
//
// Outcome mapping:
//   - the command finished without error, or with an ExitError whose status
//     is 0: Success
//   - an ExitError with a non-zero status, or a *TimeoutError: Failure
//   - any other error: Unknown, with empty output and the error returned
func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
	var dataBuffer bytes.Buffer
	limited := ioutils.LimitWriter(&dataBuffer, maxReadLength)
	e.SetStderr(limited)
	e.SetStdout(limited)

	runErr := e.Start()
	if runErr == nil {
		runErr = e.Wait()
	}

	output := dataBuffer.String()
	log.Infof("Exec probe response: %q", output)

	switch typed := runErr.(type) {
	case nil:
		return probe.Success, output, nil
	case exec.ExitError:
		if typed.ExitStatus() == 0 {
			return probe.Success, output, nil
		}
		return probe.Failure, output, nil
	case *TimeoutError:
		log.Warningf("Exec probe timed out after %s", typed.Timeout())
		return probe.Failure, output, nil
	default:
		return probe.Unknown, "", runErr
	}
}

View File

@@ -0,0 +1,159 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"fmt"
"io"
"strings"
"testing"
"yunion.io/x/onecloud/pkg/util/probe"
)
// FakeCmd is a test double for the Cmd interface. Start writes out to the
// last writer configured through SetStdout/SetStderr (if any) and returns
// err; the other methods are canned or no-ops.
type FakeCmd struct {
	out    []byte
	stdout []byte
	err    error
	writer io.Writer
}

// Run always succeeds.
func (f *FakeCmd) Run() error { return nil }

// CombinedOutput returns the canned combined output and error.
func (f *FakeCmd) CombinedOutput() ([]byte, error) { return f.out, f.err }

// Output returns the canned stdout and error.
func (f *FakeCmd) Output() ([]byte, error) { return f.stdout, f.err }

// SetDir is a no-op.
func (f *FakeCmd) SetDir(dir string) {}

// SetStdin is a no-op.
func (f *FakeCmd) SetStdin(in io.Reader) {}

// SetStdout records out as the writer Start will write to.
func (f *FakeCmd) SetStdout(out io.Writer) { f.writer = out }

// SetStderr records out as the writer Start will write to.
func (f *FakeCmd) SetStderr(out io.Writer) { f.writer = out }

// SetEnv is a no-op.
func (f *FakeCmd) SetEnv(env []string) {}

// Stop is a no-op.
func (f *FakeCmd) Stop() {}

// Start emits the canned output to the configured writer, when there is one,
// and returns the canned error.
func (f *FakeCmd) Start() error {
	if f.writer != nil {
		f.writer.Write(f.out)
	}
	return f.err
}

// Wait always succeeds.
func (f *FakeCmd) Wait() error { return nil }

// StdoutPipe returns no pipe and no error.
func (f *FakeCmd) StdoutPipe() (io.ReadCloser, error) { return nil, nil }

// StderrPipe returns no pipe and no error.
func (f *FakeCmd) StderrPipe() (io.ReadCloser, error) { return nil, nil }
// fakeExitError is a canned ExitError for tests: it reports the configured
// exited flag and status code and always uses the message "fake exit".
type fakeExitError struct {
	exited     bool
	statusCode int
}

// Error returns the fixed message.
func (fe *fakeExitError) Error() string { return "fake exit" }

// String returns the same text as Error.
func (fe *fakeExitError) String() string { return fe.Error() }

// Exited returns the configured exited flag.
func (fe *fakeExitError) Exited() bool { return fe.exited }

// ExitStatus returns the configured status code.
func (fe *fakeExitError) ExitStatus() int { return fe.statusCode }
// TestExec drives execProber.Probe with a FakeCmd. For each case the fake
// emits `input` on the writer configured by the prober and returns `err`
// from Start; the test then checks the resulting status, whether an error
// was reported, and that the captured output equals `output`.
//
// The fake is fed `input` rather than `output`, so the "truncated output"
// case really pushes more than the prober's limit through it and verifies
// that only the first 10KB are kept.
func TestExec(t *testing.T) {
	prober := New()

	tenKilobyte := strings.Repeat("logs-123", 128*10)    // 8*128*10=10240 = 10KB of text.
	elevenKilobyte := strings.Repeat("logs-123", 128*11) // 8*128*11=11264 = 11KB of text.

	tests := []struct {
		expectedStatus probe.Result
		expectError    bool
		input          string
		output         string
		err            error
	}{
		// Ok
		{probe.Success, false, "OK", "OK", nil},
		// Ok
		{probe.Success, false, "OK", "OK", &fakeExitError{true, 0}},
		// Ok - truncated output
		{probe.Success, false, elevenKilobyte, tenKilobyte, nil},
		// Run returns error
		{probe.Unknown, true, "", "", fmt.Errorf("test error")},
		// Unhealthy: the command output is still reported on failure.
		{probe.Failure, false, "Fail", "Fail", &fakeExitError{true, 1}},
	}
	for i, test := range tests {
		fake := FakeCmd{
			out: []byte(test.input),
			err: test.err,
		}
		status, output, err := prober.Probe(&fake)
		if status != test.expectedStatus {
			t.Errorf("[%d] expected %v, got %v", i, test.expectedStatus, status)
		}
		if err != nil && !test.expectError {
			t.Errorf("[%d] unexpected error: %v", i, err)
		}
		if err == nil && test.expectError {
			t.Errorf("[%d] unexpected non-error", i)
		}
		if test.output != output {
			t.Errorf("[%d] expected %s, got %s", i, test.output, output)
		}
	}
}

49
pkg/util/probe/probe.go Normal file
View File

@@ -0,0 +1,49 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package probe
// Result is a string used to handle the results for probing container readiness/liveness
type Result string

// Possible probe results.
const (
	// Success Result
	Success Result = "success"
	// Warning Result. Logically success, but with additional debugging information attached.
	Warning Result = "warning"
	// Failure Result
	Failure Result = "failure"
	// Unknown Result
	Unknown Result = "unknown"
)

// IsEqual reports whether r and r2 hold the same result value.
func (r Result) IsEqual(r2 Result) bool {
	return r == r2
}

View File

@@ -0,0 +1 @@
package tcp // import "yunion.io/x/onecloud/pkg/util/probe/tcp"

75
pkg/util/probe/tcp/tcp.go Normal file
View File

@@ -0,0 +1,75 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tcp
import (
"net"
"strconv"
"time"
"yunion.io/x/log"
"yunion.io/x/onecloud/pkg/util/probe"
)
// Prober is an interface that defines the Probe function for doing TCP
// readiness/liveness checks.
type Prober interface {
	Probe(host string, port int, timeout time.Duration) (probe.Result, string, error)
}

// tcpProber is the field-less Prober implementation handed out by New.
type tcpProber struct{}

// New creates a Prober backed by tcpProber.
func New() Prober {
	return tcpProber{}
}

// Probe joins host and port into a single network address and delegates the
// actual check to DoTCPProbe, passing timeout through unchanged.
func (p tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
	addr := net.JoinHostPort(host, strconv.Itoa(port))
	return DoTCPProbe(addr, timeout)
}
// DoTCPProbe checks that a TCP socket to addr can be opened within timeout.
//
// When the dial fails, the failure is reported as probe.Failure with the dial
// error text as the message and a nil error, so that timeouts are treated as
// probe failures rather than probe errors. When the dial succeeds, the
// connection is closed again (a close error is only logged) and probe.Success
// is returned with an empty message.
//
// It is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
	conn, dialErr := net.DialTimeout("tcp", addr, timeout)
	if dialErr != nil {
		// Convert errors to failures to handle timeouts.
		return probe.Failure, dialErr.Error(), nil
	}
	if closeErr := conn.Close(); closeErr != nil {
		log.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", closeErr, closeErr)
	}
	return probe.Success, "", nil
}

View File

@@ -0,0 +1,82 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tcp
import (
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
"yunion.io/x/onecloud/pkg/util/probe"
)
// TestTcpHealthChecker checks the Prober returned by New against a local test
// server: probing the server's listening port must yield probe.Success, and
// probing an undialable port must yield probe.Failure, both with a nil error.
func TestTcpHealthChecker(t *testing.T) {
	// Set up a test server that responds to probing correctly.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()

	// Every case below depends on a valid host and port, so stop right away
	// if the listener address cannot be parsed instead of probing garbage.
	tHost, tPortStr, err := net.SplitHostPort(server.Listener.Addr().String())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	tPort, err := strconv.Atoi(tPortStr)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	tests := []struct {
		host string
		port int

		expectedStatus probe.Result
		expectedError  error
	}{
		// A connection is made and probing would succeed.
		{host: tHost, port: tPort, expectedStatus: probe.Success, expectedError: nil},
		// No connection can be made and probing would fail.
		{host: tHost, port: -1, expectedStatus: probe.Failure, expectedError: nil},
	}

	prober := New()
	for i, tt := range tests {
		status, _, err := prober.Probe(tt.host, tt.port, 1*time.Second)
		if status != tt.expectedStatus {
			t.Errorf("#%d: expected status=%v, get=%v", i, tt.expectedStatus, status)
		}
		if err != tt.expectedError {
			t.Errorf("#%d: expected error=%v, get=%v", i, tt.expectedError, err)
		}
	}
}