From addbcbe0d151e3c8deca7a13613b19e02b971be2 Mon Sep 17 00:00:00 2001 From: Zexi Li Date: Thu, 15 Aug 2024 10:46:39 +0800 Subject: [PATCH] feat(region,host): startup probe of container (#21011) --- cmd/climc/shell/events/events.go | 5 + pkg/apis/compute/container.go | 3 + pkg/apis/compute/guests.go | 7 +- pkg/apis/container.go | 17 ++ pkg/apis/container_probe.go | 116 ++++++++ pkg/compute/models/containers.go | 89 +++++- pkg/compute/models/guest_queries.go | 6 +- pkg/hostman/container/prober/doc.go | 1 + pkg/hostman/container/prober/prober.go | 224 +++++++++++++++ .../container/prober/prober_manager.go | 217 ++++++++++++++ pkg/hostman/container/prober/results/doc.go | 1 + .../prober/results/results_manager.go | 166 +++++++++++ pkg/hostman/container/prober/worker.go | 191 +++++++++++++ pkg/hostman/container/status/doc.go | 1 + .../container/status/status_manager.go | 55 ++++ pkg/hostman/guestman/container/doc.go | 1 + pkg/hostman/guestman/container/runtime.go | 27 ++ pkg/hostman/guestman/guestman.go | 7 + pkg/hostman/guestman/pod.go | 88 +++++- pkg/hostman/hostinfo/container.go | 4 +- pkg/hostman/hostinfo/hostinfo.go | 7 +- pkg/util/exec/doc.go | 1 + pkg/util/exec/exec.go | 266 ++++++++++++++++++ pkg/util/ioutils/doc.go | 1 + pkg/util/ioutils/ioutils.go | 66 +++++ pkg/util/probe/doc.go | 1 + pkg/util/probe/exec/doc.go | 1 + pkg/util/probe/exec/errors.go | 59 ++++ pkg/util/probe/exec/exec.go | 93 ++++++ pkg/util/probe/exec/exec_test.go | 159 +++++++++++ pkg/util/probe/probe.go | 49 ++++ pkg/util/probe/tcp/doc.go | 1 + pkg/util/probe/tcp/tcp.go | 75 +++++ pkg/util/probe/tcp/tcp_test.go | 82 ++++++ 34 files changed, 2068 insertions(+), 19 deletions(-) create mode 100644 pkg/apis/container_probe.go create mode 100644 pkg/hostman/container/prober/doc.go create mode 100644 pkg/hostman/container/prober/prober.go create mode 100644 pkg/hostman/container/prober/prober_manager.go create mode 100644 pkg/hostman/container/prober/results/doc.go create mode 100644 pkg/hostman/container/prober/results/results_manager.go create mode 100644 pkg/hostman/container/prober/worker.go create mode 100644 pkg/hostman/container/status/doc.go create mode 100644 pkg/hostman/container/status/status_manager.go create mode 100644 pkg/hostman/guestman/container/doc.go create mode 100644 pkg/hostman/guestman/container/runtime.go create mode 100644 pkg/util/exec/doc.go create mode 100644 pkg/util/exec/exec.go create mode 100644 pkg/util/ioutils/doc.go create mode 100644 pkg/util/ioutils/ioutils.go create mode 100644 pkg/util/probe/doc.go create mode 100644 pkg/util/probe/exec/doc.go create mode 100644 pkg/util/probe/exec/errors.go create mode 100644 pkg/util/probe/exec/exec.go create mode 100644 pkg/util/probe/exec/exec_test.go create mode 100644 pkg/util/probe/probe.go create mode 100644 pkg/util/probe/tcp/doc.go create mode 100644 pkg/util/probe/tcp/tcp.go create mode 100644 pkg/util/probe/tcp/tcp_test.go diff --git a/cmd/climc/shell/events/events.go b/cmd/climc/shell/events/events.go index a06df41a78..71e3044ba3 100644 --- a/cmd/climc/shell/events/events.go +++ b/cmd/climc/shell/events/events.go @@ -163,6 +163,11 @@ func init() { return doComputeEventList(s, &nargs) }) + R(&TypeEventListOptions{}, "container-event", "Show operation event logs of container", func(s *mcclient.ClientSession, args *TypeEventListOptions) error { + nargs := EventListOptions{BaseEventListOptions: args.BaseEventListOptions, Id: args.ID, Type: []string{"container"}} + return doComputeEventList(s, &nargs) + }) + R(&TypeEventListOptions{}, "disk-event", "Show operation event logs of disk", func(s *mcclient.ClientSession, args *TypeEventListOptions) error { nargs := EventListOptions{BaseEventListOptions: args.BaseEventListOptions, Id: args.ID, Type: []string{"disk"}} return doComputeEventList(s, &nargs) diff --git a/pkg/apis/compute/container.go b/pkg/apis/compute/container.go index 0bd8602c20..9d3e67a3e4 100644 --- a/pkg/apis/compute/container.go +++ b/pkg/apis/compute/container.go @@ -73,6 +73,9 @@ const ( CONTAINER_STATUS_RUNNING = "running" CONTAINER_STATUS_DELETING = "deleting" CONTAINER_STATUS_DELETE_FAILED = "delete_failed" + // for health check + CONTAINER_STATUS_PROBING = "probing" + CONTAINER_STATUS_PROBE_FAILED = "probe_failed" ) const ( diff --git a/pkg/apis/compute/guests.go b/pkg/apis/compute/guests.go index 803c916f8e..e6e1aa1e76 100644 --- a/pkg/apis/compute/guests.go +++ b/pkg/apis/compute/guests.go @@ -282,9 +282,10 @@ type BackupInfo struct { } type PodContainerDesc struct { - Id string `json:"id"` - Name string `json:"name"` - Image string `json:"image"` + Id string `json:"id"` + Name string `json:"name"` + Image string `json:"image"` + Status string `json:"status"` } type Floppy struct { diff --git a/pkg/apis/container.go b/pkg/apis/container.go index 452d959979..4e951b683e 100644 --- a/pkg/apis/container.go +++ b/pkg/apis/container.go @@ -70,6 +70,23 @@ type ContainerSpec struct { SimulateCpu bool `json:"simulate_cpu"` ShmSizeMB int `json:"shm_size_mb"` SecurityContext *ContainerSecurityContext `json:"security_context,omitempty"` + // Periodic probe of container liveness. + // Container will be restarted if the probe fails. + // Cannot be updated. + //LivenessProbe *ContainerProbe `json:"liveness_probe,omitempty"` + // StartupProbe indicates that the Pod has successfully initialized. + // If specified, no other probes are executed until this completes successfully. + StartupProbe *ContainerProbe `json:"startup_probe,omitempty"` +} + +func (c *ContainerSpec) NeedProbe() bool { + //if c.LivenessProbe != nil { + // return true + //} + if c.StartupProbe != nil { + return true + } + return false } type ContainerCapability struct { diff --git a/pkg/apis/container_probe.go b/pkg/apis/container_probe.go new file mode 100644 index 0000000000..0a76fd0024 --- /dev/null +++ b/pkg/apis/container_probe.go @@ -0,0 +1,116 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apis + +// ContainerProbeHandlerExecAction describes a "run in container" action. +type ContainerProbeHandlerExecAction struct { + // Command is the command line to execute inside the container, the working directory for the + // command is root ('/') in the container's filesystem. The command is simply exec'd, it is + // not run inside a shell, so traditional shell instructions ('|', etc) won't work. To use + // a shell, you need to explicitly call out to that shell. + // Exit status of 0 is treated as live/healthy and non-zero is unhealthy. + // +optional + Command []string `json:"command,omitempty"` +} + +// URIScheme identifies the scheme used for connection to a host for Get actions +type URIScheme string + +const ( + // URISchemeHTTP means that the scheme used will be http:// + URISchemeHTTP URIScheme = "HTTP" + // URISchemeHTTPS means that the scheme used will be https:// + URISchemeHTTPS URIScheme = "HTTPS" +) + +// HTTPHeader describes a custom header to be used in HTTP probes +type HTTPHeader struct { + // The header field name + Name string `json:"name"` + // The header field value + Value string `json:"value"` +} + +// ContainerProbeHTTPGetAction describes an action based on HTTP Get requests. +type ContainerProbeHTTPGetAction struct { + // Path to access on the HTTP server. + // +optional + Path string `json:"path,omitempty"` + // Name or number of the port to access on the container. + // Number must be in the range 1 to 65535. + // Name must be an IANA_SVC_NAME. + Port int `json:"port"` + // Host name to connect to, defaults to the pod IP. You probably want to set + // "Host" in httpHeaders instead. + // +optional + Host string `json:"host,omitempty"` + // Scheme to use for connecting to the host. + // Defaults to HTTP. + // +optional + Scheme URIScheme `json:"scheme,omitempty"` + // Custom headers to set in the request. HTTP allows repeated headers. + // +optional + HTTPHeaders []HTTPHeader `json:"httpHeaders,omitempty"` +} + +// ContainerProbeTCPSocketAction describes an action based on opening a socket +type ContainerProbeTCPSocketAction struct { + // Number or name of the port to access on the container. + // Number must be in the range 1 to 65535. + // Name must be an IANA_SVC_NAME. + Port int `json:"port"` + // Optional: Host name to connect to, defaults to the pod IP. + // +optional + Host string `json:"host,omitempty"` +} + +type ContainerProbeType string + +const ( + ContainerProbeTypeLiveness ContainerProbeType = "Liveness" + ContainerProbeTypeReadiness ContainerProbeType = "Readiness" + ContainerProbeTypeStartup ContainerProbeType = "Startup" +) + +// ContainerProbeHandler defines a specific action that should be taken +type ContainerProbeHandler struct { + // One and only one of the following should be specified. + // Exec specifies the action to take. + Exec *ContainerProbeHandlerExecAction `json:"exec,omitempty"` + // HTTPGet specifies the http request to perform. + HTTPGet *ContainerProbeHTTPGetAction `json:"http_get,omitempty"` + // TCPSocket specifies an action involving a TCP port. + TCPSocket *ContainerProbeTCPSocketAction `json:"tcp_socket,omitempty"` +} + +// ContainerProbe describes a health check to be performed against a container to determine whether it is +// alive or ready to receive traffic. +type ContainerProbe struct { + // The action taken to determine the health of a container + ContainerProbeHandler `json:",inline"` + // Number of seconds after the container has started before liveness probes are initiated. + // InitialDelaySeconds int32 `json:"initial_delay_seconds,omitempty"` + // Number of seconds after which the probe times out. + TimeoutSeconds int32 `json:"timeout_seconds,omitempty"` + // How often (in seconds) to perform the probe. + // Default to 10 seconds. Minimum value is 1. + PeriodSeconds int32 `json:"period_seconds,omitempty"` + // Minimum consecutive successes for the probe to be considered successful after having failed. + // Defaults to 1. Must be 1 for liveness and startup. Minimum value is 1. + SuccessThreshold int32 `json:"success_threshold,omitempty"` + // Minimum consecutive failures for the probe to be considered failed after having succeeded. + // Defaults to 3. Minimum value is 1. + FailureThreshold int32 `json:"failure_threshold,omitempty"` +} diff --git a/pkg/compute/models/containers.go b/pkg/compute/models/containers.go index d91455fbde..158b3ac9b3 100644 --- a/pkg/compute/models/containers.go +++ b/pkg/compute/models/containers.go @@ -164,6 +164,10 @@ func (m *SContainerManager) ValidateSpec(ctx context.Context, userCred mcclient. return httperrors.NewInputParameterError("/dev/shm size is small than 64MB") } + if err := m.ValidateSpecProbe(ctx, userCred, spec); err != nil { + return errors.Wrap(err, "validate probe configuration") + } + return nil } @@ -248,6 +252,81 @@ func (c *SContainer) CustomizeCreate(ctx context.Context, userCred mcclient.Toke return nil }*/ +func (m *SContainerManager) ValidateSpecProbe(ctx context.Context, userCred mcclient.TokenCredential, spec *api.ContainerSpec) error { + //if err := m.validateSpecProbe(ctx, userCred, spec.LivenessProbe); err != nil { + // return errors.Wrap(err, "validate liveness probe") + //} + if err := m.validateSpecProbe(ctx, userCred, spec.StartupProbe); err != nil { + return errors.Wrap(err, "validate startup probe") + } + return nil +} + +func (m *SContainerManager) validateSpecProbe(ctx context.Context, userCred mcclient.TokenCredential, probe *apis.ContainerProbe) error { + if probe == nil { + return nil + } + if err := m.validateSpecProbeHandler(probe.ContainerProbeHandler); err != nil { + return errors.Wrap(err, "validate container probe handler") + } + for key, val := range map[string]int32{ + //"initial_delay_seconds": probe.InitialDelaySeconds, + "timeout_seconds": probe.TimeoutSeconds, + "period_seconds": probe.PeriodSeconds, + "success_threshold": probe.SuccessThreshold, + "failure_threshold": probe.FailureThreshold, + } { + if val < 0 { + return httperrors.NewInputParameterError(key + " is negative") + } + } + + //if probe.InitialDelaySeconds == 0 { + // probe.InitialDelaySeconds = 5 + //} + if probe.TimeoutSeconds == 0 { + probe.TimeoutSeconds = 3 + } + if probe.PeriodSeconds == 0 { + probe.PeriodSeconds = 10 + } + if probe.SuccessThreshold == 0 { + probe.SuccessThreshold = 1 + } + if probe.FailureThreshold == 0 { + probe.FailureThreshold = 3 + } + return nil +} + +func (m *SContainerManager) validateSpecProbeHandler(probe apis.ContainerProbeHandler) error { + isAllNil := true + if probe.Exec != nil { + isAllNil = false + if len(probe.Exec.Command) == 0 { + return httperrors.NewInputParameterError("exec command is required") + } + } + if probe.TCPSocket != nil { + isAllNil = false + port := probe.TCPSocket.Port + if port < 1 || port > 65535 { + return httperrors.NewInputParameterError("invalid tcp socket port: %d, must between [1,65535]", port) + } + } + if probe.HTTPGet != nil { + isAllNil = false + port := probe.HTTPGet.Port + if port < 1 || port > 65535 { + return httperrors.NewInputParameterError("invalid http port: %d, must between [1,65535]", port) + } + } + if isAllNil { + return httperrors.NewInputParameterError("one of [exec, http_get, tcp_socket] is required") + } + return nil +} + func (c *SContainer) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) { c.SVirtualResourceBase.PostCreate(ctx, userCred, ownerId, query, data) if !jsonutils.QueryBoolean(data, "skip_task", false) { @@ -581,16 +660,16 @@ func NewContainerReleasedDevice(device *api.ContainerDevice, devType, devModel s } } -func (s *SContainer) SaveReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential, devs map[string]ContainerReleasedDevice) error { - return s.SetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, devs, userCred) +func (c *SContainer) SaveReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential, devs map[string]ContainerReleasedDevice) error { + return c.SetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, devs, userCred) } -func (s *SContainer) GetReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential) (map[string]ContainerReleasedDevice, error) { +func (c *SContainer) GetReleasedDevices(ctx context.Context, userCred mcclient.TokenCredential) (map[string]ContainerReleasedDevice, error) { out := make(map[string]ContainerReleasedDevice, 0) - if ret := s.GetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred); ret == "" { + if ret := c.GetMetadata(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred); ret == "" { return out, nil } - obj := s.GetMetadataJson(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred) + obj := c.GetMetadataJson(ctx, api.CONTAINER_METADATA_RELEASED_DEVICES, userCred) if obj == nil { return nil, errors.Error("get metadata released devices") } diff --git a/pkg/compute/models/guest_queries.go b/pkg/compute/models/guest_queries.go index b22d9c65a1..6b292053a4 100644 --- a/pkg/compute/models/guest_queries.go +++ b/pkg/compute/models/guest_queries.go @@ -794,7 +794,11 @@ func fetchContainers(guestIds []string) (map[string][]*api.PodContainerDesc, err if !ok { ret[container.GuestId] = []*api.PodContainerDesc{} } - desc := &api.PodContainerDesc{Id: container.GetId(), Name: container.GetName()} + desc := &api.PodContainerDesc{ + Id: container.GetId(), + Name: container.GetName(), + Status: container.Status, + } if container.Spec != nil { desc.Image = container.Spec.Image } diff --git a/pkg/hostman/container/prober/doc.go b/pkg/hostman/container/prober/doc.go new file mode 100644 index 0000000000..625135c7c8 --- /dev/null +++ b/pkg/hostman/container/prober/doc.go @@ -0,0 +1 @@ +package prober // import "yunion.io/x/onecloud/pkg/hostman/container/prober" diff --git a/pkg/hostman/container/prober/prober.go b/pkg/hostman/container/prober/prober.go new file mode 100644 index 0000000000..cbdfc2e719 --- /dev/null +++ b/pkg/hostman/container/prober/prober.go @@ -0,0 +1,224 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "fmt" + "io" + "time" + + "yunion.io/x/log" + "yunion.io/x/pkg/errors" + + "yunion.io/x/onecloud/pkg/apis" + hostapi "yunion.io/x/onecloud/pkg/apis/host" + "yunion.io/x/onecloud/pkg/hostman/container/prober/results" + "yunion.io/x/onecloud/pkg/hostman/guestman/container" + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" + "yunion.io/x/onecloud/pkg/util/exec" + "yunion.io/x/onecloud/pkg/util/probe" + execprobe "yunion.io/x/onecloud/pkg/util/probe/exec" + tcpprobe "yunion.io/x/onecloud/pkg/util/probe/tcp" +) + +const maxProbeRetries = 3 + +// Prober helps to check the liveness of a container. +type prober struct { + exec execprobe.Prober + tcp tcpprobe.Prober + runner container.CommandRunner +} + +func newProber(runner container.CommandRunner) *prober { + return &prober{ + exec: execprobe.New(), + tcp: tcpprobe.New(), + runner: runner, + } +} + +// probe probes the container. +func (pb *prober) probe(probeType apis.ContainerProbeType, pod *desc.SGuestDesc, container *hostapi.ContainerDesc) (results.ProbeResult, error) { + var probeSpec *apis.ContainerProbe + switch probeType { + //case apis.ContainerProbeTypeLiveness: + // probeSpec = container.Spec.LivenessProbe + case apis.ContainerProbeTypeStartup: + probeSpec = container.Spec.StartupProbe + default: + err := errors.Errorf("unknown probe type: %q", probeType) + return results.NewFailure(err.Error()), err + } + + ctrName := fmt.Sprintf("%s:%s", pod.Name, container.Name) + if probeSpec == nil { + log.Warningf("%s probe for %s is nil", probeType, ctrName) + return results.NewSuccess("probe is not defined"), nil + } + + result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, container, maxProbeRetries) + var msg string + if err != nil || (result != probe.Success && result != probe.Warning) { + // Probe failed in one way or another + if err != nil { + msg = fmt.Sprintf("%s probe for %q errored: %v", probeType, ctrName, err) + log.Debugf(msg) + } else { + // result != probe.Success + msg = fmt.Sprintf("%s probe for %q failed (%v): %s", probeType, ctrName, result, output) + log.Debugf(msg) + } + return results.NewFailure(msg), err + } + if result == probe.Warning { + msg = fmt.Sprintf("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output) + log.Infof(msg) + } else { + msg = fmt.Sprintf("%s probe for %q succeeded", probeType, ctrName) + log.Debugf(msg) + } + return results.NewSuccess(msg), nil +} + +// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result +// if it never succeeds. +func (pb *prober) runProbeWithRetries(probeType apis.ContainerProbeType, p *apis.ContainerProbe, pod *desc.SGuestDesc, container *hostapi.ContainerDesc, retries int) (probe.Result, string, error) { + var err error + var result probe.Result + var output string + for i := 0; i < retries; i++ { + result, output, err = pb.runProbe(probeType, p, pod, container) + if err == nil { + return result, output, nil + } + } + return result, output, err +} + +func (pb *prober) runProbe(probeType apis.ContainerProbeType, p *apis.ContainerProbe, pod *desc.SGuestDesc, container *hostapi.ContainerDesc) (probe.Result, string, error) { + timeout := time.Duration(p.TimeoutSeconds) * time.Second + if p.Exec != nil { + log.Debugf("Exec-Probe Pod: %v, Container: %v, Command: %v", pod.Name, container.Name, p.Exec.Command) + return pb.exec.Probe(pb.newExecInContainer(pod, container, p.Exec.Command, timeout)) + } + if p.TCPSocket != nil { + port := p.TCPSocket.Port + host := p.TCPSocket.Host + if host == "" { + for _, nic := range pod.Nics { + if nic.Ip != "" { + host = nic.Ip + break + } + } + if host == "" { + return probe.Unknown, "", errors.Errorf("not found guest ip") + } + } + log.Debugf("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout) + return pb.tcp.Probe(host, port, timeout) + } + errMsg := fmt.Sprintf("Failed to find probe builder for pod %v, container: %v", pod.Name, container.Name) + log.Warningf(errMsg) + return probe.Unknown, "", errors.Error(errMsg) +} + +type execInContainer struct { + // run executes a command in a container. Combined stdout and stderr output is always returned. An + // error is returned if one occurred. + run func() ([]byte, error) + writer io.Writer +} + +func (pb *prober) newExecInContainer(pod *desc.SGuestDesc, container *hostapi.ContainerDesc, cmd []string, timeout time.Duration) exec.Cmd { + return &execInContainer{ + run: func() ([]byte, error) { + return pb.runner.RunInContainer(pod, container.Id, cmd, timeout) + }, + } +} + +func (eic *execInContainer) Run() error { + return nil +} + +func (eic *execInContainer) CombinedOutput() ([]byte, error) { + return eic.run() +} + +func (eic *execInContainer) Output() ([]byte, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) SetDir(dir string) { + //unimplemented +} + +func (eic *execInContainer) SetStdin(in io.Reader) { + //unimplemented +} + +func (eic *execInContainer) SetStdout(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetStderr(out io.Writer) { + eic.writer = out +} + +func (eic *execInContainer) SetEnv(env []string) { + //unimplemented +} + +func (eic *execInContainer) Stop() { + //unimplemented +} + +func (eic *execInContainer) Start() error { + data, err := eic.run() + if eic.writer != nil { + eic.writer.Write(data) + } + return err +} + +func (eic *execInContainer) Wait() error { + return nil +} + +func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} + +func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) { + return nil, fmt.Errorf("unimplemented") +} diff --git a/pkg/hostman/container/prober/prober_manager.go b/pkg/hostman/container/prober/prober_manager.go new file mode 100644 index 0000000000..869ff28131 --- /dev/null +++ b/pkg/hostman/container/prober/prober_manager.go @@ -0,0 +1,217 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "sync" + + "yunion.io/x/log" + "yunion.io/x/pkg/util/sets" + "yunion.io/x/pkg/util/wait" + + "yunion.io/x/onecloud/pkg/apis" + "yunion.io/x/onecloud/pkg/hostman/container/prober/results" + "yunion.io/x/onecloud/pkg/hostman/container/status" + "yunion.io/x/onecloud/pkg/hostman/guestman/container" + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" +) + +// Key uniquely identifying container probes +type probeKey struct { + podUid string + containerName string + probeType apis.ContainerProbeType +} + +// Manager manages pod probing. It creates a probe "worker" for every container that specifies a +// probe (AddPod). The worker periodically probes its assigned container and caches the results. The +// manager use the cached probe results to set the appropriate Ready state in the PodStatus when +// requested (UpdatePodStatus). Updating probe parameters is not currently supported. +// TODO: Move liveness probing out of the runtime, to here. +type Manager interface { + // AddPod creates new probe workers for every container probe. This should be called for every + // pod created. + AddPod(pod *desc.SGuestDesc) + + // RemovePod handles cleaning up the removed pod state, including terminating probe workers and + // deleting cached results. + RemovePod(pod *desc.SGuestDesc) + + // CleanupPods handles cleaning up pods which should no longer be running. + // It takes a map of "desired pods" which should not be cleaned up. + CleanupPods(desiredPods map[string]sets.Empty) + + // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each + // container based on container running status, cached probe results and worker states. + UpdatePodStatus(podId string) + + // Start starts the Manager sync loops. + Start() +} + +type manager struct { + // Map of active workers for probes + workers map[probeKey]*worker + // Lock for accessing & mutating workers + workerLock sync.RWMutex + + statusManager status.Manager + + // readinessManager manages the results of readiness probes + // readinessManager results.Manager + + // livenessManager manages the results of liveness probes + livenessManager results.Manager + + // startupManager manages the results of startup probes + startupManager results.Manager + + // prober executes the probe actions + prober *prober +} + +func NewManager( + statusManager status.Manager, + livenessManager results.Manager, + startupManager results.Manager, + runner container.CommandRunner) Manager { + prober := newProber(runner) + return &manager{ + statusManager: statusManager, + prober: prober, + livenessManager: livenessManager, + startupManager: startupManager, + workers: make(map[probeKey]*worker), + workerLock: sync.RWMutex{}, + } +} + +// Start syncing probe status. This should only be called once. +func (m *manager) Start() { + // start syncing readiness. + //go wait.Forever(m.updateReadiness, 0) + // start syncing startup. + go wait.Forever(m.updateStartup, 0) +} + +func (m *manager) AddPod(pod *desc.SGuestDesc) { + m.workerLock.Lock() + defer m.workerLock.Unlock() + + key := probeKey{podUid: pod.Uuid} + for _, c := range pod.Containers { + key.containerName = c.Name + if c.Spec.StartupProbe != nil { + key.probeType = apis.ContainerProbeTypeStartup + if _, ok := m.workers[key]; ok { + log.Errorf("Startup probe already exists: %s:%s", pod.Name, c.Name) + return + } + w := newWorker(m, key.probeType, pod, c) + m.workers[key] = w + go w.run() + } + + /*if c.Spec.LivenessProbe != nil { + key.probeType = apis.ContainerProbeTypeLiveness + if _, ok := m.workers[key]; ok { + log.Errorf("Liveness probe already exists: %s:%s", pod.Name, c.Name) + return + } + w := newWorker(m, key.probeType, pod, c) + m.workers[key] = w + go w.run() + }*/ + } +} + +func (m *manager) RemovePod(pod *desc.SGuestDesc) { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + + key := probeKey{podUid: pod.Uuid} + for _, c := range pod.Containers { + key.containerName = c.Name + for _, probeType := range []apis.ContainerProbeType{apis.ContainerProbeTypeLiveness, apis.ContainerProbeTypeReadiness, apis.ContainerProbeTypeStartup} { + key.probeType = probeType + if worker, ok := m.workers[key]; ok { + worker.stop() + } + } + } +} + +func (m *manager) CleanupPods(desiredPods map[string]sets.Empty) { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + + for key, worker := range m.workers { + if _, ok := desiredPods[key.podUid]; !ok { + worker.stop() + } + } +} + +func (m *manager) UpdatePodStatus(status string) {} + +func (m *manager) getWorker(podId string, containerName string, probeType apis.ContainerProbeType) (*worker, bool) { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + worker, ok := m.workers[probeKey{podId, containerName, probeType}] + return worker, ok +} + +// Called by the worker after exiting +func (m *manager) removeWorker(podId string, containerName string, probeType apis.ContainerProbeType) { + m.workerLock.Lock() + defer m.workerLock.Unlock() + delete(m.workers, probeKey{podUid: podId, containerName: containerName, probeType: probeType}) +} + +func (m *manager) workerCount() int { + m.workerLock.RLock() + defer m.workerLock.RUnlock() + return len(m.workers) +} + +/*func (m *manager) updateReadiness() { + update := <-m.readinessManager.Updates() + + ready := update.Result == results.Success + m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) +}*/ + +func (m *manager) updateStartup() { + update := <-m.startupManager.Updates() + + started := update.Result.Result == results.Success + m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started, update.Result) +} diff --git a/pkg/hostman/container/prober/results/doc.go b/pkg/hostman/container/prober/results/doc.go new file mode 100644 index 0000000000..f8d949c0a9 --- /dev/null +++ b/pkg/hostman/container/prober/results/doc.go @@ -0,0 +1 @@ +package results // import "yunion.io/x/onecloud/pkg/hostman/container/prober/results" diff --git a/pkg/hostman/container/prober/results/results_manager.go b/pkg/hostman/container/prober/results/results_manager.go new file mode 100644 index 0000000000..4bd8fd1b64 --- /dev/null +++ b/pkg/hostman/container/prober/results/results_manager.go @@ -0,0 +1,166 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package results + +import ( + "fmt" + "sync" + + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" +) + +func NewFailure(reason string) ProbeResult { + return newProbeResult(Failure, reason) +} + +func NewSuccess(reason string) ProbeResult { + return newProbeResult(Success, reason) +} + +func NewUnknown(reason string) ProbeResult { + return newProbeResult(Unknown, reason) +} + +func newProbeResult(r Result, reason string) ProbeResult { + return ProbeResult{ + Result: r, + Reason: reason, + } +} + +type ProbeResult struct { + Result + Reason string +} + +func (pr ProbeResult) String() string { + return fmt.Sprintf("%s: %s", pr.Result.String(), pr.Reason) +} + +// Result is the type for probe results. +type Result int + +const ( + // Unknown is encoded as -1 (type Result) + Unknown Result = iota - 1 + + // Success is encoded as 0 (type Result) + Success + + // Failure is encoded as 1 (type Result) + Failure +) + +func (r Result) String() string { + switch r { + case Success: + return "Success" + case Failure: + return "Failure" + default: + return "UNKNOWN" + } +} + +// Update is an enum of the types of updates sent over the Updates channel. +type Update struct { + ContainerID string + Result ProbeResult + PodUID string +} + +// Manager provides a probe results cache and channel of updates +type Manager interface { + // Get returns the cached result for the container with the given ID. + Get(containerId string) (Result, bool) + // Set sets the cached result for the container with the given ID. + // The pod is only included to be sent with the update. + Set(containerId string, result ProbeResult, pod *desc.SGuestDesc) + // Remove clears the cached result for the container with the given ID. + Remove(containerId string) + // Updates creates a channel that receives an Update whenever its result changes (but not + // removed). + // NOTE: The current implementation only supports a single updates channel. + Updates() <-chan Update +} + +var _ Manager = &manager{} + +type manager struct { + // guards the cache + sync.RWMutex + // map of container ID -> probe Result + cache map[string]Result + // channel of updates + updates chan Update +} + +func NewManager() Manager { + return &manager{ + cache: make(map[string]Result), + updates: make(chan Update, 20), + } +} + +func (m *manager) Get(id string) (Result, bool) { + m.RLock() + defer m.RUnlock() + result, found := m.cache[id] + return result, found +} + +func (m *manager) Set(id string, result ProbeResult, pod *desc.SGuestDesc) { + if m.setInternal(id, result) { + m.updates <- Update{ContainerID: id, Result: result, PodUID: pod.Uuid} + } +} + +// Internal helper for locked portion of set. Returns whether an update should be sent. +func (m *manager) setInternal(id string, result ProbeResult) bool { + m.Lock() + defer m.Unlock() + prev, exists := m.cache[id] + if !exists || prev != result.Result { + m.cache[id] = result.Result + return true + } + return false +} + +func (m *manager) Remove(id string) { + m.Lock() + defer m.Unlock() + delete(m.cache, id) +} + +func (m *manager) Updates() <-chan Update { + return m.updates +} diff --git a/pkg/hostman/container/prober/worker.go b/pkg/hostman/container/prober/worker.go new file mode 100644 index 0000000000..407886eabd --- /dev/null +++ b/pkg/hostman/container/prober/worker.go @@ -0,0 +1,191 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "math/rand" + "time" + + "yunion.io/x/pkg/util/runtime" + + "yunion.io/x/onecloud/pkg/apis" + hostapi "yunion.io/x/onecloud/pkg/apis/host" + "yunion.io/x/onecloud/pkg/hostman/container/prober/results" + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" +) + +// worker handles the periodic probing of its assigned container. Each worker has a go-routine +// associated with it which runs the probe loop until the container permanently terminates, or the +// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date +// container IDs. +type worker struct { + // Channel for stopping the probe. + stopCh chan struct{} + + // The pod containing this probe (read-only) + pod *desc.SGuestDesc + + // The container to probe (read-only) + container *hostapi.ContainerDesc + + // Describes the probe configuration (read-only) + spec *apis.ContainerProbe + + // The type of the worker. + probeType apis.ContainerProbeType + + // The probe value during the initial delay. + initialValue results.Result + + // Where to store this workers results. + resultsManager results.Manager + probeManager *manager + + // The last known container ID for this worker. + containerId string + // The last probe result for this worker. + lastResult results.Result + // How many times in a row the probe has returned the same result. + resultRun int + + // If set, skip probing + onHold bool +} + +// Creates and starts a new probe worker. +func newWorker( + m *manager, + probeType apis.ContainerProbeType, + pod *desc.SGuestDesc, + container *hostapi.ContainerDesc) *worker { + w := &worker{ + stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. + pod: pod, + container: container, + probeType: probeType, + probeManager: m, + containerId: container.Id, + } + + switch probeType { + //case apis.ContainerProbeTypeLiveness: + // w.spec = container.Spec.LivenessProbe + // w.resultsManager = m.livenessManager + // w.initialValue = results.Success + case apis.ContainerProbeTypeStartup: + w.spec = container.Spec.StartupProbe + w.resultsManager = m.startupManager + w.initialValue = results.Unknown + } + + return w +} + +// run periodically probes the container. +func (w *worker) run() { + probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second + + // If host restarted the probes could be started in rapid succession. + // Let the worker wait for a random portion of tickerPeriod before probing. + time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod))) + + probeTicker := time.NewTicker(probeTickerPeriod) + + defer func() { + // Clean up. + probeTicker.Stop() + if len(w.containerId) != 0 { + w.resultsManager.Remove(w.containerId) + } + + w.probeManager.removeWorker(w.pod.Uuid, w.container.Name, w.probeType) + }() + +probeLoop: + for w.doProbe() { + // Wait for next probe tick. + select { + case <-w.stopCh: + break probeLoop + case <-probeTicker.C: + // continue + } + } +} + +// stop stops the probe worker. The worker handles cleanup and removes itself from its manager. +// It is safe to call stop multiple times. +func (w *worker) stop() { + select { + case w.stopCh <- struct{}{}: + default: // Non-blocking. + } +} + +// doProbe probes the container once and records the result. +// Returns whether the worker should continue. +func (w *worker) doProbe() (keepGoing bool) { + // Actually eat panics (HandleCrash takes care of logging) + defer func() { recover() }() + defer runtime.HandleCrash(func(_ interface{}) { + keepGoing = true + }) + + result, err := w.probeManager.prober.probe(w.probeType, w.pod, w.container) + if err != nil { + // prober error, throw away the result. + return true + } + + if w.lastResult == result.Result { + w.resultRun++ + } else { + w.lastResult = result.Result + w.resultRun = 1 + } + + if (result.Result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) || + (result.Result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) { + return true + } + w.resultsManager.Set(w.containerId, result, w.pod) + + if (w.probeType == apis.ContainerProbeTypeLiveness || w.probeType == apis.ContainerProbeTypeStartup) && result.Result == results.Failure { + // The container fails a liveness/startup check, it will need to be restarted. + // Stop probing until we see a new container ID. This is to reduce the + // chance of hitting #21751, where running `docker exec` when a + // container is being stopped may lead to corrupted container state. + w.onHold = true + w.resultRun = 0 + } + + return true +} diff --git a/pkg/hostman/container/status/doc.go b/pkg/hostman/container/status/doc.go new file mode 100644 index 0000000000..5806d047b9 --- /dev/null +++ b/pkg/hostman/container/status/doc.go @@ -0,0 +1 @@ +package status // import "yunion.io/x/onecloud/pkg/hostman/container/status" diff --git a/pkg/hostman/container/status/status_manager.go b/pkg/hostman/container/status/status_manager.go new file mode 100644 index 0000000000..bdc06cc8ca --- /dev/null +++ b/pkg/hostman/container/status/status_manager.go @@ -0,0 +1,55 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package status + +import ( + "context" + + "yunion.io/x/jsonutils" + "yunion.io/x/log" + + "yunion.io/x/onecloud/pkg/apis" + computeapi "yunion.io/x/onecloud/pkg/apis/compute" + "yunion.io/x/onecloud/pkg/hostman/container/prober/results" + "yunion.io/x/onecloud/pkg/hostman/hostutils" + computemodules "yunion.io/x/onecloud/pkg/mcclient/modules/compute" +) + +type Manager interface { + // SetContainerStartup updates the container status with the given startup + // and triggers a status update. + SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult) +} + +type manager struct{} + +func NewManager() Manager { + return &manager{} +} + +func (m *manager) SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult) { + s := hostutils.GetComputeSession(context.Background()) + status := computeapi.CONTAINER_STATUS_PROBE_FAILED + if started { + status = computeapi.CONTAINER_STATUS_RUNNING + } + input := apis.PerformStatusInput{ + Status: status, + Reason: result.Reason, + } + if _, err := computemodules.Containers.PerformAction(s, containerId, "status", jsonutils.Marshal(input)); err != nil { + log.Errorf("set container(%s/%s) status failed: %s", podId, containerId, err) + } +} diff --git a/pkg/hostman/guestman/container/doc.go b/pkg/hostman/guestman/container/doc.go new file mode 100644 index 0000000000..3b42baae77 --- /dev/null +++ b/pkg/hostman/guestman/container/doc.go @@ -0,0 +1 @@ +package container // import "yunion.io/x/onecloud/pkg/hostman/guestman/container" diff --git a/pkg/hostman/guestman/container/runtime.go b/pkg/hostman/guestman/container/runtime.go new file mode 100644 index 0000000000..98cc8cf814 --- /dev/null +++ b/pkg/hostman/guestman/container/runtime.go @@ -0,0 +1,27 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + "time" + + "yunion.io/x/onecloud/pkg/hostman/guestman/desc" +) + +// CommandRunner interface allows to run command in a container. +type CommandRunner interface { + // RunInContainer synchronously executes the command in the container, and returns the output. + RunInContainer(pod *desc.SGuestDesc, containerId string, cmd []string, timeout time.Duration) ([]byte, error) +} diff --git a/pkg/hostman/guestman/guestman.go b/pkg/hostman/guestman/guestman.go index 0bf37d0f4d..61d31ab513 100644 --- a/pkg/hostman/guestman/guestman.go +++ b/pkg/hostman/guestman/guestman.go @@ -39,6 +39,7 @@ import ( "yunion.io/x/onecloud/pkg/apis/compute" hostapi "yunion.io/x/onecloud/pkg/apis/host" "yunion.io/x/onecloud/pkg/appsrv" + "yunion.io/x/onecloud/pkg/hostman/container/prober" "yunion.io/x/onecloud/pkg/hostman/guestman/arch" "yunion.io/x/onecloud/pkg/hostman/guestman/desc" fwd "yunion.io/x/onecloud/pkg/hostman/guestman/forwarder" @@ -106,6 +107,9 @@ type SGuestManager struct { numaAllocate bool cpuSet *CpuSetCounter pythonPath string + + // container related members + containerProbeManager prober.Manager } func NewGuestManager(host hostutils.IHost, serversPath string) (*SGuestManager, error) { @@ -128,6 +132,9 @@ func NewGuestManager(host hostutils.IHost, serversPath string) (*SGuestManager, if err != nil { return nil, errors.Wrap(err, "mkdir qemu log dir") } + if manager.host.IsContainerHost() { + manager.startContainerProbeManager() + } return manager, nil } diff --git a/pkg/hostman/guestman/pod.go b/pkg/hostman/guestman/pod.go index ddb3b8b4ad..b4f9d5a89a 100644 --- a/pkg/hostman/guestman/pod.go +++ b/pkg/hostman/guestman/pod.go @@ -38,6 +38,9 @@ import ( hostapi "yunion.io/x/onecloud/pkg/apis/host" "yunion.io/x/onecloud/pkg/hostman/container/device" "yunion.io/x/onecloud/pkg/hostman/container/lifecycle" + "yunion.io/x/onecloud/pkg/hostman/container/prober" + proberesults "yunion.io/x/onecloud/pkg/hostman/container/prober/results" + "yunion.io/x/onecloud/pkg/hostman/container/status" "yunion.io/x/onecloud/pkg/hostman/container/volume_mount" "yunion.io/x/onecloud/pkg/hostman/guestman/desc" deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis" @@ -59,6 +62,48 @@ import ( "yunion.io/x/onecloud/pkg/util/procutils" ) +func (m *SGuestManager) startContainerProbeManager() { + livenessManager := proberesults.NewManager() + startupManager := proberesults.NewManager() + man := prober.NewManager(status.NewManager(), livenessManager, startupManager, newContainerRunner(m)) + m.containerProbeManager = man + man.Start() +} + +func (m *SGuestManager) GetContainerProbeManager() prober.Manager { + return m.containerProbeManager +} + +func newContainerRunner(man *SGuestManager) *containerRunner { + return &containerRunner{man} +} + +type containerRunner struct { + manager *SGuestManager +} + +func (cr *containerRunner) RunInContainer(pod *desc.SGuestDesc, containerId string, cmd []string, timeout time.Duration) ([]byte, error) { + srv, ok := cr.manager.GetServer(pod.Uuid) + if !ok { + return nil, errors.Wrapf(httperrors.ErrNotFound, "server %s not found", pod.Uuid) + } + s := srv.(*sPodGuestInstance) + ctrCriId, err := s.getContainerCRIId(containerId) + if err != nil { + return nil, errors.Wrap(err, "get container cri id") + } + cli := s.getCRI().GetRuntimeClient() + resp, err := cli.ExecSync(context.Background(), &runtimeapi.ExecSyncRequest{ + ContainerId: ctrCriId, + Cmd: cmd, + Timeout: int64(timeout), + }) + if err != nil { + return nil, errors.Wrapf(err, "exec sync %#v to %s", cmd, ctrCriId) + } + return append(resp.Stdout, resp.Stderr...), nil +} + type PodInstance interface { GuestRuntimeInstance @@ -122,6 +167,7 @@ func (s *sPodGuestInstance) ImportServer(pendingDelete bool) { s.manager.SaveServer(s.Id, s) s.manager.RemoveCandidateServer(s) s.SyncStatus("sync status after host started") + s.getProbeManager().AddPod(s.Desc) } func (s *sPodGuestInstance) SyncStatus(reason string) { @@ -196,6 +242,10 @@ func (s *sPodGuestInstance) getCRI() pod.CRI { return s.manager.GetCRI() } +func (s *sPodGuestInstance) getProbeManager() prober.Manager { + return s.manager.GetContainerProbeManager() +} + func (s *sPodGuestInstance) getHostCPUMap() *pod.HostContainerCPUMap { return s.manager.GetContainerCPUMap() } @@ -317,6 +367,21 @@ func (s *sPodGuestInstance) umountPodVolumes() error { return nil } +func (s *sPodGuestInstance) GetContainers() []*hostapi.ContainerDesc { + return s.GetDesc().Containers +} + +func (s *sPodGuestInstance) GetContainerById(ctrId string) *hostapi.ContainerDesc { + ctrs := s.GetContainers() + for i := range ctrs { + ctr := ctrs[i] + if ctr.Id == ctrId { + return ctr + } + } + return nil +} + func (s *sPodGuestInstance) getContainerVolumeMounts() map[string][]*hostapi.ContainerVolumeMount { result := make(map[string][]*hostapi.ContainerVolumeMount, 0) for _, ctr := range s.GetDesc().Containers { @@ -503,6 +568,8 @@ func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.Toke if err := s.setPodCgroupResources(criId, s.GetDesc().Mem, s.GetDesc().Cpu); err != nil { return nil, errors.Wrapf(err, "set pod %s cgroup memMB %d, cpu %d", criId, s.GetDesc().Mem, s.GetDesc().Cpu) } + + s.getProbeManager().AddPod(s.Desc) return &computeapi.PodStartResponse{ CRIId: criId, IsRunning: false, @@ -543,6 +610,8 @@ func (s *sPodGuestInstance) ensurePodRemoved(ctx context.Context, timeout int64) return errors.Wrapf(err, "remove cri pod: %s", p.GetId()) } } + + s.getProbeManager().RemovePod(s.Desc) return nil } @@ -605,8 +674,12 @@ func (s *sPodGuestInstance) SyncConfig(ctx context.Context, guestDesc *desc.SGue return nil, nil } +func (s *sPodGuestInstance) getContainerMeta(id string) *sContainer { + return s.containers[id] +} + func (s *sPodGuestInstance) getContainerCRIId(ctrId string) (string, error) { - ctr := s.getContainer(ctrId) + ctr := s.getContainerMeta(ctrId) if ctr == nil { return "", errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId) } @@ -852,10 +925,6 @@ func (s *sPodGuestInstance) getContainersFilePath() string { return path.Join(s.HomeDir(), "containers") } -func (s *sPodGuestInstance) getContainer(id string) *sContainer { - return s.containers[id] -} - func (s *sPodGuestInstance) CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) { ctrCriId, err := s.createContainer(ctx, userCred, id, input) if err != nil { @@ -1263,6 +1332,15 @@ func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string case runtimeapi.ContainerState_CONTAINER_UNKNOWN: status = computeapi.CONTAINER_STATUS_UNKNOWN } + if status == computeapi.CONTAINER_STATUS_RUNNING { + ctr := s.GetContainerById(ctrId) + if ctr == nil { + return "", errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId) + } + if ctr.Spec.NeedProbe() { + status = computeapi.CONTAINER_STATUS_PROBING + } + } return status, nil } diff --git a/pkg/hostman/hostinfo/container.go b/pkg/hostman/hostinfo/container.go index 3f5885cfc9..bced7f6b85 100644 --- a/pkg/hostman/hostinfo/container.go +++ b/pkg/hostman/hostinfo/container.go @@ -61,7 +61,7 @@ func (h *SHostInfo) startContainerStatsProvider(cri pod.CRI) error { if err := ca.Start(); err != nil { return errors.Wrap(err, "start cadvisor") } - h.containerStatsProvier = stats.NewCRIContainerStatsProvider(ca, cri.GetRuntimeClient(), cri.GetImageClient()) + h.containerStatsProvider = stats.NewCRIContainerStatsProvider(ca, cri.GetRuntimeClient(), cri.GetImageClient()) return nil } @@ -74,5 +74,5 @@ func (h *SHostInfo) GetContainerCPUMap() *pod.HostContainerCPUMap { } func (h *SHostInfo) GetContainerStatsProvider() stats.ContainerStatsProvider { - return h.containerStatsProvier + return h.containerStatsProvider } diff --git a/pkg/hostman/hostinfo/hostinfo.go b/pkg/hostman/hostinfo/hostinfo.go index 513b3ef552..7ea735fc27 100644 --- a/pkg/hostman/hostinfo/hostinfo.go +++ b/pkg/hostman/hostinfo/hostinfo.go @@ -120,9 +120,10 @@ type SHostInfo struct { IoScheduler string - cri pod.CRI - containerCPUMap *pod.HostContainerCPUMap - containerStatsProvier stats.ContainerStatsProvider + // container related members + cri pod.CRI + containerCPUMap *pod.HostContainerCPUMap + containerStatsProvider stats.ContainerStatsProvider } func (h *SHostInfo) GetContainerDeviceConfigurationFilePath() string { diff --git a/pkg/util/exec/doc.go b/pkg/util/exec/doc.go new file mode 100644 index 0000000000..4d9a15a1ea --- /dev/null +++ b/pkg/util/exec/doc.go @@ -0,0 +1 @@ +package exec // import "yunion.io/x/onecloud/pkg/util/exec" diff --git a/pkg/util/exec/exec.go b/pkg/util/exec/exec.go new file mode 100644 index 0000000000..b6aeb74d21 --- /dev/null +++ b/pkg/util/exec/exec.go @@ -0,0 +1,266 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exec + +import ( + "context" + "io" + osexec "os/exec" + "syscall" + "time" +) + +// ErrExecutableNotFound is returned if the executable is not found. +var ErrExecutableNotFound = osexec.ErrNotFound + +// Interface is an interface that presents a subset of the os/exec API. Use this +// when you want to inject fakeable/mockable exec behavior. +type Interface interface { + // Command returns a Cmd instance which can be used to run a single command. + // This follows the pattern of package os/exec. + Command(cmd string, args ...string) Cmd + + // CommandContext returns a Cmd instance which can be used to run a single command. + // + // The provided context is used to kill the process if the context becomes done + // before the command completes on its own. For example, a timeout can be set in + // the context. + CommandContext(ctx context.Context, cmd string, args ...string) Cmd + + // LookPath wraps os/exec.LookPath + LookPath(file string) (string, error) +} + +// Cmd is an interface that presents an API that is very similar to Cmd from os/exec. +// As more functionality is needed, this can grow. Since Cmd is a struct, we will have +// to replace fields with get/set method pairs. +type Cmd interface { + // Run runs the command to the completion. + Run() error + // CombinedOutput runs the command and returns its combined standard output + // and standard error. This follows the pattern of package os/exec. + CombinedOutput() ([]byte, error) + // Output runs the command and returns standard output, but not standard err + Output() ([]byte, error) + SetDir(dir string) + SetStdin(in io.Reader) + SetStdout(out io.Writer) + SetStderr(out io.Writer) + SetEnv(env []string) + + // StdoutPipe and StderrPipe for getting the process' Stdout and Stderr as + // Readers + StdoutPipe() (io.ReadCloser, error) + StderrPipe() (io.ReadCloser, error) + + // Start and Wait are for running a process non-blocking + Start() error + Wait() error + + // Stops the command by sending SIGTERM. It is not guaranteed the + // process will stop before this function returns. If the process is not + // responding, an internal timer function will send a SIGKILL to force + // terminate after 10 seconds. + Stop() +} + +// ExitError is an interface that presents an API similar to os.ProcessState, which is +// what ExitError from os/exec is. This is designed to make testing a bit easier and +// probably loses some of the cross-platform properties of the underlying library. +type ExitError interface { + String() string + Error() string + Exited() bool + ExitStatus() int +} + +// Implements Interface in terms of really exec()ing. +type executor struct{} + +// New returns a new Interface which will os/exec to run commands. +func New() Interface { + return &executor{} +} + +// Command is part of the Interface interface. +func (executor *executor) Command(cmd string, args ...string) Cmd { + return (*cmdWrapper)(osexec.Command(cmd, args...)) +} + +// CommandContext is part of the Interface interface. +func (executor *executor) CommandContext(ctx context.Context, cmd string, args ...string) Cmd { + return (*cmdWrapper)(osexec.CommandContext(ctx, cmd, args...)) +} + +// LookPath is part of the Interface interface +func (executor *executor) LookPath(file string) (string, error) { + return osexec.LookPath(file) +} + +// Wraps exec.Cmd so we can capture errors. +type cmdWrapper osexec.Cmd + +var _ Cmd = &cmdWrapper{} + +func (cmd *cmdWrapper) SetDir(dir string) { + cmd.Dir = dir +} + +func (cmd *cmdWrapper) SetStdin(in io.Reader) { + cmd.Stdin = in +} + +func (cmd *cmdWrapper) SetStdout(out io.Writer) { + cmd.Stdout = out +} + +func (cmd *cmdWrapper) SetStderr(out io.Writer) { + cmd.Stderr = out +} + +func (cmd *cmdWrapper) SetEnv(env []string) { + cmd.Env = env +} + +func (cmd *cmdWrapper) StdoutPipe() (io.ReadCloser, error) { + r, err := (*osexec.Cmd)(cmd).StdoutPipe() + return r, handleError(err) +} + +func (cmd *cmdWrapper) StderrPipe() (io.ReadCloser, error) { + r, err := (*osexec.Cmd)(cmd).StderrPipe() + return r, handleError(err) +} + +func (cmd *cmdWrapper) Start() error { + err := (*osexec.Cmd)(cmd).Start() + return handleError(err) +} + +func (cmd *cmdWrapper) Wait() error { + err := (*osexec.Cmd)(cmd).Wait() + return handleError(err) +} + +// Run is part of the Cmd interface. +func (cmd *cmdWrapper) Run() error { + err := (*osexec.Cmd)(cmd).Run() + return handleError(err) +} + +// CombinedOutput is part of the Cmd interface. +func (cmd *cmdWrapper) CombinedOutput() ([]byte, error) { + out, err := (*osexec.Cmd)(cmd).CombinedOutput() + return out, handleError(err) +} + +func (cmd *cmdWrapper) Output() ([]byte, error) { + out, err := (*osexec.Cmd)(cmd).Output() + return out, handleError(err) +} + +// Stop is part of the Cmd interface. +func (cmd *cmdWrapper) Stop() { + c := (*osexec.Cmd)(cmd) + + if c.Process == nil { + return + } + + c.Process.Signal(syscall.SIGTERM) + + time.AfterFunc(10*time.Second, func() { + if !c.ProcessState.Exited() { + c.Process.Signal(syscall.SIGKILL) + } + }) +} + +func handleError(err error) error { + if err == nil { + return nil + } + + switch e := err.(type) { + case *osexec.ExitError: + return &ExitErrorWrapper{e} + case *osexec.Error: + if e.Err == osexec.ErrNotFound { + return ErrExecutableNotFound + } + } + + return err +} + +// ExitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError. +// Note: standard exec.ExitError is type *os.ProcessState, which already implements Exited(). +type ExitErrorWrapper struct { + *osexec.ExitError +} + +var _ ExitError = &ExitErrorWrapper{} + +// ExitStatus is part of the ExitError interface. +func (eew ExitErrorWrapper) ExitStatus() int { + ws, ok := eew.Sys().(syscall.WaitStatus) + if !ok { + panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper") + } + return ws.ExitStatus() +} + +// CodeExitError is an implementation of ExitError consisting of an error object +// and an exit code (the upper bits of os.exec.ExitStatus). +type CodeExitError struct { + Err error + Code int +} + +var _ ExitError = CodeExitError{} + +func (e CodeExitError) Error() string { + return e.Err.Error() +} + +func (e CodeExitError) String() string { + return e.Err.Error() +} + +// Exited is to check if the process has finished +func (e CodeExitError) Exited() bool { + return true +} + +// ExitStatus is for checking the error code +func (e CodeExitError) ExitStatus() int { + return e.Code +} diff --git a/pkg/util/ioutils/doc.go b/pkg/util/ioutils/doc.go new file mode 100644 index 0000000000..d6c4abf105 --- /dev/null +++ b/pkg/util/ioutils/doc.go @@ -0,0 +1 @@ +package ioutils // import "yunion.io/x/onecloud/pkg/util/ioutils" diff --git a/pkg/util/ioutils/ioutils.go b/pkg/util/ioutils/ioutils.go new file mode 100644 index 0000000000..0fb3dc3f7c --- /dev/null +++ b/pkg/util/ioutils/ioutils.go @@ -0,0 +1,66 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutils + +import "io" + +// LimitWriter is a copy of the standard library ioutils.LimitReader, +// applied to the writer interface. +// LimitWriter returns a Writer that writes to w +// but stops with EOF after n bytes. +// The underlying implementation is a *LimitedWriter. +func LimitWriter(w io.Writer, n int64) io.Writer { return &LimitedWriter{w, n} } + +// A LimitedWriter writes to W but limits the amount of +// data returned to just N bytes. Each call to Write +// updates N to reflect the new amount remaining. +// Write returns EOF when N <= 0 or when the underlying W returns EOF. +type LimitedWriter struct { + W io.Writer // underlying writer + N int64 // max bytes remaining +} + +func (l *LimitedWriter) Write(p []byte) (n int, err error) { + if l.N <= 0 { + return 0, io.ErrShortWrite + } + truncated := false + if int64(len(p)) > l.N { + p = p[0:l.N] + truncated = true + } + n, err = l.W.Write(p) + l.N -= int64(n) + if err == nil && truncated { + err = io.ErrShortWrite + } + return +} diff --git a/pkg/util/probe/doc.go b/pkg/util/probe/doc.go new file mode 100644 index 0000000000..23ec43078a --- /dev/null +++ b/pkg/util/probe/doc.go @@ -0,0 +1 @@ +package probe // import "yunion.io/x/onecloud/pkg/util/probe" diff --git a/pkg/util/probe/exec/doc.go b/pkg/util/probe/exec/doc.go new file mode 100644 index 0000000000..438cf66452 --- /dev/null +++ b/pkg/util/probe/exec/doc.go @@ -0,0 +1 @@ +package exec // import "yunion.io/x/onecloud/pkg/util/probe/exec" diff --git a/pkg/util/probe/exec/errors.go b/pkg/util/probe/exec/errors.go new file mode 100644 index 0000000000..df011ba5c4 --- /dev/null +++ b/pkg/util/probe/exec/errors.go @@ -0,0 +1,59 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exec + +import "time" + +// NewTimeoutError returns a new TimeoutError. +func NewTimeoutError(err error, timeout time.Duration) *TimeoutError { + return &TimeoutError{ + err: err, + timeout: timeout, + } +} + +// TimeoutError is an error returned on exec probe timeouts. It should be returned by CRI implementations +// in order for the exec prober to interpret exec timeouts as failed probes. +// TODO: this error type can likely be removed when we support CRI errors. +type TimeoutError struct { + err error + timeout time.Duration +} + +// Error returns the error string. +func (t *TimeoutError) Error() string { + return t.err.Error() +} + +// Timeout returns the timeout duration of the exec probe. +func (t *TimeoutError) Timeout() time.Duration { + return t.timeout +} diff --git a/pkg/util/probe/exec/exec.go b/pkg/util/probe/exec/exec.go new file mode 100644 index 0000000000..739e53cbba --- /dev/null +++ b/pkg/util/probe/exec/exec.go @@ -0,0 +1,93 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exec + +import ( + "bytes" + + "yunion.io/x/log" + + "yunion.io/x/onecloud/pkg/util/exec" + "yunion.io/x/onecloud/pkg/util/ioutils" + "yunion.io/x/onecloud/pkg/util/probe" +) + +const ( + maxReadLength = 10 * 1 << 10 // 10KB +) + +// Prober is an interface defining the Probe object for container readiness/liveness checks. +type Prober interface { + Probe(e exec.Cmd) (probe.Result, string, error) +} + +// New creates a Prober. +func New() Prober { + return execProber{} +} + +type execProber struct{} + +// Probe executes a command to check the liveness/readiness of container +// from executing a command. Returns the Result status, command output, and +// errors if any. +func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) { + var dataBuffer bytes.Buffer + writer := ioutils.LimitWriter(&dataBuffer, maxReadLength) + + e.SetStderr(writer) + e.SetStdout(writer) + err := e.Start() + if err == nil { + err = e.Wait() + } + data := dataBuffer.Bytes() + + log.Infof("Exec probe response: %q", string(data)) + if err != nil { + exit, ok := err.(exec.ExitError) + if ok { + if exit.ExitStatus() == 0 { + return probe.Success, string(data), nil + } + return probe.Failure, string(data), nil + } + + timeoutErr, ok := err.(*TimeoutError) + if ok { + log.Warningf("Exec probe timed out after %s", timeoutErr.Timeout()) + return probe.Failure, string(data), nil + } + + return probe.Unknown, "", err + } + return probe.Success, string(data), nil +} diff --git a/pkg/util/probe/exec/exec_test.go b/pkg/util/probe/exec/exec_test.go new file mode 100644 index 0000000000..640aaa85b0 --- /dev/null +++ b/pkg/util/probe/exec/exec_test.go @@ -0,0 +1,159 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package exec + +import ( + "fmt" + "io" + "strings" + "testing" + + "yunion.io/x/onecloud/pkg/util/probe" +) + +type FakeCmd struct { + out []byte + stdout []byte + err error + writer io.Writer +} + +func (f *FakeCmd) Run() error { + return nil +} + +func (f *FakeCmd) CombinedOutput() ([]byte, error) { + return f.out, f.err +} + +func (f *FakeCmd) Output() ([]byte, error) { + return f.stdout, f.err +} + +func (f *FakeCmd) SetDir(dir string) {} + +func (f *FakeCmd) SetStdin(in io.Reader) {} + +func (f *FakeCmd) SetStdout(out io.Writer) { + f.writer = out +} + +func (f *FakeCmd) SetStderr(out io.Writer) { + f.writer = out +} + +func (f *FakeCmd) SetEnv(env []string) {} + +func (f *FakeCmd) Stop() {} + +func (f *FakeCmd) Start() error { + if f.writer != nil { + f.writer.Write(f.out) + return f.err + } + return f.err +} + +func (f *FakeCmd) Wait() error { return nil } + +func (f *FakeCmd) StdoutPipe() (io.ReadCloser, error) { + return nil, nil +} + +func (f *FakeCmd) StderrPipe() (io.ReadCloser, error) { + return nil, nil +} + +type fakeExitError struct { + exited bool + statusCode int +} + +func (f *fakeExitError) String() string { + return f.Error() +} + +func (f *fakeExitError) Error() string { + return "fake exit" +} + +func (f *fakeExitError) Exited() bool { + return f.exited +} + +func (f *fakeExitError) ExitStatus() int { + return f.statusCode +} + +func TestExec(t *testing.T) { + prober := New() + + tenKilobyte := strings.Repeat("logs-123", 128*10) // 8*128*10=10240 = 10KB of text. + elevenKilobyte := strings.Repeat("logs-123", 8*128*11) // 8*128*11=11264 = 11KB of text. + + tests := []struct { + expectedStatus probe.Result + expectError bool + input string + output string + err error + }{ + // Ok + {probe.Success, false, "OK", "OK", nil}, + // Ok + {probe.Success, false, "OK", "OK", &fakeExitError{true, 0}}, + // Ok - truncated output + {probe.Success, false, elevenKilobyte, tenKilobyte, nil}, + // Run returns error + {probe.Unknown, true, "", "", fmt.Errorf("test error")}, + // Unhealthy + {probe.Failure, false, "Fail", "", &fakeExitError{true, 1}}, + } + for i, test := range tests { + fake := FakeCmd{ + out: []byte(test.output), + err: test.err, + } + status, output, err := prober.Probe(&fake) + if status != test.expectedStatus { + t.Errorf("[%d] expected %v, got %v", i, test.expectedStatus, status) + } + if err != nil && test.expectError == false { + t.Errorf("[%d] unexpected error: %v", i, err) + } + if err == nil && test.expectError == true { + t.Errorf("[%d] unexpected non-error", i) + } + if test.output != output { + t.Errorf("[%d] expected %s, got %s", i, test.output, output) + } + } +} diff --git a/pkg/util/probe/probe.go b/pkg/util/probe/probe.go new file mode 100644 index 0000000000..72f945878b --- /dev/null +++ b/pkg/util/probe/probe.go @@ -0,0 +1,49 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package probe + +// Result is a string used to handle the results for probing container readiness/liveness +type Result string + +const ( + // Success Result + Success Result = "success" + // Warning Result. Logically success, but with additional debugging information attached. + Warning Result = "warning" + // Failure Result + Failure Result = "failure" + // Unknown Result + Unknown Result = "unknown" +) + +func (r Result) IsEqual(r2 Result) bool { + return string(r) == string(r2) +} diff --git a/pkg/util/probe/tcp/doc.go b/pkg/util/probe/tcp/doc.go new file mode 100644 index 0000000000..a11482421b --- /dev/null +++ b/pkg/util/probe/tcp/doc.go @@ -0,0 +1 @@ +package tcp // import "yunion.io/x/onecloud/pkg/util/probe/tcp" diff --git a/pkg/util/probe/tcp/tcp.go b/pkg/util/probe/tcp/tcp.go new file mode 100644 index 0000000000..3d3b7d9a82 --- /dev/null +++ b/pkg/util/probe/tcp/tcp.go @@ -0,0 +1,75 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tcp + +import ( + "net" + "strconv" + "time" + + "yunion.io/x/log" + + "yunion.io/x/onecloud/pkg/util/probe" +) + +// New creates Prober. +func New() Prober { + return tcpProber{} +} + +// Prober is an interface that defines the Probe function for doing TCP readiness/liveness checks. +type Prober interface { + Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) +} + +type tcpProber struct{} + +// Probe returns a ProbeRunner capable of running an TCP check. +func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) { + return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) +} + +// DoTCPProbe checks that a TCP socket to the address can be opened. +// If the socket can be opened, it returns Success +// If the socket fails to open, it returns Failure. +// This is exported because some other packages may want to do direct TCP probes. +func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + // Convert errors to failures to handle timeouts. + return probe.Failure, err.Error(), nil + } + err = conn.Close() + if err != nil { + log.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err) + } + return probe.Success, "", nil +} diff --git a/pkg/util/probe/tcp/tcp_test.go b/pkg/util/probe/tcp/tcp_test.go new file mode 100644 index 0000000000..89aaea71b7 --- /dev/null +++ b/pkg/util/probe/tcp/tcp_test.go @@ -0,0 +1,82 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tcp + +import ( + "net" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "yunion.io/x/onecloud/pkg/util/probe" +) + +func TestTcpHealthChecker(t *testing.T) { + // Setup a test server that responds to probing correctly + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + tHost, tPortStr, err := net.SplitHostPort(server.Listener.Addr().String()) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + tPort, err := strconv.Atoi(tPortStr) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + tests := []struct { + host string + port int + + expectedStatus probe.Result + expectedError error + }{ + // A connection is made and probing would succeed + {tHost, tPort, probe.Success, nil}, + // No connection can be made and probing would fail + {tHost, -1, probe.Failure, nil}, + } + + prober := New() + for i, tt := range tests { + status, _, err := prober.Probe(tt.host, tt.port, 1*time.Second) + if status != tt.expectedStatus { + t.Errorf("#%d: expected status=%v, get=%v", i, tt.expectedStatus, status) + } + if err != tt.expectedError { + t.Errorf("#%d: expected error=%v, get=%v", i, tt.expectedError, err) + } + } +}