Files
cloudpods/pkg/hostman/guestman/guesttasks.go

3318 lines
88 KiB
Go

// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package guestman
import (
"context"
"fmt"
"math"
"net"
"os"
"path"
"regexp"
"strconv"
"strings"
"time"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/appctx"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/version"
"yunion.io/x/pkg/utils"
"yunion.io/x/onecloud/pkg/apis"
api "yunion.io/x/onecloud/pkg/apis/compute"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
"yunion.io/x/onecloud/pkg/hostman/guestman/qemu"
"yunion.io/x/onecloud/pkg/hostman/hostinfo"
"yunion.io/x/onecloud/pkg/hostman/hostutils"
"yunion.io/x/onecloud/pkg/hostman/isolated_device"
"yunion.io/x/onecloud/pkg/hostman/monitor"
"yunion.io/x/onecloud/pkg/hostman/options"
"yunion.io/x/onecloud/pkg/hostman/storageman"
"yunion.io/x/onecloud/pkg/mcclient/auth"
"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
"yunion.io/x/onecloud/pkg/util/procutils"
"yunion.io/x/onecloud/pkg/util/qemuimg"
"yunion.io/x/onecloud/pkg/util/timeutils2"
)
// IGuestTasks is one step of a guest configuration sync driven by
// SGuestSyncConfigTaskExecutor. Start runs the step and is expected to call
// callback when done, passing the errors it collected (none on success); the
// executor only advances to the next step from inside that callback.
type IGuestTasks interface {
	Start(callback func(...error))
}
/**
* GuestStopTask
**/
// SGuestStopTask stops a guest: it requests "system_powerdown" through the
// QEMU monitor, then polls once per second and force-stops the guest when it
// is no longer running or when timeout seconds have elapsed.
type SGuestStopTask struct {
	*SKVMGuestInstance
	ctx            context.Context
	timeout        int64     // seconds to wait for a graceful shutdown before force stop
	startPowerdown time.Time // set by Start; zero until then
}
// NewGuestStopTask builds a stop task for guest. timeout is the number of
// seconds to wait for a graceful power-off before the guest is force-stopped.
func NewGuestStopTask(guest *SKVMGuestInstance, ctx context.Context, timeout int64) *SGuestStopTask {
	task := new(SGuestStopTask)
	task.SKVMGuestInstance = guest
	task.ctx = ctx
	task.timeout = timeout
	// startPowerdown keeps its zero value until Start is called.
	return task
}
// Start marks the guest as stopping, records the time, asks a running guest
// with a live monitor to power down, and then begins polling for shutdown.
func (s *SGuestStopTask) Start() {
	s.stopping = true
	s.startPowerdown = time.Now()
	canRequestPowerdown := s.IsRunning() && s.IsMonitorAlive()
	if canRequestPowerdown {
		s.Monitor.SimpleCommand("system_powerdown", s.onPowerdownGuest)
	}
	s.checkGuestRunning()
}
// onPowerdownGuest is the monitor callback for "system_powerdown". It only
// logs; shutdown is detected by the polling loop in checkGuestRunning.
func (s *SGuestStopTask) onPowerdownGuest(results string) {
	// Intentionally disabled:
	//   s.ExitCleanup(true)
	//   s.checkGuestRunning()
	log.Debugf("system_powerdown callback successfully")
}
// checkGuestRunning keeps polling while the guest is running and the
// graceful-shutdown window is still open. Otherwise it force-stops the guest,
// clears the stopping flag and completes the task.
func (s *SGuestStopTask) checkGuestRunning() {
	window := time.Duration(s.timeout) * time.Second
	if s.IsRunning() && time.Since(s.startPowerdown) <= window {
		s.CheckGuestRunningLater()
		return
	}
	s.Stop() // force stop
	s.stopping = false
	hostutils.TaskComplete(s.ctx, nil)
}
// CheckGuestRunningLater blocks for one second and then re-checks the guest.
func (s *SGuestStopTask) CheckGuestRunningLater() {
	const pollInterval = time.Second
	time.Sleep(pollInterval)
	s.checkGuestRunning()
}
// SGuestSuspendTask pauses a guest through the monitor ("stop"), saves its
// memory state to the guest's state file and polls the migration status until
// the save finishes, then invokes onFinishCallback.
type SGuestSuspendTask struct {
	*SKVMGuestInstance
	ctx context.Context
	// onFinishCallback receives the task and the state file path once the
	// memory state has been saved.
	onFinishCallback func(*SGuestSuspendTask, string)
}
// NewGuestSuspendTask builds a suspend task for guest. When onFinishCallback
// is nil, the default is onSaveMemStateComplete, which stops the guest after
// the memory state has been saved.
func NewGuestSuspendTask(
	guest *SKVMGuestInstance,
	ctx context.Context,
	onFinishCallback func(*SGuestSuspendTask, string),
) *SGuestSuspendTask {
	task := &SGuestSuspendTask{SKVMGuestInstance: guest, ctx: ctx}
	task.onFinishCallback = onFinishCallback
	if task.onFinishCallback == nil {
		task.onFinishCallback = task.onSaveMemStateComplete
	}
	return task
}
// Start pauses the guest's vCPUs via the monitor "stop" command and continues
// in onSuspendGuest.
func (s *SGuestSuspendTask) Start() {
	const pauseCmd = "stop"
	s.Monitor.SimpleCommand(pauseCmd, s.onSuspendGuest)
}
// GetStateFilePath returns the guest's state file path for an empty argument
// (the embedded instance's GetStateFilePath("")).
func (s *SGuestSuspendTask) GetStateFilePath() string {
	guest := s.SKVMGuestInstance
	return guest.GetStateFilePath("")
}
// onSuspendGuest handles the "stop" result. If the output mentions an error
// the task fails; otherwise saving the memory state to the state file starts.
func (s *SGuestSuspendTask) onSuspendGuest(results string) {
	if !strings.Contains(strings.ToLower(results), "error") {
		s.Monitor.SaveState(s.GetStateFilePath(), s.onSaveMemStateWait)
		return
	}
	hostutils.TaskFailed(s.ctx, fmt.Sprintf("Suspend error: %s", results))
}
// onSaveMemStateWait handles the save-state result. On an error-looking
// result it fails the task; otherwise it queries the migration status, which
// reports the save progress, and continues in onSaveMemStateCheck.
func (s *SGuestSuspendTask) onSaveMemStateWait(results string) {
	failed := strings.Contains(strings.ToLower(results), "error")
	if failed {
		hostutils.TaskFailed(s.ctx, fmt.Sprintf("Save memory state error: %s", results))
		// TODO: send cont command
		return
	}
	s.Monitor.GetMigrateStatus(s.onSaveMemStateCheck)
}
// onSaveMemStateCheck acts on the migration status. "failed" fails the task,
// "completed" invokes onFinishCallback with the state file path, and any other
// status waits 3 seconds and polls again.
func (s *SGuestSuspendTask) onSaveMemStateCheck(status string) {
	switch status {
	case "failed":
		hostutils.TaskFailed(s.ctx, "Save memory state failed")
		// TODO: send cont command
	case "completed":
		log.Infof("Server %s save memory completed", s.GetName())
		s.onFinishCallback(s, s.GetStateFilePath())
	default:
		time.Sleep(time.Second * 3)
		log.Infof("Server %s saving memory state status %q", s.GetName(), status)
		s.onSaveMemStateWait("")
	}
}
// onSaveMemStateComplete is the default finish callback. After the memory
// state is saved it stops the guest with a 3 second stop-task timeout.
func (s *SGuestSuspendTask) onSaveMemStateComplete(_ *SGuestSuspendTask, _ string) {
	const stopTimeoutSeconds int64 = 3
	log.Infof("Server %s memory state saved, stopping server", s.GetName())
	s.ExecStopTask(s.ctx, stopTimeoutSeconds)
}
/**
* GuestSyncConfigTaskExecutor
**/
// SGuestSyncConfigTaskExecutor runs a list of IGuestTasks one after another,
// gathering their errors, and hands all errors to callback once the list is
// exhausted. Tasks are taken from the end of the slice.
type SGuestSyncConfigTaskExecutor struct {
	ctx      context.Context
	guest    *SKVMGuestInstance
	tasks    []IGuestTasks // remaining tasks; popped from the end
	errors   []error       // errors reported by completed tasks
	callback func([]error) // invoked once, then cleared
}
// NewGuestSyncConfigTaskExecutor builds an executor for tasks on guest. It
// starts with an empty error list. callback receives the accumulated errors.
func NewGuestSyncConfigTaskExecutor(ctx context.Context, guest *SKVMGuestInstance, tasks []IGuestTasks, callback func([]error)) *SGuestSyncConfigTaskExecutor {
	return &SGuestSyncConfigTaskExecutor{
		ctx:      ctx,
		guest:    guest,
		tasks:    tasks,
		errors:   make([]error, 0),
		callback: callback,
	}
}
// Start schedules the first task through timeutils2.AddTimeout.
//
// NOTE(review): the delay argument is currently ignored; the first task is
// always scheduled after a fixed 1 second. Confirm what callers expect before
// wiring delay in.
func (t *SGuestSyncConfigTaskExecutor) Start(delay int) {
	timeutils2.AddTimeout(1*time.Second, t.runNextTask)
}
// runNextTask pops the last queued task and starts it. Its completion
// re-enters through runNextTaskCallback. With nothing left, the final callback
// fires.
func (t *SGuestSyncConfigTaskExecutor) runNextTask() {
	remaining := len(t.tasks)
	if remaining == 0 {
		t.doCallback()
		return
	}
	next := t.tasks[remaining-1]
	t.tasks = t.tasks[:remaining-1]
	next.Start(t.runNextTaskCallback)
}
// doCallback passes the collected errors to the final callback, if one is
// still set, and clears it afterwards so it is not reused.
func (t *SGuestSyncConfigTaskExecutor) doCallback() {
	finish := t.callback
	if finish == nil {
		return
	}
	finish(t.errors)
	t.callback = nil
}
// runNextTaskCallback is the per-task completion hook. It records the errors
// the task reported and moves on to the next task.
func (t *SGuestSyncConfigTaskExecutor) runNextTaskCallback(err ...error) {
	if len(err) > 0 {
		t.errors = append(t.errors, err...)
	}
	t.runNextTask()
}
/**
* GuestDiskSyncTask
**/
// SGuestDiskSyncTask hot-plugs disk, cdrom and floppy changes into a running
// guest through the QEMU monitor. It works through one item at a time in this
// order: deletions, additions, cdroms, floppys. Each step re-enters
// syncDisksConf.
type SGuestDiskSyncTask struct {
	guest        *SKVMGuestInstance
	delDisks     []*desc.SGuestDisk   // disks still to remove (popped from the end)
	addDisks     []*desc.SGuestDisk   // disks still to add (popped from the end)
	cdroms       []*desc.SGuestCdrom  // cdroms whose media still needs syncing
	floppys      []*desc.SGuestFloppy // floppys whose media still needs syncing
	errors       []error              // errors collected along the way
	callback     func(...error)       // set by Start, invoked when everything is done
	checkDrivers []string             // disk drivers whose controller was hot-added by this task
}
// NewGuestDiskSyncTask builds a disk sync task for guest with the given
// pending removals, additions and removable-media updates.
func NewGuestDiskSyncTask(guest *SKVMGuestInstance, delDisks, addDisks []*desc.SGuestDisk, cdroms []*desc.SGuestCdrom, floppys []*desc.SGuestFloppy) *SGuestDiskSyncTask {
	return &SGuestDiskSyncTask{
		guest:    guest,
		delDisks: delDisks,
		addDisks: addDisks,
		cdroms:   cdroms,
		floppys:  floppys,
		errors:   make([]error, 0),
	}
}
// Start implements IGuestTasks. It remembers callback and starts the
// step-by-step sync.
func (d *SGuestDiskSyncTask) Start(callback func(...error)) {
	d.callback = callback
	// every step re-enters syncDisksConf until nothing is left
	d.syncDisksConf()
}
// syncDisksConf performs the next pending step, in priority order: remove a
// disk, add a disk, update a cdrom, update a floppy. Once nothing is pending
// it starts streaming any disks that need their backing files merged (without
// waiting for the stream) and reports the collected errors through callback.
func (d *SGuestDiskSyncTask) syncDisksConf() {
	switch {
	case len(d.delDisks) > 0:
		last := len(d.delDisks) - 1
		disk := d.delDisks[last]
		d.delDisks = d.delDisks[:last]
		d.removeDisk(disk)
	case len(d.addDisks) > 0:
		last := len(d.addDisks) - 1
		disk := d.addDisks[last]
		d.addDisks = d.addDisks[:last]
		d.addDisk(disk)
	case len(d.cdroms) > 0:
		last := len(d.cdroms) - 1
		cdrom := d.cdroms[last]
		d.cdroms = d.cdroms[:last]
		d.changeCdrom(cdrom)
	case len(d.floppys) > 0:
		last := len(d.floppys) - 1
		floppy := d.floppys[last]
		d.floppys = d.floppys[:last]
		d.changeFloppy(floppy)
	default:
		idxs := d.guest.GetNeedMergeBackingFileDiskIndexs()
		if len(idxs) > 0 {
			onStreamed := func() { d.guest.streamDisksComplete(context.Background()) }
			d.guest.StreamDisks(context.Background(), onStreamed, idxs, -1, -1)
		}
		d.callback(d.errors...)
	}
}
// changeCdrom syncs the media of one cdrom.
//
// It looks up the QEMU block device whose name matches the cdrom ordinal
// (e.g. "ide0-cd0", "scsi0-cd0", "cd0") and then changes or ejects its media.
// When no such device exists and a path plus SCSI device description are
// given, it first hot-adds the drive (drive_add) and the SCSI cdrom device
// (device_add) and then syncs the content. Otherwise the cdrom path is cleared
// and the sync moves on.
func (d *SGuestDiskSyncTask) changeCdrom(cdrom *desc.SGuestCdrom) {
	d.guest.Monitor.GetBlocks(func(blocks []monitor.QemuBlock) {
		// The pattern depends only on the ordinal: compile it once per lookup
		// rather than once per block device.
		cdRe := regexp.MustCompile(fmt.Sprintf(`^(ide|scsi)?(%d-)?cd\d+$`, cdrom.Ordinal))
		var cdName string
		for _, r := range blocks {
			if cdRe.MatchString(r.Device) {
				cdName = r.Device
				break
			}
		}
		if len(cdName) > 0 {
			d.changeCdromContent(cdName, cdrom)
			return
		}
		if cdrom.Path != "" && cdrom.Scsi != nil {
			cb2 := func(res string) {
				// NOTE(review): cdName is always empty on this path; confirm
				// whether cdrom.Id was intended here.
				d.changeCdromContent(cdName, cdrom)
			}
			cb := func(res string) {
				cdrom.Scsi.Options["drive"] = cdrom.Id
				d.guest.Monitor.DeviceAdd(cdrom.Scsi.DevType, cdrom.Scsi.Options, cb2)
			}
			params := map[string]string{}
			for k, v := range cdrom.DriveOptions {
				params[k] = v
			}
			params["file"] = cdrom.Path
			params["id"] = cdrom.Id
			d.guest.Monitor.DriveAdd("0", "", params, cb)
			return
		}
		cdrom.Path = ""
		d.syncDisksConf()
	})
}
// changeCdromContent loads cdrom.Path into block device cdName, or ejects the
// media when the path is empty. The monitor result is not inspected. Both
// paths continue in the matching "Succ" handler.
func (d *SGuestDiskSyncTask) changeCdromContent(cdName string, cdrom *desc.SGuestCdrom) {
	if cdrom.Path != "" {
		d.guest.Monitor.ChangeCdrom(cdName, cdrom.Path, func(string) {
			d.OnChangeCdromContentSucc(cdrom)
		})
		return
	}
	d.guest.Monitor.EjectCdrom(cdName, func(string) {
		d.OnEjectCdromContentSucc(cdName)
	})
}
// OnEjectCdromContentSucc clears the path and boot index of every cdrom in the
// guest description whose Id equals cdName, then continues the sync.
func (d *SGuestDiskSyncTask) OnEjectCdromContentSucc(cdName string) {
	for _, cd := range d.guest.Desc.Cdroms {
		if cd.Id != cdName {
			continue
		}
		cd.Path = ""
		cd.BootIndex = nil
	}
	d.syncDisksConf()
}
// OnChangeCdromContentSucc records cdrom in the guest description, replacing
// the entry with the same ordinal. If the description has no cdrom list yet,
// it first creates HostOptions.CdromCount empty slots with ordinals 0..N-1.
func (d *SGuestDiskSyncTask) OnChangeCdromContentSucc(cdrom *desc.SGuestCdrom) {
	cdroms := d.guest.Desc.Cdroms
	if cdroms == nil {
		cdroms = make([]*desc.SGuestCdrom, options.HostOptions.CdromCount)
		for slot := range cdroms {
			empty := new(desc.SGuestCdrom)
			empty.Ordinal = int64(slot)
			cdroms[slot] = empty
		}
		d.guest.Desc.Cdroms = cdroms
	}
	for slot, cur := range cdroms {
		if cur.Ordinal == cdrom.Ordinal {
			cdroms[slot] = cdrom
		}
	}
	d.syncDisksConf()
}
// changeFloppy syncs the media of one floppy. It looks for the QEMU block
// device named exactly "floppy<ordinal>" and changes or ejects its media. If
// no such device exists, the floppy path is cleared and the sync moves on.
func (d *SGuestDiskSyncTask) changeFloppy(floppy *desc.SGuestFloppy) {
	d.guest.Monitor.GetBlocks(func(blocks []monitor.QemuBlock) {
		// The old pattern `^floppy%d$` was an anchored literal (Go's `$`
		// matches only at end of text), so plain string equality is
		// equivalent and avoids compiling a regexp per block device.
		want := fmt.Sprintf("floppy%d", floppy.Ordinal)
		var flName string
		for _, r := range blocks {
			if r.Device == want {
				flName = r.Device
				break
			}
		}
		if len(flName) == 0 {
			floppy.Path = ""
			d.syncDisksConf()
			return
		}
		d.changeFloppyContent(flName, floppy)
	})
}
// changeFloppyContent loads floppy.Path into block device flName, or ejects it
// when the path is empty. The cdrom monitor commands are reused for floppies.
func (d *SGuestDiskSyncTask) changeFloppyContent(flName string, floppy *desc.SGuestFloppy) {
	if floppy.Path != "" {
		d.guest.Monitor.ChangeCdrom(flName, floppy.Path, func(string) {
			d.OnChangeFloppyContentSucc(floppy)
		})
		return
	}
	d.guest.Monitor.EjectCdrom(flName, func(string) {
		d.OnEjectFloppyContentSucc(flName)
	})
}
// OnEjectFloppyContentSucc clears the path of every floppy in the guest
// description whose Id equals flName, then continues the sync.
func (d *SGuestDiskSyncTask) OnEjectFloppyContentSucc(flName string) {
	for _, fl := range d.guest.Desc.Floppys {
		if fl.Id == flName {
			fl.Path = ""
		}
	}
	d.syncDisksConf()
}
// OnChangeFloppyContentSucc records floppy in the guest description, replacing
// the entry with the same ordinal. If the description has no floppy list yet,
// it first creates HostOptions.FloppyCount empty slots with ordinals 0..N-1.
func (d *SGuestDiskSyncTask) OnChangeFloppyContentSucc(floppy *desc.SGuestFloppy) {
	floppys := d.guest.Desc.Floppys
	if floppys == nil {
		floppys = make([]*desc.SGuestFloppy, options.HostOptions.FloppyCount)
		for slot := range floppys {
			empty := new(desc.SGuestFloppy)
			empty.Ordinal = int64(slot)
			floppys[slot] = empty
		}
		d.guest.Desc.Floppys = floppys
	}
	for slot, cur := range floppys {
		if cur.Ordinal == floppy.Ordinal {
			floppys[slot] = floppy
		}
	}
	d.syncDisksConf()
}
// removeDisk detaches the "drive_<index>" drive of disk and continues in
// onRemoveDriveSucc.
func (d *SGuestDiskSyncTask) removeDisk(disk *desc.SGuestDisk) {
	driveId := fmt.Sprintf("drive_%d", disk.Index)
	onDriveDeleted := func(results string) {
		d.onRemoveDriveSucc(driveId, results, disk.Index)
	}
	d.guest.Monitor.DriveDel(driveId, onDriveDeleted)
}
// onRemoveDriveSucc logs the drive_del result, then removes the guest-visible
// device with the same id and continues in onRemoveDiskSucc.
func (d *SGuestDiskSyncTask) onRemoveDriveSucc(devId, results string, diskIdx int8) {
	log.Infof("remove drive %s results: %s", devId, results)
	onDeviceDeleted := func(res string) {
		d.onRemoveDiskSucc(res, diskIdx)
	}
	d.guest.Monitor.DeviceDel(devId, onDeviceDeleted)
}
// onRemoveDiskSucc drops the first disk with index diskIdx from the guest
// description and releases its PCI address. A release failure is only logged.
// Then it continues the sync.
func (d *SGuestDiskSyncTask) onRemoveDiskSucc(results string, diskIdx int8) {
	disks := d.guest.Desc.Disks
	pos := -1
	for i, disk := range disks {
		if disk.Index != diskIdx {
			continue
		}
		if disk.Pci != nil {
			if err := d.guest.pciAddrs.ReleasePCIAddress(disk.Pci.PCIAddr); err != nil {
				log.Errorf("failed release disk pci addr %s", disk.Pci.PCIAddr)
			}
		}
		pos = i
		break
	}
	if pos >= 0 {
		d.guest.Desc.Disks = append(disks[:pos], disks[pos+1:]...)
	}
	d.syncDisksConf()
}
// checkDiskDriver makes sure the controller that disk.Driver needs is present
// before the disk itself is hot-added.
//
// For DISK_DRIVER_SCSI without d.guest.Desc.VirtioScsi, and for
// DISK_DRIVER_SATA without d.guest.Desc.SataController, it:
//   - picks a hot-pluggable PCI controller type,
//   - records the new controller in the guest description and assigns it a
//     PCI address,
//   - hot-adds it through device_add and then calls startAddDisk.
//
// Failures in the first two steps are appended to d.errors and the sync moves
// on via syncDisksConf. All other drivers go straight to startAddDisk.
func (d *SGuestDiskSyncTask) checkDiskDriver(disk *desc.SGuestDisk) {
	if d.checkDrivers == nil {
		d.checkDrivers = make([]string, 0)
	}
	log.Debugf("sync disk driver: %s", disk.Driver)
	if disk.Driver == DISK_DRIVER_SCSI && d.guest.Desc.VirtioScsi == nil {
		// insert virtio scsi
		var cType = d.guest.getHotPlugPciControllerType()
		if cType == nil {
			err := errors.Errorf("failed get hotplugable pci controller")
			d.errors = append(d.errors, err)
			d.syncDisksConf()
			return
		}
		d.guest.Desc.VirtioScsi = &desc.SGuestVirtioScsi{
			PCIDevice: desc.NewPCIDevice(*cType, "virtio-scsi-pci", "scsi"),
		}
		if err := d.guest.ensureDevicePciAddress(d.guest.Desc.VirtioScsi.PCIDevice, -1, nil); err != nil {
			// undo the description change made just above
			d.guest.Desc.VirtioScsi = nil
			err = errors.Wrap(err, "ensureDevicePciAddress for virtio scsi device")
			d.errors = append(d.errors, err)
			d.syncDisksConf()
			return
		}
		// NOTE(review): ret is only logged; the disk add proceeds even if
		// device_add of the controller reported an error.
		cb := func(ret string) {
			log.Infof("Add scsi controller %s", ret)
			d.checkDrivers = append(d.checkDrivers, DISK_DRIVER_SCSI)
			d.startAddDisk(disk)
		}
		params := map[string]string{
			"id":   d.guest.Desc.VirtioScsi.Id,
			"bus":  d.guest.Desc.VirtioScsi.BusStr(),
			"addr": d.guest.Desc.VirtioScsi.SlotFunc(),
		}
		d.guest.Monitor.DeviceAdd(d.guest.Desc.VirtioScsi.DevType, params, cb)
	} else if disk.Driver == DISK_DRIVER_SATA && d.guest.Desc.SataController == nil {
		// insert sata ahci controller
		var cType = d.guest.getHotPlugPciControllerType()
		if cType == nil {
			err := errors.Errorf("failed get hotplugable pci controller")
			d.errors = append(d.errors, err)
			d.syncDisksConf()
			return
		}
		d.guest.Desc.SataController = &desc.SGuestAhciDevice{
			PCIDevice: desc.NewPCIDevice(*cType, "ahci", "ahci0"),
		}
		err := d.guest.ensureDevicePciAddress(d.guest.Desc.SataController.PCIDevice, -1, nil)
		if err != nil {
			// undo the description change made just above
			d.guest.Desc.SataController = nil
			err = errors.Wrap(err, "ensureDevicePciAddress sata controller")
			d.errors = append(d.errors, err)
			d.syncDisksConf()
			return
		}
		// NOTE(review): as above, the device_add result is only logged.
		cb := func(ret string) {
			log.Infof("Add sata ahci controller %s", ret)
			d.checkDrivers = append(d.checkDrivers, DISK_DRIVER_SATA)
			d.startAddDisk(disk)
		}
		params := map[string]string{
			"id":   d.guest.Desc.SataController.Id,
			"bus":  d.guest.Desc.SataController.BusStr(),
			"addr": d.guest.Desc.SataController.SlotFunc(),
		}
		d.guest.Monitor.DeviceAdd(d.guest.Desc.SataController.DevType, params, cb)
	} else {
		// controller already present (or not needed for this driver)
		d.startAddDisk(disk)
	}
}
// addDisk hot-adds one disk. checkDiskDriver handles the controller
// prerequisites first and then hands off to startAddDisk.
func (d *SGuestDiskSyncTask) addDisk(disk *desc.SGuestDisk) {
	d.checkDiskDriver(disk)
}
// startAddDisk adds the backing drive "drive_<index>" for disk through
// drive_add and continues in onAddDiskSucc, which creates the guest-visible
// device.
//
// If the host storage manager does not know disk.Path, the disk is skipped.
// That case is logged but not recorded as a task error, and the sync moves on.
// Virtio disks need a hot-pluggable PCI controller; if none is available, the
// failure is recorded in d.errors.
func (d *SGuestDiskSyncTask) startAddDisk(disk *desc.SGuestDisk) {
	iDisk, err := storageman.GetManager().GetDiskByPath(disk.Path)
	if iDisk == nil {
		// Keep the best-effort skip, but leave a trace of why the disk was
		// not hot-added instead of discarding the lookup error.
		log.Errorf("skip hot-adding disk %s: not found on host: %v", disk.Path, err)
		d.syncDisksConf()
		return
	}
	var (
		diskIndex  = disk.Index
		aio        = disk.AioMode
		diskDriver = disk.Driver
		cacheMode  = disk.CacheMode
	)
	var params = map[string]string{
		"file":  iDisk.GetPath(),
		"if":    "none",
		"id":    fmt.Sprintf("drive_%d", diskIndex),
		"cache": cacheMode,
		"aio":   aio,
	}
	if iDisk.IsFile() {
		params["file.locking"] = "off"
	}
	if d.guest.isEncrypted() {
		params["encrypt.format"] = "luks"
		params["encrypt.key-secret"] = "sec0"
	}
	var bus string
	var cType *desc.PCI_CONTROLLER_TYPE
	switch diskDriver {
	case DISK_DRIVER_SCSI:
		bus = "scsi.0"
	case DISK_DRIVER_VIRTIO:
		cType = d.guest.getHotPlugPciControllerType()
		if cType == nil {
			log.Errorf("no hotplugable pci controller found")
			d.errors = append(d.errors, errors.Errorf("no hotplugable pci controller found"))
			d.syncDisksConf()
			return
		}
		bus = d.guest.GetPciBus()
	case DISK_DRIVER_IDE:
		// two IDE units per bus
		bus = fmt.Sprintf("ide.%d", diskIndex/2)
	case DISK_DRIVER_SATA:
		bus = fmt.Sprintf("ahci0.%d", diskIndex)
	}
	// drive_add bus is a placeholder
	d.guest.Monitor.DriveAdd(bus, "", params, func(result string) { d.onAddDiskSucc(disk, result, cType) })
}
// onAddDiskSucc runs after drive_add for disk. It builds the device
// description that matches the disk driver, appends disk to the guest
// description and adds the guest-visible device "drive_<index>" through
// device_add. The sync continues in onAddDeviceSucc.
//
// For virtio disks a PCI address is assigned first. If that fails, the drive
// is deleted again, the error is recorded and the sync moves on.
//
// NOTE(review): results (the drive_add output) is not inspected. The disk is
// appended to Desc.Disks before device_add, and onAddDeviceSucc does not check
// the device_add result either.
// NOTE(review): the PVSCSI case reads d.guest.Desc.PvScsi.Id, and
// checkDiskDriver never creates a PvScsi controller. If that field can be a
// nil pointer here, this panics. Confirm.
func (d *SGuestDiskSyncTask) onAddDiskSucc(disk *desc.SGuestDisk, results string, cType *desc.PCI_CONTROLLER_TYPE) {
	var (
		diskIndex  = disk.Index
		diskDriver = disk.Driver
		devType    = qemu.GetDiskDeviceModel(diskDriver)
		id         = fmt.Sprintf("drive_%d", diskIndex)
	)
	switch diskDriver {
	case DISK_DRIVER_VIRTIO:
		// cType is non-nil here: startAddDisk returns early for virtio without one
		disk.Pci = desc.NewPCIDevice(*cType, devType, id)
		err := d.guest.ensureDevicePciAddress(disk.Pci, -1, nil)
		if err != nil {
			log.Errorln(err)
			d.guest.Monitor.DriveDel(id, func(res string) {
				log.Infof("drive %s del %s", id, res)
			})
			d.errors = append(d.errors, err)
			d.syncDisksConf()
			return
		}
	case DISK_DRIVER_SCSI:
		disk.Scsi = desc.NewScsiDevice(d.guest.Desc.VirtioScsi.Id, devType, id)
	case DISK_DRIVER_PVSCSI:
		disk.Scsi = desc.NewScsiDevice(d.guest.Desc.PvScsi.Id, devType, id)
	case DISK_DRIVER_IDE:
		disk.Ide = desc.NewIdeDevice(devType, id)
	case DISK_DRIVER_SATA: // -device ahci,id=ahci pci device
		disk.Ide = desc.NewIdeDevice(devType, id)
	}
	d.guest.Desc.Disks = append(d.guest.Desc.Disks, disk)
	var params = map[string]string{
		"drive": fmt.Sprintf("drive_%d", diskIndex),
		"id":    fmt.Sprintf("drive_%d", diskIndex),
	}
	if diskDriver == DISK_DRIVER_VIRTIO && disk.Pci != nil {
		params["bus"] = disk.Pci.BusStr()
		params["addr"] = disk.Pci.SlotFunc()
	} else if DISK_DRIVER_IDE == diskDriver {
		// unit 0/1 on the IDE bus chosen in startAddDisk (index/2)
		params["unit"] = strconv.Itoa(int(diskIndex % 2))
	}
	d.guest.Monitor.DeviceAdd(devType, params, d.onAddDeviceSucc)
}
// onAddDeviceSucc logs the device_add output and continues the sync. The
// output is not checked for errors.
func (d *SGuestDiskSyncTask) onAddDeviceSucc(results string) {
	guestName := d.guest.GetName()
	log.Infof("%s device add res: %s", guestName, results)
	d.syncDisksConf()
}
/**
* GuestNetworkSyncTask
**/
// SGuestNetworkSyncTask hot-unplugs and hot-plugs guest NICs through the QEMU
// monitor. Deletions are processed before additions. Afterwards the NIC
// configuration is redeployed through the guest agent.
type SGuestNetworkSyncTask struct {
	guest   *SKVMGuestInstance
	delNics []*desc.SGuestNetwork // NICs still to remove (popped from the end)
	addNics []*desc.SGuestNetwork // NICs still to add (popped from the end)
	errors  []error
	// delNicCnt is the number of NICs requested for removal, captured in Start.
	delNicCnt int
	// addNicMacs and addNicConfs are parallel slices built in Start: the MAC
	// of each added NIC and the address configuration to apply to it via the
	// guest agent once its in-guest device name is known.
	addNicMacs  []string
	addNicConfs []*monitor.NetworkModify
	callback    func(...error)
}
// Start implements IGuestTasks.
//
// When NICs are being added, it first records each new NIC's MAC and its
// address configuration (IPv4 address/mask and gateway, IPv6 address/mask)
// for the post-plug guest-agent restart. It also deploys the combined NIC
// configuration through the guest agent ahead of the hot-plug; a failure there
// is only logged. syncNetworkConf then performs the removals and additions.
func (n *SGuestNetworkSyncTask) Start(callback func(...error)) {
	n.callback = callback
	if len(n.addNics) > 0 {
		addNicMacs := make([]string, 0, len(n.addNics))
		addNicConfs := make([]*monitor.NetworkModify, 0, len(n.addNics))
		for _, nic := range n.addNics {
			addNicMacs = append(addNicMacs, nic.Mac)
			netMod := &monitor.NetworkModify{}
			if len(nic.Ip) > 0 {
				netMod.Ipmask = fmt.Sprintf("%s/%d", nic.Ip, nic.Masklen)
				netMod.Gateway = nic.Gateway
			}
			if len(nic.Ip6) > 0 {
				netMod.Ip6mask = fmt.Sprintf("%s/%d", nic.Ip6, nic.Masklen6)
			}
			addNicConfs = append(addNicConfs, netMod)
		}
		n.addNicMacs = addNicMacs
		n.addNicConfs = addNicConfs
		// Deploy the NIC configuration before adding the NICs. Build the
		// combined list in its own backing array: append(n.guest.Desc.Nics, ...)
		// may write into the spare capacity of the guest description's slice,
		// which onDeviceAdd later appends into as well.
		allNics := make([]*desc.SGuestNetwork, 0, len(n.guest.Desc.Nics)+len(n.addNics))
		allNics = append(allNics, n.guest.Desc.Nics...)
		allNics = append(allNics, n.addNics...)
		if err := n.guest.QgaDeployNicsConfigure(allNics); err != nil {
			log.Errorf("failed do QgaDeployNicsConfigure %s", err)
		}
	}
	n.delNicCnt = len(n.delNics)
	n.syncNetworkConf()
}
// syncNetworkConf handles the next pending NIC, removals first. When nothing
// is pending and any NIC was added or removed, it redeploys the guest NIC
// configuration through the guest agent. If NICs were added and the redeploy
// succeeded, it waits 6 seconds and restarts networking for the added NICs.
// Failures in these guest-agent steps are only logged. Finally the callback
// receives the collected errors.
func (n *SGuestNetworkSyncTask) syncNetworkConf() {
	switch {
	case len(n.delNics) > 0:
		last := len(n.delNics) - 1
		nic := n.delNics[last]
		n.delNics = n.delNics[:last]
		n.removeNic(nic)
	case len(n.addNics) > 0:
		last := len(n.addNics) - 1
		nic := n.addNics[last]
		n.addNics = n.addNics[:last]
		n.addNic(nic)
	default:
		nicsChanged := len(n.addNicMacs) > 0 || n.delNicCnt > 0
		redeployOk := true
		if nicsChanged {
			// redeploy nics config after add/del nics
			if err := n.guest.QgaDeployNicsConfigure(n.guest.Desc.Nics); err != nil {
				log.Errorf("failed do QgaDeployNicsConfigure %s", err)
				redeployOk = false
			}
		}
		if redeployOk && len(n.addNicMacs) > 0 {
			// try restart added nics, wait for added nic ready
			time.Sleep(6 * time.Second)
			if err := n.qgaRestartAddedNics(); err != nil {
				log.Errorf("failed qgaRestartAddedNics %s", err)
			}
		}
		n.callback(n.errors...)
	}
}
// qgaRestartAddedNics resolves the in-guest device names of the added NICs
// and asks the guest agent to restart networking for each resolved NIC.
// Per-NIC restart failures are logged, not returned. Only a failure to resolve
// the device names is returned.
func (n *SGuestNetworkSyncTask) qgaRestartAddedNics() error {
	if err := n.qgaGetAddedNicDevs(); err != nil {
		return err
	}
	for _, conf := range n.addNicConfs {
		if conf.Device == "" {
			continue
		}
		if err := n.guest.guestAgent.QgaRestartNetwork(conf); err != nil {
			log.Errorf("Failed QgaRestartNetwork %s %s", conf.Device, err)
		}
	}
	return nil
}
// qgaGetAddedNicDevs queries the guest agent for the guest's interfaces. For
// every added NIC whose MAC appears there, it stores the interface name in the
// matching addNicConfs entry; the first interface listed with that MAC wins.
func (n *SGuestNetworkSyncTask) qgaGetAddedNicDevs() error {
	data, err := n.guest.guestAgent.QgaGetNetwork()
	if err != nil {
		return errors.Wrap(err, "QgaGetNetwork")
	}
	ifnames, err := jsonutils.Parse(data)
	if err != nil {
		return errors.Wrapf(err, "parse qga network output %s", data)
	}
	var ifaces []api.IfnameDetail
	if err := ifnames.Unmarshal(&ifaces); err != nil {
		return errors.Wrap(err, "unmarshal ifnames")
	}
	// index of the first reported interface for each hardware address
	firstByMac := make(map[string]int, len(ifaces))
	for j := range ifaces {
		if _, seen := firstByMac[ifaces[j].HardwareAddress]; !seen {
			firstByMac[ifaces[j].HardwareAddress] = j
		}
	}
	for i, mac := range n.addNicMacs {
		if j, ok := firstByMac[mac]; ok {
			n.addNicConfs[i].Device = ifaces[j].Name
		}
	}
	return nil
}
// removeNic removes one NIC. vfio-pci NICs skip the netdev/device teardown
// here and are only dropped from the guest description. Other NICs have their
// netdev deleted first; a "not found" response counts as success.
func (n *SGuestNetworkSyncTask) removeNic(nic *desc.SGuestNetwork) {
	if nic.Driver == "vfio-pci" {
		n.onDeviceDel(nic)
		return
	}
	n.guest.Monitor.NetdevDel(nic.Ifname, func(res string) {
		failed := len(res) > 0 && !strings.Contains(res, "not found")
		if !failed {
			n.onNetdevDel(nic)
			return
		}
		log.Errorf("netdev del failed %s", res)
		n.errors = append(n.errors, fmt.Errorf("netdev del failed %s", res))
		n.syncNetworkConf()
	})
}
// onNetdevDel runs the NIC's down script. A failure is recorded but does not
// stop the removal. Then it removes the guest-visible NIC device.
func (n *SGuestNetworkSyncTask) onNetdevDel(nic *desc.SGuestNetwork) {
	script := n.guest.getNicDownScriptPath(nic)
	if out, err := procutils.NewCommand("sh", script).Output(); err != nil {
		log.Errorf("script down nic failed %s", out)
		n.errors = append(n.errors, err)
	}
	n.delNicDevice(nic)
}
// onDeviceDel drops the first NIC with nic.Index from the guest description
// and releases nic's PCI address; a release failure is only logged. Then it
// continues the sync.
func (n *SGuestNetworkSyncTask) onDeviceDel(nic *desc.SGuestNetwork) {
	nics := n.guest.Desc.Nics
	pos := -1
	for i, cur := range nics {
		if cur.Index != nic.Index {
			continue
		}
		if nic.Pci != nil {
			if err := n.guest.pciAddrs.ReleasePCIAddress(nic.Pci.PCIAddr); err != nil {
				log.Errorf("failed release nic pci addr %s", nic.Pci.PCIAddr)
			}
		}
		pos = i
		break
	}
	if pos >= 0 {
		n.guest.Desc.Nics = append(nics[:pos], nics[pos+1:]...)
	}
	n.syncNetworkConf()
}
// delNicDevice hot-unplugs the guest-visible NIC device "netdev-<ifname>".
// On success, onDeviceDel drops the NIC from the guest description and
// advances the sync. On failure the error is recorded and the sync is advanced
// here, so the task still reaches its completion callback.
func (n *SGuestNetworkSyncTask) delNicDevice(nic *desc.SGuestNetwork) {
	callback := func(res string) {
		if len(res) > 0 {
			log.Errorf("network device del failed %s", res)
			n.errors = append(n.errors, fmt.Errorf("network device del failed %s", res))
			// Without this the remaining NICs are never processed and the
			// task callback is never invoked, stalling the sync executor.
			n.syncNetworkConf()
			return
		}
		n.onDeviceDel(nic)
	}
	n.guest.Monitor.DeviceDel(fmt.Sprintf("netdev-%s", nic.Ifname), callback)
}
// addNic hot-plugs one NIC.
//
// It first generates the NIC's up/down scripts. vfio-pci NICs are then only
// appended to the guest description, because the device itself is attached by
// the isolated device sync task. Other NICs get a tap netdev with vhost
// enabled, using the generated up/down scripts, and continue in onNetdevAdd
// once netdev_add succeeds. An empty monitor response counts as success. Every
// failure is recorded in n.errors and the sync moves on.
func (n *SGuestNetworkSyncTask) addNic(nic *desc.SGuestNetwork) {
	if err := n.guest.generateNicScripts(nic); err != nil {
		log.Errorln(err)
		n.errors = append(n.errors, err)
		n.syncNetworkConf()
		return
	}
	if nic.Driver == "vfio-pci" {
		// vfio device will add on isolated devices sync task
		n.onDeviceAdd(nic)
		return
	}
	var (
		netType    = "tap"
		upscript   = n.guest.getNicUpScriptPath(nic)
		downscript = n.guest.getNicDownScriptPath(nic)
		params     = map[string]string{
			"ifname": nic.Ifname, "script": upscript, "downscript": downscript,
			"vhost": "on", "vhostforce": "off",
		}
		cType = n.guest.getHotPlugPciControllerType()
	)
	if cType == nil {
		err := errors.Errorf("no hotplugable pci controller found")
		log.Errorln(err)
		n.errors = append(n.errors, err)
		n.syncNetworkConf()
		return
	}
	callback := func(res string) {
		if len(res) > 0 {
			log.Errorf("netdev add failed %s", res)
			n.errors = append(n.errors, fmt.Errorf("netdev add failed %s", res))
			n.syncNetworkConf()
		} else {
			// remember the scripts actually wired to this netdev
			nic.UpscriptPath = upscript
			nic.DownscriptPath = downscript
			n.onNetdevAdd(nic, cType)
		}
	}
	n.guest.Monitor.NetdevAdd(nic.Ifname, netType, params, callback)
}
// onNetdevAdd creates the guest-visible NIC device "netdev-<ifname>" on top
// of the netdev that was just added. Only the virtio, e1000 and vmxnet3
// drivers are supported. The device gets a PCI address on a hot-pluggable
// controller of type cType. On any failure the netdev is deleted again
// (fire-and-forget), the error is recorded and the sync moves on.
func (n *SGuestNetworkSyncTask) onNetdevAdd(nic *desc.SGuestNetwork, cType *desc.PCI_CONTROLLER_TYPE) {
	id := fmt.Sprintf("netdev-%s", nic.Ifname)
	devType := n.guest.getNicDeviceModel(nic.Driver)
	onFail := func(e error) {
		log.Errorln(e)
		n.errors = append(n.errors, e)
		n.guest.Monitor.NetdevDel(nic.Ifname, func(res string) {
			log.Infof("netdev %s del %s", id, res)
		})
		n.syncNetworkConf()
	}
	switch nic.Driver {
	case "virtio", "e1000", "vmxnet3":
		nic.Pci = desc.NewPCIDevice(*cType, devType, id)
	default:
		onFail(errors.Errorf("unknown nic driver %s", nic.Driver))
		return
	}
	if err := n.guest.ensureDevicePciAddress(nic.Pci, -1, nil); err != nil {
		onFail(err)
		return
	}
	params := map[string]string{
		"id":     id,
		"netdev": nic.Ifname,
		"mac":    nic.Mac,
		"bus":    nic.Pci.BusStr(),
		"addr":   nic.Pci.SlotFunc(),
	}
	n.guest.Monitor.DeviceAdd(devType, params, func(res string) {
		if len(res) == 0 {
			n.onDeviceAdd(nic)
			return
		}
		onFail(fmt.Errorf("device add failed %s", res))
	})
}
// onDeviceAdd records the newly attached NIC in the guest description and
// continues the sync.
func (n *SGuestNetworkSyncTask) onDeviceAdd(nic *desc.SGuestNetwork) {
	updated := append(n.guest.Desc.Nics, nic)
	n.guest.Desc.Nics = updated
	n.syncNetworkConf()
}
// NewGuestNetworkSyncTask builds a NIC sync task for guest with the NICs to
// remove and to add. The MAC/config bookkeeping is filled in by Start.
func NewGuestNetworkSyncTask(
	guest *SKVMGuestInstance, delNics, addNics []*desc.SGuestNetwork,
) *SGuestNetworkSyncTask {
	return &SGuestNetworkSyncTask{
		guest:   guest,
		delNics: delNics,
		addNics: addNics,
		errors:  make([]error, 0),
	}
}
/**
* GuestIsolatedDeviceSyncTask
**/
// SGuestIsolatedDeviceSyncTask hot-unplugs and hot-plugs isolated
// (passthrough) devices such as vfio-pci and usb-host devices. Removals are
// processed before additions, one device at a time.
type SGuestIsolatedDeviceSyncTask struct {
	guest    *SKVMGuestInstance
	delDevs  []*desc.SGuestIsolatedDevice // devices still to remove (popped from the end)
	addDevs  []*desc.SGuestIsolatedDevice // devices still to add (popped from the end)
	errors   []error
	callback func(...error) // set by Start, invoked when everything is done
}
// NewGuestIsolatedDeviceSyncTask builds an isolated device sync task for
// guest with the devices to remove and to add.
func NewGuestIsolatedDeviceSyncTask(guest *SKVMGuestInstance, delDevs, addDevs []*desc.SGuestIsolatedDevice) *SGuestIsolatedDeviceSyncTask {
	return &SGuestIsolatedDeviceSyncTask{
		guest:   guest,
		delDevs: delDevs,
		addDevs: addDevs,
		errors:  make([]error, 0),
	}
}
// Start implements IGuestTasks. It remembers cb and starts the device sync.
func (t *SGuestIsolatedDeviceSyncTask) Start(cb func(...error)) {
	t.callback = cb
	// each step re-enters syncDevice until nothing is left
	t.syncDevice()
}
// syncDevice handles the next pending device, removals first. When nothing is
// pending, the callback receives the collected errors.
func (t *SGuestIsolatedDeviceSyncTask) syncDevice() {
	switch {
	case len(t.delDevs) > 0:
		last := len(t.delDevs) - 1
		dev := t.delDevs[last]
		t.delDevs = t.delDevs[:last]
		t.removeDevice(dev)
	case len(t.addDevs) > 0:
		last := len(t.addDevs) - 1
		dev := t.addDevs[last]
		t.addDevs = t.addDevs[:last]
		t.addDevice(dev)
	default:
		t.callback(t.errors...)
	}
}
// removeDevice hot-unplugs an isolated device. It resolves the host device,
// asks it for its hot-unplug options and deletes each option's device through
// the monitor in order.
//
// On success, the device is dropped from the guest description and the PCI
// addresses of its vfio devices are released; release failures are only
// logged. Any failure, including an unknown host device, is recorded in
// t.errors. The sync always continues with the next device.
func (t *SGuestIsolatedDeviceSyncTask) removeDevice(dev *desc.SGuestIsolatedDevice) {
	cb := func(res string) {
		if len(res) > 0 {
			t.errors = append(t.errors, fmt.Errorf("device del failed: %s", res))
		} else {
			var i = 0
			for ; i < len(t.guest.Desc.IsolatedDevices); i++ {
				if t.guest.Desc.IsolatedDevices[i].Id == dev.Id {
					for j := 0; j < len(t.guest.Desc.IsolatedDevices[i].VfioDevs); j++ {
						pciaddr := t.guest.Desc.IsolatedDevices[i].VfioDevs[j].PCIAddr
						if pciaddr != nil {
							if e := t.guest.pciAddrs.ReleasePCIAddress(pciaddr); e != nil {
								log.Errorf("failed release vfio pci address %s: %s", pciaddr, e)
							}
						}
					}
					break
				}
			}
			if i < len(t.guest.Desc.IsolatedDevices) {
				// remove device
				t.guest.Desc.IsolatedDevices = append(t.guest.Desc.IsolatedDevices[:i], t.guest.Desc.IsolatedDevices[i+1:]...)
			}
		}
		t.syncDevice()
	}
	devObj := hostinfo.Instance().IsolatedDeviceMan.GetDeviceByIdent(dev.VendorDeviceId, dev.Addr, dev.MdevId)
	if devObj == nil {
		cb(fmt.Sprintf("Not found host isolated_device by %s %s", dev.VendorDeviceId, dev.Addr))
		return
	}
	opts, err := devObj.GetHotUnplugOptions(dev)
	if err != nil {
		// name the call that actually failed (was mislabelled GetHotPlugOptions)
		cb(errors.Wrap(err, "GetHotUnplugOptions").Error())
		return
	}
	t.delDeviceCallBack(opts, 0, cb)
}
// addDevice hot-plugs an isolated device into the guest.
//
// Steps:
//   - resolve the host device by vendor/device id, address and mdev id;
//   - for NIC-type devices, build the SR-IOV attach init script;
//   - USB devices get a "usb-host" passthrough description. Everything else
//     gets one vfio-pci device for the device itself, plus one per remaining
//     address in its IOMMU group. All of them share one multifunction PCI
//     address; the second argument of ensureDevicePciAddress is presumably the
//     PCI function number (0 for the primary, j for the rest) — confirm;
//   - run the setup script, if any (failure only logged);
//   - add each device from the hot-plug options in order through the monitor.
//
// On success the device is appended to the guest description. On failure any
// PCI addresses assigned to its vfio devices are released, the error is
// recorded, and in every case the sync continues with the next device.
func (t *SGuestIsolatedDeviceSyncTask) addDevice(dev *desc.SGuestIsolatedDevice) {
	var err error
	devObj := hostinfo.Instance().IsolatedDeviceMan.GetDeviceByIdent(dev.VendorDeviceId, dev.Addr, dev.MdevId)
	if devObj == nil {
		err = errors.Errorf("Not found host isolated_device by %s %s", dev.VendorDeviceId, dev.Addr)
		log.Errorln(err)
		t.errors = append(t.errors, err)
		t.syncDevice()
		return
	}
	var setupScript string
	if dev.DevType == api.NIC_TYPE {
		setupScript, err = t.guest.sriovNicAttachInitScript(dev.NetworkIndex, devObj)
		if err != nil {
			err = errors.Errorf("sriovNicAttachInitScript %s", err)
			log.Errorln(err)
			t.errors = append(t.errors, err)
			t.syncDevice()
			return
		}
	}
	// onFail releases any PCI addresses already assigned to dev's vfio
	// devices, records err and moves on to the next device.
	onFail := func(err error) {
		for i := 0; i < len(dev.VfioDevs); i++ {
			if dev.VfioDevs[i].PCIAddr != nil {
				if eRelease := t.guest.pciAddrs.ReleasePCIAddress(dev.VfioDevs[i].PCIAddr); eRelease != nil {
					log.Errorf("failed release pci pci address %s: %s", dev.VfioDevs[i].PCIAddr, eRelease)
				}
			}
		}
		log.Errorln(err)
		t.errors = append(t.errors, err)
		t.syncDevice()
		return
	}
	var cType *desc.PCI_CONTROLLER_TYPE
	if dev.DevType == api.USB_TYPE {
		dev.Usb = desc.NewUsbDevice("usb-host", devObj.GetQemuId())
		dev.Usb.Options = devObj.GetPassthroughOptions()
	} else {
		cType = t.guest.getVfioDeviceHotPlugPciControllerType()
		if cType == nil {
			log.Errorf("no hotplugable pci controller found")
			t.errors = append(t.errors, errors.Errorf("no hotplugable pci controller found"))
			t.syncDevice()
			return
		}
		id := devObj.GetQemuId()
		dev.VfioDevs = make([]*desc.VFIODevice, 0)
		vfioDev := desc.NewVfioDevice(
			*cType, "vfio-pci", id, devObj.GetAddr(), dev.DevType == api.GPU_VGA_TYPE,
		)
		dev.VfioDevs = append(dev.VfioDevs, vfioDev)
		// the other devices in the same IOMMU group get ids "<id>-1", "<id>-2", ...
		groupDevAddrs := devObj.GetIOMMUGroupRestAddrs()
		for j := 0; j < len(groupDevAddrs); j++ {
			gid := fmt.Sprintf("%s-%d", id, j+1)
			vfioDev = desc.NewVfioDevice(*cType, "vfio-pci", gid, groupDevAddrs[j], false)
			dev.VfioDevs = append(dev.VfioDevs, vfioDev)
		}
		multiFunc := true
		err := t.guest.ensureDevicePciAddress(dev.VfioDevs[0].PCIDevice, 0, &multiFunc)
		if err != nil {
			err = errors.Wrapf(err, "ensure isolated device %s pci address", dev.VfioDevs[0].PCIAddr)
		} else {
			// secondary functions start from a copy of the primary's address
			for j := 1; j < len(dev.VfioDevs); j++ {
				dev.VfioDevs[j].PCIAddr = dev.VfioDevs[0].PCIAddr.Copy()
				err = t.guest.ensureDevicePciAddress(dev.VfioDevs[j].PCIDevice, j, nil)
				if err != nil {
					err = errors.Wrapf(err, "ensure isolated device %s pci address", dev.VfioDevs[j].PCIAddr)
					break
				}
			}
		}
		if err != nil {
			onFail(err)
			return
		}
	}
	cb := func(res string) {
		if len(res) > 0 {
			onFail(fmt.Errorf("device add failed: %s", res))
		} else {
			t.guest.Desc.IsolatedDevices = append(t.guest.Desc.IsolatedDevices, dev)
			t.syncDevice()
		}
	}
	if len(setupScript) > 0 {
		output, err := procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", setupScript).Output()
		if err != nil {
			log.Errorf("isolated device setup error %s, %s", output, err)
		}
	}
	opts, err := devObj.GetHotPlugOptions(dev, t.guest.Desc)
	if err != nil {
		cb(errors.Wrap(err, "GetHotPlugOptions").Error())
		return
	}
	t.addDeviceCallBack(opts, 0, cb)
}
// addDeviceCallBack adds opts[idx:] through the monitor one after another. It
// stops at the first non-empty monitor response, reporting which option
// failed, and calls onAddFinish("") once all options were added.
func (t *SGuestIsolatedDeviceSyncTask) addDeviceCallBack(opts []*isolated_device.HotPlugOption, idx int, onAddFinish func(string)) {
	if idx < len(opts) {
		opt := opts[idx]
		t.guest.Monitor.DeviceAdd(opt.Device, opt.Options, func(res string) {
			if res == "" {
				t.addDeviceCallBack(opts, idx+1, onAddFinish)
				return
			}
			onAddFinish(fmt.Sprintf("monitor add %d device: %s", idx, res))
		})
		return
	}
	onAddFinish("")
}
// delDeviceCallBack deletes the devices named by opts[idx:] through the
// monitor one after another. It stops at the first non-empty response and
// calls onDelFinish("") once all devices were deleted.
func (t *SGuestIsolatedDeviceSyncTask) delDeviceCallBack(opts []*isolated_device.HotUnplugOption, idx int, onDelFinish func(string)) {
	if idx < len(opts) {
		opt := opts[idx]
		t.guest.Monitor.DeviceDel(opt.Id, func(res string) {
			if res == "" {
				t.delDeviceCallBack(opts, idx+1, onDelFinish)
				return
			}
			onDelFinish(fmt.Sprintf("monitor del %d device: %s", idx, res))
		})
		return
	}
	onDelFinish("")
}
/**
* GuestLiveMigrateTask
**/
// SGuestLiveMigrateTask drives a live migration from the source host. It
// configures migration capabilities and parameters through a chain of monitor
// callbacks before starting the migration.
type SGuestLiveMigrateTask struct {
	*SKVMGuestInstance
	ctx    context.Context
	params *SLiveMigrate
	c      chan struct{}
	// timeoutAt is set once by startRamMigrateTimeout; zero means "not set yet".
	timeoutAt        time.Time
	doTimeoutMigrate bool
	cancelled        bool
	// expectDowntime is initialized to 300, commented as QEMU's default
	// downtime in milliseconds.
	expectDowntime        int64
	dirtySyncCount        int64
	diskDriverMirrorIndex int
	onBlockJobsCancelled  func()
	// totalTransferMb is initialized to the guest memory plus the sizes of
	// disks on local storage types (disk size unit presumably MB; confirm).
	totalTransferMb int64
}
// NewGuestLiveMigrateTask builds a live-migration task for guest and registers
// it as the guest's MigrateTask. The estimated transfer volume is the guest
// memory plus the size of every disk on a local storage type.
func NewGuestLiveMigrateTask(
	ctx context.Context, guest *SKVMGuestInstance, params *SLiveMigrate,
) *SGuestLiveMigrateTask {
	task := &SGuestLiveMigrateTask{
		SKVMGuestInstance: guest,
		ctx:               ctx,
		params:            params,
		expectDowntime:    300, // qemu default downtime 300ms
	}
	task.MigrateTask = task
	transferMb := task.Desc.Mem
	for _, disk := range task.Desc.Disks {
		if utils.IsInStringArray(disk.StorageType, api.STORAGE_LOCAL_TYPES) {
			transferMb += int64(disk.Size)
		}
	}
	task.totalTransferMb = transferMb
	return task
}
// Start begins the capability setup chain by enabling "zero-blocks".
func (s *SGuestLiveMigrateTask) Start() {
	const capability = "zero-blocks"
	s.Monitor.MigrateSetCapability(capability, "on", s.onSetZeroBlocks)
}
// onSetZeroBlocks checks the zero-blocks result and then enables
// auto-converge.
func (s *SGuestLiveMigrateTask) onSetZeroBlocks(res string) {
	if !strings.Contains(strings.ToLower(res), "error") {
		// https://wiki.qemu.org/Features/AutoconvergeLiveMigration
		s.Monitor.MigrateSetCapability("auto-converge", "on", s.onSetAutoConverge)
		return
	}
	s.migrateFailed(fmt.Sprintf("Migrate set capability zero-blocks error: %s", res))
}
// onSetAutoConverge checks the auto-converge result and then sets the initial
// CPU throttle percentage from host options.
func (s *SGuestLiveMigrateTask) onSetAutoConverge(res string) {
	if strings.Contains(strings.ToLower(res), "error") {
		// name the capability that actually failed (was reported as zero-blocks)
		s.migrateFailed(fmt.Sprintf("Migrate set capability auto-converge error: %s", res))
		return
	}
	s.Monitor.MigrateSetParameter("cpu-throttle-initial", options.HostOptions.LiveMigrateCpuThrottleInitial, s.onSetCpuThrottleInitial)
}
// onSetCpuThrottleInitial checks the cpu-throttle-initial result and then sets
// the CPU throttle increment from host options.
func (s *SGuestLiveMigrateTask) onSetCpuThrottleInitial(res string) {
	failed := strings.Contains(strings.ToLower(res), "error")
	if failed {
		s.migrateFailed(fmt.Sprintf("Migrate set params cpu-throttle-initial error: %s", res))
		return
	}
	increment := options.HostOptions.LiveMigrateCpuThrottleIncrement
	s.Monitor.MigrateSetParameter("cpu-throttle-increment", increment, s.onSetCpuThrottleIncrement)
}
// onSetCpuThrottleIncrement checks the cpu-throttle-increment result and then
// enables migration events.
func (s *SGuestLiveMigrateTask) onSetCpuThrottleIncrement(res string) {
	if !strings.Contains(strings.ToLower(res), "error") {
		s.Monitor.MigrateSetCapability("events", "on", s.onMigrateEnableEvents)
		return
	}
	s.migrateFailed(fmt.Sprintf("Migrate set params cpu-throttle-increment error: %s", res))
}
// onMigrateEnableEvents handles the reply to enabling the "events" capability.
// On success it enables "pause-before-switchover".
func (s *SGuestLiveMigrateTask) onMigrateEnableEvents(res string) {
	if !strings.Contains(strings.ToLower(res), "error") {
		s.Monitor.MigrateSetCapability("pause-before-switchover", "on", s.onMigrateSetPauseBeforeSwitchover)
		return
	}
	s.migrateFailed(fmt.Sprintf("Migrate set capability events error: %s", res))
}
// onMigrateSetPauseBeforeSwitchover handles the reply to enabling
// "pause-before-switchover" and then decides how to configure multifd:
//   - QEMU older than 4.0.0: multifd is left untouched and migration starts;
//   - TLS migration: multifd is switched off;
//   - otherwise: multifd is switched on.
func (s *SGuestLiveMigrateTask) onMigrateSetPauseBeforeSwitchover(res string) {
	if strings.Contains(strings.ToLower(res), "error") {
		s.migrateFailed(fmt.Sprintf("Migrate set capability pause-before-switchover error: %s", res))
		return
	}
	switch {
	case version.LT(s.QemuVersion, "4.0.0"):
		s.startMigrate()
	case s.params.EnableTLS:
		s.Monitor.MigrateSetCapability("multifd", "off", s.onSetMulitfd)
	default:
		log.Infof("migrate src guest enable multifd")
		s.Monitor.MigrateSetCapability("multifd", "on", s.onSetMulitfd)
	}
}
// onSetMulitfd handles the reply to setting the "multifd" capability (on or
// off). On success it starts the migration.
func (s *SGuestLiveMigrateTask) onSetMulitfd(res string) {
	if !strings.Contains(strings.ToLower(res), "error") {
		s.startMigrate()
		return
	}
	s.migrateFailed(fmt.Sprintf("Migrate set capability multifd error: %s", res))
}
// startRamMigrateTimeout arms the RAM migration deadline (timeoutAt) once.
//
// The budget in seconds is the guest memory (MB) divided by
// HostOptions.MigrateExpectRate, never less than
// HostOptions.MinMigrateTimeoutSeconds. Later calls do nothing because
// timeoutAt is already set.
func (s *SGuestLiveMigrateTask) startRamMigrateTimeout() {
	if s.timeoutAt.IsZero() {
		seconds := int(s.Desc.Mem) / options.HostOptions.MigrateExpectRate
		if minSeconds := options.HostOptions.MinMigrateTimeoutSeconds; seconds < minSeconds {
			seconds = minSeconds
		}
		s.timeoutAt = time.Now().Add(time.Duration(seconds) * time.Second)
		log.Infof("migrate timeout seconds: %d now: %v expectfinial: %v", seconds, time.Now(), s.timeoutAt)
	}
}
// startMigrate configures the "tls-creds" migration parameter and then calls
// doMigrate.
//
// Without TLS the parameter is cleared. With TLS, any stale "tls0" object is
// deleted first. A client-side tls-creds-x509 object "tls0" is then created
// from the guest PKI directory and used as the migration credentials.
func (s *SGuestLiveMigrateTask) startMigrate() {
	// applyTLSCreds sets "tls-creds" to creds and continues with doMigrate.
	// On error it fails the migration using errFmt.
	applyTLSCreds := func(creds, errFmt string) {
		s.Monitor.MigrateSetParameter("tls-creds", creds, func(res string) {
			if strings.Contains(strings.ToLower(res), "error") {
				s.migrateFailed(fmt.Sprintf(errFmt, res))
				return
			}
			s.doMigrate()
		})
	}

	if !s.params.EnableTLS {
		applyTLSCreds("", "Migrate set tls-creds to empty error: %s")
		return
	}

	// https://wiki.qemu.org/Features/MigrationTLS
	// first remove possible existing tls0
	s.Monitor.ObjectDel("tls0", func(res string) {
		log.Infof("cleanup possible existing tls0: %s", res)
		tlsProps := map[string]string{
			"dir":         s.getPKIDirPath(),
			"endpoint":    "client",
			"id":          "tls0",
			"verify-peer": "no",
		}
		s.Monitor.ObjectAdd("tls-creds-x509", tlsProps, func(res string) {
			if strings.Contains(strings.ToLower(res), "error") {
				s.migrateFailed(fmt.Sprintf("Migrate add tls-creds-x509 object client tls0 error: %s", res))
				return
			}
			applyTLSCreds("tls0", "Migrate set tls-creds tls0 error: %s")
		})
	})
}
// waitMirrorJobsReady polls the block jobs until every drive-mirror job is
// "ready", then starts the RAM migration to DestIp:DestPort.
//
// While jobs are still syncing it reports progress and re-polls after 3
// seconds. Remaining bytes are the guest memory plus each unfinished job's
// Len-Offset, measured against totalTransferMb.
func (s *SGuestLiveMigrateTask) waitMirrorJobsReady() {
	s.Monitor.GetBlockJobs(func(jobs []monitor.BlockJob) {
		// start with the RAM that still has to be sent, in bytes
		remainingBytes := s.Desc.Mem * 1024 * 1024
		var speedMbps float64
		pending := 0
		for _, job := range jobs {
			if job.Status == "ready" {
				continue
			}
			pending++
			remainingBytes += (job.Len - job.Offset)
			speedMbps += float64(job.Speed) / 1024 / 1024
		}

		if pending > 0 {
			totalBytes := float64(s.totalTransferMb * 1024 * 1024)
			progress := (1 - float64(remainingBytes)/totalBytes) * 100.0
			hostutils.UpdateServerProgress(context.Background(), s.Id, progress, speedMbps)
			time.Sleep(time.Second * 3)
			s.waitMirrorJobsReady()
			return
		}

		uri := fmt.Sprintf("tcp:%s", net.JoinHostPort(s.params.DestIp, strconv.Itoa(s.params.DestPort)))
		s.Monitor.Migrate(uri, false, false, s.setMaxBandwidth)
	})
}
// mirrorDisks drive-mirrors the guest disks one at a time to the destination
// NBD server. It acts as its own DriveMirror callback.
//
// A non-empty res means the previous mirror failed and the migration is
// aborted. Disks whose storage type is not local are skipped. Once every disk
// is handled, it waits for the mirror jobs to become ready. An empty
// StorageType is resolved through the storage manager.
func (s *SGuestLiveMigrateTask) mirrorDisks(res string) {
	if res != "" {
		log.Errorf("disk %d driver mirror failed %s", s.diskDriverMirrorIndex, res)
		s.onDriveMirrorDisksFailed(res)
		return
	}

	idx := s.diskDriverMirrorIndex
	if idx == len(s.Desc.Disks) {
		s.waitMirrorJobsReady()
		return
	}
	s.diskDriverMirrorIndex = idx + 1

	disk := s.Desc.Disks[idx]
	storageType := disk.StorageType
	if len(storageType) == 0 {
		storageType = storageman.GetManager().GetStorage(disk.StorageId).StorageType()
	}
	if !utils.IsInStringArray(storageType, api.STORAGE_LOCAL_TYPES) {
		// nothing to copy for this disk, continue with the next one
		s.mirrorDisks("")
		return
	}

	var speed int64
	if bw := s.params.MaxBandwidthMB; bw != nil {
		speed = *bw * 1024 * 1024
	}
	drive := fmt.Sprintf("drive_%d", disk.Index)
	target := fmt.Sprintf("nbd:%s:%d:exportname=drive_%d", s.params.DestIp, s.params.NbdServerPort, disk.Index)
	s.Monitor.DriveMirror(s.mirrorDisks, drive, target, "top", disk.Format, true, false, speed)
}
// onDriveMirrorDisksFailed aborts the migration after a drive-mirror error.
func (s *SGuestLiveMigrateTask) onDriveMirrorDisksFailed(res string) {
	reason := fmt.Sprintf("Migrate error: %s", res)
	s.migrateFailed(reason)
}
// doMigrate starts transferring the guest.
//
// With an NBD server on the destination, disks are drive-mirrored first (see
// mirrorDisks). Otherwise the QEMU migration starts right away. For IsLocal
// migrations the incremental disk copy flag is set so the disk data is copied
// as well.
func (s *SGuestLiveMigrateTask) doMigrate() {
	if s.params.NbdServerPort > 0 {
		s.mirrorDisks("")
		return
	}
	// copy disk data
	copyIncremental := s.params.IsLocal
	uri := fmt.Sprintf("tcp:%s", net.JoinHostPort(s.params.DestIp, strconv.Itoa(s.params.DestPort)))
	s.Monitor.Migrate(uri, copyIncremental, false, s.setMaxBandwidth)
}
// setMaxBandwidth is passed as the callback of the "migrate" command.
//
// If the migration could not be started, the task fails. Otherwise it sets
// the "max-bandwidth" migration parameter and continues with
// startMigrateStatusCheck. The value is MaxBandwidthMB converted to bytes, or
// math.MaxInt64 (no limit) when MaxBandwidthMB is unset or <= 0.
func (s *SGuestLiveMigrateTask) setMaxBandwidth(res string) {
	if strings.Contains(strings.ToLower(res), "error") {
		// This is the reply of "migrate", not of the auto-converge capability.
		s.migrateFailed(fmt.Sprintf("Migrate start error: %s", res))
		return
	}
	// default set bandwidth no limit
	var maxBandwidth int64 = math.MaxInt64
	if s.params.MaxBandwidthMB != nil && *s.params.MaxBandwidthMB > 0 {
		maxBandwidth = *s.params.MaxBandwidthMB * 1024 * 1024
	}
	s.Monitor.MigrateSetParameter("max-bandwidth", maxBandwidth, s.startMigrateStatusCheck)
}
// startMigrateStatusCheck is the callback of setting "max-bandwidth".
//
// On success it arms the RAM migration timeout. It then polls the migration
// statistics every 5 seconds (results go to onGetMigrateStats) until the
// channel stored in s.c is closed by migrateComplete or by the failure path.
//
// NOTE: the loop blocks the goroutine running this callback for the whole
// migration.
func (s *SGuestLiveMigrateTask) startMigrateStatusCheck(res string) {
	if strings.Contains(strings.ToLower(res), "error") {
		s.migrateFailed(fmt.Sprintf("Migrate error: %s", res))
		return
	}
	s.startRamMigrateTimeout()

	// Wait on our own reference. The closers reset s.c to nil right after
	// closing it, and re-reading that field from this loop races with them.
	done := make(chan struct{})
	s.c = done
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// snapshot the monitor so it cannot turn nil between check and use
			mon := s.Monitor
			if mon != nil {
				mon.GetMigrateStats(s.onGetMigrateStats)
				continue
			}
			select {
			case <-done:
				// migrateComplete closes the channel before dropping the
				// monitor: the migration finished, this is not a failure
				return
			default:
			}
			log.Errorf("server %s(%s) migrate stopped unexpectedly", s.GetId(), s.GetName())
			s.migrateFailed("Migrate error: qemu monitor is gone")
			return
		}
	}
}
// onGetMigrateStats is the GetMigrateStats callback. A query error aborts the
// migration; otherwise the sample goes to onGetMigrateStatus.
func (s *SGuestLiveMigrateTask) onGetMigrateStats(stats *monitor.MigrationInfo, err error) {
	if err == nil {
		s.onGetMigrateStatus(stats)
		return
	}
	log.Errorf("%s get migrate stats failed %s", s.GetName(), err)
	s.migrateFailed(fmt.Sprintf("%s get migrate stats failed %s", s.GetName(), err))
}
// onGetMigrateStatus handles one query-migrate sample.
//
// QEMU reports one of the MigrationStatus values:
//
//	none, setup, cancelling, cancelled, active, postcopy-active,
//	postcopy-paused, postcopy-recover, completed, failed, colo,
//	pre-switchover, device, wait-unplug
//
// Only these statuses are acted on:
//   - "completed" is logged;
//   - "failed" and "cancelled" fail the task;
//   - "active" is handled by onMigrateStatusActive.
//
// Other values are ignored until the next poll. A reply without a status is
// also ignored instead of dereferencing a nil pointer.
func (s *SGuestLiveMigrateTask) onGetMigrateStatus(stats *monitor.MigrationInfo) {
	if stats == nil || stats.Status == nil {
		log.Warningf("%s query migrate returned no status", s.GetName())
		return
	}
	status := string(*stats.Status)
	switch status {
	case "completed":
		jsonStats := jsonutils.Marshal(stats)
		log.Infof("migration info %s", jsonStats)
	case "failed":
		s.migrateFailed(fmt.Sprintf("Query migrate got status: %s", status))
	case "cancelled":
		s.migrateFailed(status)
	case "active":
		s.onMigrateStatusActive(stats)
	}
}

// onMigrateStatusActive handles a sample whose status is "active".
//
// Once the RAM migration deadline (timeoutAt) has passed, the guest is stopped
// a single time and no further tuning happens. Until then:
//   - progress is reported against totalTransferMb;
//   - when QuicklyFinish is requested and the CPU throttle has reached
//     LiveMigrateCpuThrottleMax, the allowed downtime is raised to QEMU's
//     expected downtime, but only after more than one further dirty-sync
//     round has been observed.
func (s *SGuestLiveMigrateTask) onMigrateStatusActive(stats *monitor.MigrationInfo) {
	if !s.doTimeoutMigrate && s.timeoutAt.Before(time.Now()) {
		s.Monitor.SimpleCommand("stop", s.onMigrateStartPostcopy)
		s.doTimeoutMigrate = true
	}
	if s.doTimeoutMigrate {
		return
	}

	var (
		ramRemain int64
		mbps      float64
	)
	if stats.RAM != nil {
		mbps = stats.RAM.Mbps
		ramRemain = stats.RAM.Remaining
	}
	if ramRemain > 0 {
		progress := (1 - float64(ramRemain)/float64(s.totalTransferMb*1024*1024)) * 100.0
		hostutils.UpdateServerProgress(context.Background(), s.Id, progress, mbps)
	}

	if !s.params.QuicklyFinish || stats.RAM == nil || stats.RAM.Remaining <= 0 {
		return
	}
	if stats.CPUThrottlePercentage == nil {
		// qemu do not enable cpu throttle, don't need set downtime
		return
	}
	if *stats.CPUThrottlePercentage < options.HostOptions.LiveMigrateCpuThrottleMax {
		return
	}
	if stats.ExpectedDowntime == nil || *stats.ExpectedDowntime <= s.expectDowntime {
		return
	}
	if s.dirtySyncCount == 0 {
		// record dirty sync count
		s.dirtySyncCount = stats.RAM.DirtySyncCount
		return
	}
	// run more than one round dirty ram sync after cpu throttle reached max
	if stats.RAM.DirtySyncCount <= s.dirtySyncCount+1 {
		return
	}
	cb := func(res string) {
		if len(res) == 0 {
			s.expectDowntime = *stats.ExpectedDowntime
			log.Infof("migrate update downtime to %d", *stats.ExpectedDowntime)
		} else {
			log.Errorf("failed set migrate downtime %s", res)
		}
	}
	s.Monitor.MigrateSetDowntime(float64(*stats.ExpectedDowntime)/1000.0, cb)
}
// onMigrateStartPostcopy is used as the callback of the "stop" command that is
// sent when the RAM migration deadline passes. Despite its name it does not
// start postcopy. It only checks the reply.
func (s *SGuestLiveMigrateTask) onMigrateStartPostcopy(res string) {
	if !strings.Contains(strings.ToLower(res), "error") {
		log.Infof("onMigrateStartPostcopy success")
		return
	}
	s.migrateFailed(fmt.Sprintf("onMigrateStartPostcopy error: %s", res))
}
// migrateContinueFromPreSwitchover lets a migration paused in the
// "pre-switchover" state proceed.
func (s *SGuestLiveMigrateTask) migrateContinueFromPreSwitchover() {
	const pausedState = "pre-switchover"
	s.Monitor.MigrateContinue(pausedState, s.onMigrateContinue)
}
// onMigrateContinue handles the MigrateContinue reply. Any non-empty reply is
// treated as a failure.
func (s *SGuestLiveMigrateTask) onMigrateContinue(res string) {
	if res == "" {
		return
	}
	s.migrateFailed(res)
}
// onMigrateReceivedPreSwitchoverEvent is called when the migration reaches
// pre-switchover. It cancels the drive-mirror jobs first and then lets the
// migration continue.
func (s *SGuestLiveMigrateTask) onMigrateReceivedPreSwitchoverEvent() {
	next := s.migrateContinueFromPreSwitchover
	s.onBlockJobsCancelled = next
	s.cancelBlockJobs("")
}
// onMigrateReceivedBlockJobError fails the migration on a block job error,
// unless the migration was already cancelled (SetLiveMigrateCancelled).
func (s *SGuestLiveMigrateTask) onMigrateReceivedBlockJobError(res string) {
	if s.cancelled {
		return
	}
	s.migrateFailed(res)
}
// migrateComplete finishes a successful migration.
//
// It detaches the task from the guest, stops the status polling loop and
// disconnects the monitor. It then reports the (optional) migration stats as
// "migration_info" and resets the progress to 0.
func (s *SGuestLiveMigrateTask) migrateComplete(stats jsonutils.JSONObject) {
	s.MigrateTask = nil
	if s.c != nil {
		close(s.c)
		s.c = nil
	}
	// The monitor may already be gone (startMigrateStatusCheck checks for
	// that case), so don't call Disconnect on a nil monitor.
	if s.Monitor != nil {
		s.Monitor.Disconnect()
		s.Monitor = nil
	}
	res := jsonutils.NewDict()
	if stats != nil {
		res.Set("migration_info", stats)
	}
	hostutils.TaskComplete(s.ctx, res)
	hostutils.UpdateServerProgress(context.Background(), s.Id, 0.0, 0)
}
// cancelBlockJobs cancels the drive-mirror jobs started by mirrorDisks, from
// the last started disk back to the first, and then calls
// onBlockJobsCancelled.
//
// It is its own CancelBlockJob callback: each reply re-enters it with
// diskDriverMirrorIndex decremented. res is only logged.
func (s *SGuestLiveMigrateTask) cancelBlockJobs(res string) {
	log.Infof("%s cancel block jobs %s", s.GetName(), res)
	if s.diskDriverMirrorIndex == 0 {
		s.onBlockJobsCancelled()
		return
	}
	s.diskDriverMirrorIndex -= 1
	disk := s.Desc.Disks[s.diskDriverMirrorIndex]
	// Resolve an empty storage type the same way mirrorDisks does. Otherwise a
	// mirror job started for such a local disk would never be cancelled.
	storageType := disk.StorageType
	if storageType == "" {
		if storage := storageman.GetManager().GetStorage(disk.StorageId); storage != nil {
			storageType = storage.StorageType()
		}
	}
	if utils.IsInStringArray(storageType, api.STORAGE_LOCAL_TYPES) {
		s.Monitor.CancelBlockJob(fmt.Sprintf("drive_%d", disk.Index), false, s.cancelBlockJobs)
	} else {
		s.cancelBlockJobs("")
	}
}
// migrateFailed aborts the migration. The mirror jobs are cancelled first,
// then onMigrateFailBlockJobsCancelled reports msg.
func (s *SGuestLiveMigrateTask) migrateFailed(msg string) {
	s.onBlockJobsCancelled = func() { s.onMigrateFailBlockJobsCancelled(msg) }
	s.cancelBlockJobs("")
}
// onMigrateFailBlockJobsCancelled completes the failure path.
//
// It detaches the task, stops the status polling loop and reports msg via
// TaskFailed. For TLS migrations the "tls0" object is deleted first.
func (s *SGuestLiveMigrateTask) onMigrateFailBlockJobsCancelled(msg string) {
	finish := func() {
		s.MigrateTask = nil
		if s.c != nil {
			close(s.c)
			s.c = nil
		}
		hostutils.TaskFailed(s.ctx, msg)
	}
	if !s.params.EnableTLS {
		finish()
		return
	}
	s.Monitor.ObjectDel("tls0", func(res string) {
		log.Infof("cleanup possible existing tls0: %s", res)
		finish()
	})
}
// SetLiveMigrateCancelled marks the migration as cancelled. Later block job
// errors are then no longer reported as failures
// (see onMigrateReceivedBlockJobError).
func (s *SGuestLiveMigrateTask) SetLiveMigrateCancelled() {
	const cancelled = true
	s.cancelled = cancelled
}
/**
* GuestResumeTask
**/
// SGuestResumeTask brings a guest from a paused/prelaunch state to running.
// It then reports the result to the task context, or syncs the guest status
// when there is no task id.
type SGuestResumeTask struct {
	*SKVMGuestInstance
	ctx context.Context
	// startTime is when the current resume attempt started; onConfirmRunning
	// compares it against the resume timeout
	startTime time.Time
	// isTimeout enables the resume timeout in onConfirmRunning
	isTimeout bool
	// cleanTLS deletes the "tls0" object and the PKI dir before resuming
	// (presumably after a TLS migration — confirm with callers)
	cleanTLS bool
	// resumed is set once the "cont" command has been answered
	resumed bool
	// isResumeFromMigrate is set when the guest was found "paused (postmigrate)"
	isResumeFromMigrate bool
	// getTaskData optionally supplies the payload sent with TaskComplete
	getTaskData func() (jsonutils.JSONObject, error)
}
// NewGuestResumeTask creates a resume task for guest s. getTaskData stays nil
// until SetGetTaskData is called.
func NewGuestResumeTask(ctx context.Context, s *SKVMGuestInstance, isTimeout bool, cleanTLS bool) *SGuestResumeTask {
	task := &SGuestResumeTask{SKVMGuestInstance: s, ctx: ctx}
	task.isTimeout = isTimeout
	task.cleanTLS = cleanTLS
	return task
}
// Start records the start time and begins checking the guest run state.
// When cleanTLS is set, the "tls0" object and the PKI directory are removed
// first. A failure to remove the directory is only logged.
func (s *SGuestResumeTask) Start() {
	log.Debugf("[%s] GuestResumeTask start", s.GetId())
	s.startTime = time.Now()
	if !s.cleanTLS {
		s.confirmRunning()
		return
	}
	s.Monitor.ObjectDel("tls0", func(res string) {
		log.Infof("Clean %s tls0 object: %s", s.GetName(), res)
		pkiPath := s.getPKIDirPath()
		if err := os.RemoveAll(pkiPath); err != nil {
			log.Warningf("Remove tls pki dir %s error: %v", pkiPath, err)
		}
		s.confirmRunning()
	})
}
// GetStateFilePath returns the embedded guest's state file path for an empty
// argument.
func (s *SGuestResumeTask) GetStateFilePath() string {
	guest := s.SKVMGuestInstance
	return guest.GetStateFilePath("")
}
// Stop fails the resume task, reporting that qemu quit unexpectedly.
func (s *SGuestResumeTask) Stop() {
	// TODO
	// stop stream disk
	reason := fmt.Sprintf("[%s] qemu quit unexpectedly on resume", s.GetId())
	s.taskFailed(reason)
}
// confirmRunning queries the guest run state (handled by onConfirmRunning).
// It fails the task if the monitor is already gone.
func (s *SGuestResumeTask) confirmRunning() {
	if s.Monitor == nil {
		s.taskFailed(fmt.Sprintf("[%s] qemu quit unexpectedly on resume confirmRunning", s.GetId()))
		return
	}
	s.Monitor.QueryStatus(s.onConfirmRunning)
}
// onConfirmRunning is the QueryStatus callback. It dispatches on the run state
// reported by QEMU:
//   - "paused (prelaunch)": run onGuestPrelaunch, then resume with "cont";
//   - "running" / "paused (suspended)": the guest is up, finish via onStartRunning;
//   - "paused (postmigrate)": mark isResumeFromMigrate and resume;
//   - "paused (inmigrate)": poll again after 1 second (no timeout applies);
//   - any other state containing "error": fail the task;
//   - any other "paused" state: run onGuestPrelaunch, query the blocks, then resume;
//   - anything else: poll again every 3 seconds. When isTimeout is set, fail with
//     "Timeout" once the time since startTime reaches a budget derived from
//     the guest memory (same formula as the live-migration RAM timeout).
func (s *SGuestResumeTask) onConfirmRunning(status string) {
	log.Infof("[%s] onConfirmRunning status %s", s.GetId(), status)
	switch status {
	case "paused (prelaunch)":
		/* ref: qemu/src/qapi/run-state.json
		 * prelaunch: QEMU was started with -S and guest has not started.
		 * we need resume guest at state prelaunch */
		if err := s.onGuestPrelaunch(); err != nil {
			// NOTE(review): taskFailed calls ForceStop as well, so it runs twice here
			s.ForceStop()
			s.taskFailed(err.Error())
			return
		}
		s.resumeGuest()
	case "running", "paused (suspended)":
		s.onStartRunning()
	case "paused (postmigrate)":
		// remembered so onStartRunning returns right after TaskComplete
		s.isResumeFromMigrate = true
		s.resumeGuest()
	case "paused (inmigrate)":
		// guest is paused waiting for an incoming migration
		time.Sleep(time.Second * 1)
		s.confirmRunning()
	default:
		switch {
		case strings.Contains(status, "error"):
			// handle error first, results may be 'paused (internal-error)'
			s.taskFailed(status)
		case strings.Contains(status, "paused"):
			// a second pause after "cont" succeeded is treated as a failure
			if s.resumed {
				s.taskFailed("resume guest twice")
				return
			}
			if err := s.onGuestPrelaunch(); err != nil {
				s.ForceStop()
				s.taskFailed(err.Error())
				return
			}
			// onGetBlockInfo resumes the guest once the block info arrives
			s.Monitor.GetBlocks(s.onGetBlockInfo)
		default:
			// NOTE(review): the budget is recomputed and logged on every poll,
			// even when isTimeout is false
			memMb := s.Desc.Mem
			migSeconds := int(memMb) / options.HostOptions.MigrateExpectRate
			if migSeconds < options.HostOptions.MinMigrateTimeoutSeconds {
				migSeconds = options.HostOptions.MinMigrateTimeoutSeconds
			}
			log.Infof("start guest timeout seconds: %d", migSeconds)
			if s.isTimeout && time.Now().Sub(s.startTime) >= time.Second*time.Duration(migSeconds) {
				s.taskFailed("Timeout")
				return
			} else {
				time.Sleep(time.Second * 3)
				s.confirmRunning()
			}
		}
	}
}
// taskFailed force-stops the guest and reports reason. It uses TaskFailed
// when the context carries a task id, otherwise it syncs the guest status.
func (s *SGuestResumeTask) taskFailed(reason string) {
	log.Infof("Start guest %s failed: %s", s.Id, reason)
	s.ForceStop()
	hasTask := s.ctx != nil && len(appctx.AppContextTaskId(s.ctx)) > 0
	if !hasTask {
		s.SyncStatus(reason)
		return
	}
	hostutils.TaskFailed(s.ctx, reason)
}
// onGetBlockInfo is the GetBlocks callback used before resuming a paused
// guest. The block list is only logged; the guest is then resumed.
func (s *SGuestResumeTask) onGetBlockInfo(blocks []monitor.QemuBlock) {
	log.Debugf("onGetBlockInfo %v", blocks)
	s.resumeGuest()
}
// resumeGuest sends "cont" (answered by onResumeSucc) and restarts the timeout
// clock.
//
// A second resume is refused. For a volatile host the NICs are prepared first
// and the live desc is saved with IsVolatileHost cleared.
func (s *SGuestResumeTask) resumeGuest() {
	switch {
	case s.resumed:
		s.taskFailed("resume guest twice")
		return
	case s.Desc.IsVolatileHost:
		if err := s.prepareNicsForVolatileGuestResume(); err != nil {
			s.taskFailed(err.Error())
			return
		}
		s.Desc.IsVolatileHost = false
		SaveLiveDesc(s, s.Desc)
	}
	s.startTime = time.Now()
	s.Monitor.SimpleCommand("cont", s.onResumeSucc)
}
// onResumeSucc is the "cont" callback. It records that the guest was resumed
// and re-checks the run state; the reply text is ignored.
func (s *SGuestResumeTask) onResumeSucc(_ string) {
	s.resumed = true
	s.confirmRunning()
}
// SetGetTaskData installs the provider of the TaskComplete payload used by
// onStartRunning.
func (s *SGuestResumeTask) SetGetTaskData(f func() (jsonutils.JSONObject, error)) {
	provider := f
	s.getTaskData = provider
}
// onStartRunning finishes the resume once the guest is running.
//
// It clears the volatile-host flag, moves the qemu process into its cgroup and
// removes state files in the background. With a task id it then completes the
// task, using getTaskData's payload when set; after a migration resume it
// stops there. Finally, one of three things happens:
//   - disks that need their backing file merged are streamed;
//   - with AutoMergeBackingTemplate, a full stream is scheduled after
//     AutoMergeDelaySeconds;
//   - otherwise the startup task is just detached.
func (s *SGuestResumeTask) onStartRunning() {
	if s.Desc.IsVolatileHost {
		s.Desc.IsVolatileHost = false
		SaveLiveDesc(s, s.Desc)
	}
	s.setCgroupPid()
	s.removeStatefile()

	if s.ctx != nil && len(appctx.AppContextTaskId(s.ctx)) > 0 {
		var data jsonutils.JSONObject
		if s.getTaskData != nil {
			payload, err := s.getTaskData()
			if err != nil {
				s.taskFailed(err.Error())
				return
			}
			data = payload
		}
		hostutils.TaskComplete(s.ctx, data)
		if s.isResumeFromMigrate {
			return
		}
	}

	switch disksIdx := s.GetNeedMergeBackingFileDiskIndexs(); {
	case len(disksIdx) > 0:
		s.startStreamDisks(disksIdx)
	case options.HostOptions.AutoMergeBackingTemplate:
		s.SyncStatus("")
		delay := time.Duration(options.HostOptions.AutoMergeDelaySeconds) * time.Second
		timeutils2.AddTimeout(delay, func() { s.startStreamDisks(nil) })
	default:
		s.SyncStatus("")
		s.detachStartupTask()
	}
}
// startStreamDisks clears the timeout clock, detaches the startup task and, if
// the monitor is still alive, streams the given disks (nil means every disk
// with a backing file).
func (s *SGuestResumeTask) startStreamDisks(disksIdx []int) {
	s.startTime = time.Time{}
	s.detachStartupTask()
	if !s.IsMonitorAlive() {
		return
	}
	onComplete := func() { s.onStreamComplete(disksIdx) }
	s.StreamDisks(s.ctx, onComplete, disksIdx, -1, -1)
}
// onStreamComplete runs after disk streaming. An empty index list means the
// backing template was merged, so only the status is synced. Otherwise
// streamDisksComplete is called.
func (s *SGuestResumeTask) onStreamComplete(disksIdx []int) {
	if len(disksIdx) > 0 {
		s.streamDisksComplete(context.Background())
		return
	}
	s.SyncStatus("")
}
// removeStatefile cleans the guest state files in a background goroutine.
func (s *SGuestResumeTask) removeStatefile() {
	go func() {
		s.CleanStatefiles()
	}()
}
// IGuestBlockProgressTask is implemented by tasks built on
// SGuestBlockProgressBaseTask. The base task calls it after each block-job
// poll.
type IGuestBlockProgressTask interface {
	// OnGetBlockJobs receives the block jobs returned by each poll.
	OnGetBlockJobs(jobs []monitor.BlockJob)
	// StreamingDiskCompletedCount is the number of disks already finished;
	// it weights the reported progress.
	StreamingDiskCompletedCount() int
	// StreamingDiskCount is the total number of disks of the operation.
	StreamingDiskCount() int
}
// SGuestBlockProgressBaseTask holds the state shared by tasks that wait for
// QEMU block jobs and report their progress. It polls the block jobs, keeps
// the last seen state of each job and forwards every poll to task.
type SGuestBlockProgressBaseTask struct {
	*SKVMGuestInstance
	ctx context.Context
	// NOTE(review): totalSizeMb and completedSizeMb are not referenced in
	// this part of the file
	totalSizeMb     int
	completedSizeMb int
	// c stops the polling loop of startWaitBlockJob when closed
	c chan struct{}
	// jobs is the last seen state of each block job, keyed by device name
	jobs map[string]monitor.BlockJob
	// task receives every poll result and supplies the disk counts for progress
	task IGuestBlockProgressTask
}
// NewGuestBlockProgressBaseTask creates the base task for blkTask with an
// empty job map.
func NewGuestBlockProgressBaseTask(
	ctx context.Context, guest *SKVMGuestInstance, blkTask IGuestBlockProgressTask,
) *SGuestBlockProgressBaseTask {
	base := &SGuestBlockProgressBaseTask{
		SKVMGuestInstance: guest,
		ctx:               ctx,
		task:              blkTask,
	}
	base.jobs = make(map[string]monitor.BlockJob)
	return base
}
// startWaitBlockJob polls the block jobs every 10 seconds (results go to
// onGetBlockJobs) until the wait channel is closed by onGetBlockJobs or
// cancelWaitBlockJobs.
//
// res is the reply of the command that started the job (e.g. the block stream
// callback). It is only logged when non-empty.
func (s *SGuestBlockProgressBaseTask) startWaitBlockJob(res string) {
	if len(res) > 0 {
		log.Warningf("%s start block job reply: %s", s.GetName(), res)
	}
	// Hold our own reference to the channel. The closers reset s.c to nil, and
	// the next job's startWaitBlockJob stores a fresh channel in s.c. This loop
	// must neither select on that field (a nil channel never fires, so it
	// would poll forever) nor reset it on exit (that would clobber the next
	// loop's channel).
	done := make(chan struct{})
	s.c = done
	for {
		select {
		case <-done:
			return
		case <-time.After(time.Second * 10):
			s.Monitor.GetBlockJobs(s.onGetBlockJobs)
		}
	}
}
// onGetBlockJobs is the GetBlockJobs callback.
//
// When no job is left it closes the wait channel, which stops
// startWaitBlockJob. It updates the per-device state in s.jobs: new jobs start
// at offset 0, jobs seen as "ready" are dropped, and the others keep their
// Start/Now and recompute their offset. It then reports the combined
// progress, weighted by the task's disk counts, and forwards the jobs to the
// task.
func (s *SGuestBlockProgressBaseTask) onGetBlockJobs(jobs []monitor.BlockJob) {
	if len(jobs) == 0 && s.c != nil {
		close(s.c)
		s.c = nil
	}
	for i := range jobs {
		job := jobs[i]
		prev, seen := s.jobs[job.Device]
		if !seen {
			job.CalcOffset(0)
			s.jobs[job.Device] = job
			continue
		}
		if prev.Status == "ready" {
			delete(s.jobs, prev.Device)
			continue
		}
		job.Start, job.Now = prev.Start, prev.Now
		job.CalcOffset(prev.Offset)
		s.jobs[job.Device] = job
	}

	mbps, progress := 0.0, 0.0
	// totalSize starts at 1 so the division below can never be by zero
	totalSize, totalOffset := int64(1), int64(0)
	for _, job := range s.jobs {
		mbps += job.SpeedMbps
		totalSize += job.Len
		totalOffset += job.Offset
	}
	if len(s.jobs) == 0 && len(jobs) == 0 {
		progress = 100.0
	} else {
		progress = float64(totalOffset) / float64(totalSize) * 100
	}

	diskCount := s.task.StreamingDiskCount()
	streamedDiskCount := s.task.StreamingDiskCompletedCount()
	if diskCount > 0 {
		// finished disks count fully, the current disk by its own progress
		progress = float64(streamedDiskCount)/float64(diskCount)*100.0 + 1.0/float64(diskCount)*progress
		log.Debugf("stream disk progress %v, streamedDiskCount %v, diskCount %v", progress, streamedDiskCount, diskCount)
	}
	hostutils.UpdateServerProgress(context.Background(), s.GetId(), progress, mbps)
	s.task.OnGetBlockJobs(jobs)
}
// cancelWaitBlockJobs stops the startWaitBlockJob polling loop if one is
// running.
func (s *SGuestBlockProgressBaseTask) cancelWaitBlockJobs() {
	if s.c == nil {
		return
	}
	close(s.c)
	s.c = nil
}
/**
* GuestStreamDisksTask
**/
// SGuestStreamDisksTask block-streams the selected guest disks (those with a
// backing file) one device at a time. It reports progress through the
// embedded SGuestBlockProgressBaseTask.
type SGuestStreamDisksTask struct {
	*SGuestBlockProgressBaseTask
	// callback is invoked by taskComplete
	callback func()
	// disksIdx are the disk indexes to stream; when empty, onBlockDrivesSucc
	// fills it from every device that has a backing file
	disksIdx []int
	// NOTE(review): c shadows SGuestBlockProgressBaseTask.c; the polling loop
	// uses the embedded field, so this one appears unused
	c chan struct{}
	// streamDevs are the devices still waiting to be streamed (nil until the
	// first GetBlocks reply)
	streamDevs []string
	// lvmBacking are backing files of SLVM disks, deactivated when done
	lvmBacking []string
	// progressTotalDiskCnt / progressCompletedDiskCnt override the disk counts
	// used for progress when > 0
	progressTotalDiskCnt     int
	progressCompletedDiskCnt int
}
// NewGuestStreamDisksTask creates a stream task for disksIdx (empty means all
// disks with a backing file). totalCnt/completedCnt override the disk counts
// used for progress when positive.
func NewGuestStreamDisksTask(ctx context.Context, guest *SKVMGuestInstance, callback func(), disksIdx []int, totalCnt, completedCnt int) *SGuestStreamDisksTask {
	task := new(SGuestStreamDisksTask)
	task.callback = callback
	task.disksIdx = disksIdx
	task.progressTotalDiskCnt = totalCnt
	task.progressCompletedDiskCnt = completedCnt
	task.SGuestBlockProgressBaseTask = NewGuestBlockProgressBaseTask(ctx, guest, task)
	return task
}
// Start checks for already running block jobs before streaming anything.
func (s *SGuestStreamDisksTask) Start() {
	onCount := s.onInitCheckStreamJobs
	s.Monitor.GetBlockJobCounts(onCount)
}
// onInitCheckStreamJobs waits for existing block jobs when there are any,
// and otherwise starts streaming.
func (s *SGuestStreamDisksTask) onInitCheckStreamJobs(jobs int) {
	switch {
	case jobs > 0:
		log.Warningf("GuestStreamDisksTask: duplicate block streaming???")
		s.startWaitBlockJob("")
	case jobs == 0:
		s.startBlockStreaming()
	}
	// NOTE(review): a negative count (presumably a failed query) is ignored and
	// the task does not proceed — confirm this is intended
}
// startBlockStreaming begins by inspecting the guest block devices.
func (s *SGuestStreamDisksTask) startBlockStreaming() {
	check := s.checkBlockDrives
	check()
}
// checkBlockDrives queries the block devices; onBlockDrivesSucc picks the
// ones to stream.
func (s *SGuestStreamDisksTask) checkBlockDrives() {
	onBlocks := s.onBlockDrivesSucc
	s.Monitor.GetBlocks(onBlocks)
}
// onBlockDrivesSucc is the GetBlocks callback.
//
// It collects the devices that still have a backing file into streamDevs,
// restricted to s.disksIdx when that is non-empty. It remembers the backing
// files of SLVM disks so they can be deactivated afterwards. It then streams
// the devices one by one; with nothing to stream the task completes at once.
func (s *SGuestStreamDisksTask) onBlockDrivesSucc(blocks []monitor.QemuBlock) {
	s.streamDevs = []string{}
	s.lvmBacking = []string{}
	for _, block := range blocks {
		if len(block.Inserted.File) == 0 || len(block.Inserted.BackingFile) == 0 {
			continue
		}
		if len(s.disksIdx) > 0 && !s.isRequestedStreamDevice(block.Device) {
			continue
		}
		s.streamDevs = append(s.streamDevs, block.Device)
		disk, err := storageman.GetManager().GetDiskByPath(block.Inserted.File)
		if err != nil {
			// only a failed lookup is an error; non-SLVM disks are normal
			log.Errorf("failed get disk by path %s: %s", block.Inserted.File, err)
			continue
		}
		if disk.GetType() == api.STORAGE_SLVM {
			s.lvmBacking = append(s.lvmBacking, block.Inserted.BackingFile)
		}
	}
	log.Infof("Stream devices %s: %v , backingfiles %v", s.GetName(), s.streamDevs, s.lvmBacking)
	if len(s.streamDevs) == 0 {
		s.taskComplete()
		return
	}
	if len(s.disksIdx) == 0 {
		// Keep one entry per stream device: StreamingDiskCompletedCount
		// relies on len(disksIdx) matching the number of devices.
		for _, dev := range s.streamDevs {
			idx, _ := parseStreamDeviceDiskIndex(dev)
			s.disksIdx = append(s.disksIdx, idx)
		}
	}
	s.startDoBlockStream()
	s.SyncStatus("")
}

// isRequestedStreamDevice reports whether device (e.g. "drive_12") belongs to
// one of the disk indexes in s.disksIdx.
func (s *SGuestStreamDisksTask) isRequestedStreamDevice(device string) bool {
	idx, ok := parseStreamDeviceDiskIndex(device)
	if !ok {
		return false
	}
	for _, want := range s.disksIdx {
		if want == idx {
			return true
		}
	}
	return false
}

// parseStreamDeviceDiskIndex returns the whole decimal number at the end of a
// block device name such as "drive_12". It returns (-1, false) when the name
// has no trailing digits. The previous code only looked at the last
// character, which mis-read indexes >= 10 and panicked on an empty name.
func parseStreamDeviceDiskIndex(device string) (int, bool) {
	start := len(device)
	for start > 0 && device[start-1] >= '0' && device[start-1] <= '9' {
		start--
	}
	if start == len(device) {
		return -1, false
	}
	idx, err := strconv.Atoi(device[start:])
	if err != nil {
		return -1, false
	}
	return idx, true
}
// startDoBlockStream pops the next device from streamDevs and streams it,
// waiting for the job via startWaitBlockJob. With no device left the task
// completes.
func (s *SGuestStreamDisksTask) startDoBlockStream() {
	if len(s.streamDevs) == 0 {
		s.taskComplete()
		return
	}
	next, rest := s.streamDevs[0], s.streamDevs[1:]
	s.streamDevs = rest
	s.Monitor.BlockStream(next, s.startWaitBlockJob)
}
// StreamingDiskCompletedCount returns the number of disks already streamed.
//
// The device currently streaming has already been popped from streamDevs,
// hence the extra -1. progressCompletedDiskCnt is added when positive.
func (s *SGuestStreamDisksTask) StreamingDiskCompletedCount() int {
	completed := len(s.disksIdx) - len(s.streamDevs) - 1
	if extra := s.progressCompletedDiskCnt; extra > 0 {
		completed += extra
	}
	return completed
}
// StreamingDiskCount returns progressTotalDiskCnt when positive, otherwise
// the number of disks this task streams.
func (s *SGuestStreamDisksTask) StreamingDiskCount() int {
	if total := s.progressTotalDiskCnt; total > 0 {
		return total
	}
	return len(s.disksIdx)
}
// OnGetBlockJobs moves on once no block job is running. It streams the next
// device, or inspects the block devices first if that has not happened yet
// (streamDevs is nil).
func (s *SGuestStreamDisksTask) OnGetBlockJobs(jobs []monitor.BlockJob) {
	if len(jobs) > 0 {
		return
	}
	s.cancelWaitBlockJobs()
	if s.streamDevs != nil {
		s.startDoBlockStream()
		return
	}
	s.checkBlockDrives()
}
// deactivateLvmBackingFile tries to deactivate every recorded SLVM backing
// file.
func (s *SGuestStreamDisksTask) deactivateLvmBackingFile() {
	for i := range s.lvmBacking {
		storageman.TryDeactivateBackingLvs(s.lvmBacking[i])
	}
}
// taskComplete deactivates SLVM backing files, reports 100% progress, syncs
// the guest status and runs the callback if one was given.
func (s *SGuestStreamDisksTask) taskComplete() {
	s.deactivateLvmBackingFile()
	hostutils.UpdateServerProgress(context.Background(), s.Id, 100.0, 0.0)
	s.SyncStatus("Guest Disks Block Stream Complete")
	if cb := s.callback; cb != nil {
		cb()
	}
}
/**
* GuestReloadDiskTask
**/
// SGuestReloadDiskTask reopens a running guest's block device on the current
// path of disk: the guest is stopped, the device reloaded, and the guest
// continued.
type SGuestReloadDiskTask struct {
	*SKVMGuestInstance
	ctx  context.Context
	// disk is the disk whose block device is reloaded
	disk storageman.IDisk
}
// NewGuestReloadDiskTask creates a reload task for disk of guest s.
func NewGuestReloadDiskTask(
	ctx context.Context, s *SKVMGuestInstance, disk storageman.IDisk,
) *SGuestReloadDiskTask {
	task := &SGuestReloadDiskTask{SKVMGuestInstance: s}
	task.ctx, task.disk = ctx, disk
	return task
}
// WaitSnapshotReplaced waits, polling once per second, until the disk is no
// longer listed in storageman.DELETEING_SNAPSHOTS, then runs callback.
//
// It gives up with an error on the 300th attempt (before checking again); in
// that case callback is not invoked.
func (s *SGuestReloadDiskTask) WaitSnapshotReplaced(callback func()) error {
	for attempt := 1; ; attempt++ {
		if attempt == 300 {
			return fmt.Errorf(
				"SnapshotDeleteJob.deleting_disk_snapshot always has %s", s.disk.GetId())
		}
		if _, busy := storageman.DELETEING_SNAPSHOTS.Load(s.disk.GetId()); !busy {
			break
		}
		time.Sleep(time.Second)
	}
	callback()
	return nil
}
// Start finds the block device of the disk and reloads it.
func (s *SGuestReloadDiskTask) Start() {
	onDevice := s.startReloadDisk
	s.fetchDisksInfo(onDevice)
}
// fetchDisksInfo queries the block devices and passes the device of s.disk to
// callback (see onGetBlocksSucc).
func (s *SGuestReloadDiskTask) fetchDisksInfo(callback func(string)) {
	onBlocks := func(blocks []monitor.QemuBlock) {
		s.onGetBlocksSucc(blocks, callback)
	}
	s.Monitor.GetBlocks(onBlocks)
}
// onGetBlocksSucc calls callback with the first device backed by s.disk.
// It fails the task with "Device not found" if there is none.
func (s *SGuestReloadDiskTask) onGetBlocksSucc(blocks []monitor.QemuBlock, callback func(string)) {
	for _, block := range blocks {
		if device := s.getDiskOfDrive(block); len(device) > 0 {
			callback(device)
			return
		}
	}
	s.taskFailed("Device not found")
}
// getDiskOfDrive returns block's device name when its inserted file resolves
// to s.disk's path. Otherwise, or when the path cannot be parsed, it returns "".
func (s *SGuestReloadDiskTask) getDiskOfDrive(block monitor.QemuBlock) string {
	file := block.Inserted.File
	if file == "" {
		return ""
	}
	filePath, err := qemuimg.ParseQemuFilepath(file)
	if err != nil {
		log.Errorf("qemuimg.ParseQemuFilepath %s fail %s", file, err)
		return ""
	}
	if filePath != s.disk.GetPath() {
		return ""
	}
	return block.Device
}
// startReloadDisk reloads device and then resumes via onReloadSucc.
func (s *SGuestReloadDiskTask) startReloadDisk(device string) {
	after := s.onReloadSucc
	s.doReloadDisk(device, after)
}
// doReloadDisk stops the guest and reloads device on the disk's path, wrapped
// as a LUKS path with secret "sec0" for encrypted guests. The result goes to
// callback; the "stop" reply itself is ignored.
func (s *SGuestReloadDiskTask) doReloadDisk(device string, callback func(string)) {
	s.Monitor.SimpleCommand("stop", func(_ string) {
		diskPath := s.disk.GetPath()
		if s.isEncrypted() {
			diskPath = qemuimg.GetQemuFilepath(diskPath, "sec0", qemuimg.EncryptFormatLuks)
		}
		s.Monitor.ReloadDiskBlkdev(device, diskPath, callback)
	})
}
// onReloadSucc continues the guest after the reload.
// NOTE(review): a reload error is only logged; the task still completes with
// reopen=true — confirm this is intended.
func (s *SGuestReloadDiskTask) onReloadSucc(err string) {
	if err != "" {
		log.Errorf("monitor new snapshot blkdev error: %s", err)
	}
	resume := s.onResumeSucc
	s.Monitor.SimpleCommand("cont", resume)
}
// onResumeSucc completes the task with {"reopen": true}.
func (s *SGuestReloadDiskTask) onResumeSucc(results string) {
	log.Infof("guest reload disk task resume succ %s", results)
	body := jsonutils.NewDict()
	body.Set("reopen", jsonutils.JSONTrue)
	hostutils.TaskComplete(s.ctx, body)
}
// taskFailed reports reason as the task failure.
func (s *SGuestReloadDiskTask) taskFailed(reason string) {
	ctx := s.ctx
	hostutils.TaskFailed(ctx, reason)
}
/**
* GuestDiskSnapshotTask
**/
// SGuestDiskSnapshotTask reloads a running guest's disk after a snapshot was
// taken. If the reload fails, the disk is rolled back from snapshotId.
type SGuestDiskSnapshotTask struct {
	*SGuestReloadDiskTask
	// snapshotId names the snapshot; used for rollback and the reported location
	snapshotId string
}
// NewGuestDiskSnapshotTask creates a snapshot task for disk of guest s.
func NewGuestDiskSnapshotTask(
	ctx context.Context, s *SKVMGuestInstance, disk storageman.IDisk, snapshotId string,
) *SGuestDiskSnapshotTask {
	reload := NewGuestReloadDiskTask(ctx, s, disk)
	return &SGuestDiskSnapshotTask{SGuestReloadDiskTask: reload, snapshotId: snapshotId}
}
// Start finds the disk's block device and reloads it for the snapshot.
func (s *SGuestDiskSnapshotTask) Start() {
	onDevice := s.startSnapshot
	s.fetchDisksInfo(onDevice)
}
// startSnapshot reloads device; the result is handled by onReloadBlkdevSucc.
func (s *SGuestDiskSnapshotTask) startSnapshot(device string) {
	after := s.onReloadBlkdevSucc
	s.doReloadDisk(device, after)
}
// onReloadBlkdevSucc continues the guest. After "cont" it either completes
// the snapshot or, when the reload reported an error, rolls it back.
func (s *SGuestDiskSnapshotTask) onReloadBlkdevSucc(res string) {
	afterCont := s.onResumeSucc
	if res != "" {
		afterCont = func(string) {
			s.onSnapshotBlkdevFail(fmt.Sprintf("onReloadBlkdevFail: %s", res))
		}
	}
	s.Monitor.SimpleCommand("cont", afterCont)
}
// onSnapshotBlkdevFail rolls the disk back from the snapshot and fails the
// task. A rollback error is only logged.
func (s *SGuestDiskSnapshotTask) onSnapshotBlkdevFail(reason string) {
	// rollback snapshot to disk file
	err := s.disk.RollbackDiskOnSnapshotFail(s.snapshotId)
	if err != nil {
		log.Errorf("failed do rollback %s", err)
	}
	hostutils.TaskFailed(s.ctx, fmt.Sprintf("Reload blkdev error: %s", reason))
}
// onResumeSucc completes the task with the snapshot location
// (<snapshot location>/<snapshotId>).
func (s *SGuestDiskSnapshotTask) onResumeSucc(res string) {
	log.Infof("guest disk snapshot task resume succ %s", res)
	location := path.Join(s.disk.GetSnapshotLocation(), s.snapshotId)
	body := jsonutils.NewDict()
	body.Set("location", jsonutils.NewString(location))
	hostutils.TaskComplete(s.ctx, body)
}
/**
* GuestSnapshotDeleteTask
**/
// SGuestSnapshotDeleteTask deletes a disk snapshot of a running guest. It
// either block-streams the disk (blockStream), or converts a snapshot and
// reloads the guest's block device.
type SGuestSnapshotDeleteTask struct {
	*SGuestReloadDiskTask
	// deleteSnapshot is removed via disk.DoDeleteSnapshot at the end
	deleteSnapshot string
	// convertSnapshot is passed to disk.ConvertSnapshotRelyOnReloadDisk
	convertSnapshot string
	// blockStream selects the block-stream path in Start
	blockStream bool
	encryptInfo apis.SEncryptInfo
	// NOTE(review): tmpPath is never assigned in this part of the file
	tmpPath string
	// delSnapshotPathAfterReload, when set, runs after the block device reload
	delSnapshotPathAfterReload func() error
}
// NewGuestSnapshotDeleteTask creates a snapshot-delete task for disk of
// guest s.
func NewGuestSnapshotDeleteTask(
	ctx context.Context, s *SKVMGuestInstance, disk storageman.IDisk,
	deleteSnapshot, convertSnapshot string, blockStream bool, encryptInfo apis.SEncryptInfo,
) *SGuestSnapshotDeleteTask {
	task := &SGuestSnapshotDeleteTask{SGuestReloadDiskTask: NewGuestReloadDiskTask(ctx, s, disk)}
	task.deleteSnapshot = deleteSnapshot
	task.convertSnapshot = convertSnapshot
	task.blockStream = blockStream
	task.encryptInfo = encryptInfo
	return task
}
// Start deletes the snapshot in one of two ways. With blockStream, the disk is
// streamed (the counts feed the progress). Otherwise convertSnapshot is
// converted and the block device reloaded; the returned cleanup is kept for
// after the reload.
func (s *SGuestSnapshotDeleteTask) Start(totalDeleteSnapshotCount, deletedSnapshotCount int) {
	if !s.blockStream {
		cleanup, err := s.disk.ConvertSnapshotRelyOnReloadDisk(s.convertSnapshot, s.encryptInfo)
		if err != nil {
			s.taskFailed(err.Error())
			return
		}
		s.delSnapshotPathAfterReload = cleanup
		s.fetchDisksInfo(s.doReloadDisk)
		return
	}
	s.startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount)
}
// startBlockStream streams the guest disk matching s.disk (its Index) and
// finishes in onStreamDiskComplete.
func (s *SGuestSnapshotDeleteTask) startBlockStream(totalDeleteSnapshotCount, deletedSnapshotCount int) {
	diskIdx := []int{}
	for _, d := range s.Desc.Disks {
		if d.DiskId != s.disk.GetId() {
			continue
		}
		diskIdx = append(diskIdx, int(d.Index))
		break
	}
	s.StreamDisks(s.ctx, s.onStreamDiskComplete, diskIdx, totalDeleteSnapshotCount, deletedSnapshotCount)
}
// onStreamDiskComplete removes the snapshot file after streaming. The task
// fails if the removal fails; otherwise it completes with {"deleted": true}.
func (s *SGuestSnapshotDeleteTask) onStreamDiskComplete() {
	// remove snapshot file
	err := s.disk.DoDeleteSnapshot(s.deleteSnapshot)
	if err != nil {
		hostutils.TaskFailed(s.ctx, err.Error())
		return
	}
	result := jsonutils.NewDict()
	result.Set("deleted", jsonutils.JSONTrue)
	hostutils.TaskComplete(s.ctx, result)
}
// doReloadDisk reloads device using the base implementation and continues in
// onReloadBlkdevSucc.
func (s *SGuestSnapshotDeleteTask) doReloadDisk(device string) {
	base := s.SGuestReloadDiskTask
	base.doReloadDisk(device, s.onReloadBlkdevSucc)
}
// onReloadBlkdevSucc runs the post-reload cleanup (errors only logged) and
// continues the guest. After "cont" it either finishes the deletion or, when
// the reload failed, handles the failure.
func (s *SGuestSnapshotDeleteTask) onReloadBlkdevSucc(err string) {
	if cleanup := s.delSnapshotPathAfterReload; cleanup != nil {
		if e := cleanup(); e != nil {
			log.Errorf("failed do delSnapshotPathAfterReload: %s", e)
		}
	}
	next := s.onResumeSucc
	if len(err) > 0 {
		next = func(string) {
			s.onSnapshotBlkdevFail(fmt.Sprintf("onReloadBlkdevFail %s", err))
		}
	}
	s.Monitor.SimpleCommand("cont", next)
}
// onSnapshotBlkdevFail runs after "cont" when reloading the block device
// failed, then fails the task with res.
//
// If tmpPath is set (presumably a saved copy of the converted snapshot —
// confirm), it is moved back into the snapshot directory first.
func (s *SGuestSnapshotDeleteTask) onSnapshotBlkdevFail(res string) {
	// tmpPath is optional (onResumeSucc checks it too). With an empty path,
	// "mv" can only fail, so skip the restore.
	if len(s.tmpPath) > 0 {
		snapshotPath := path.Join(s.disk.GetSnapshotDir(), s.convertSnapshot)
		if output, err := procutils.NewCommand("mv", "-f", s.tmpPath, snapshotPath).Output(); err != nil {
			log.Errorf("mv %s to %s failed: %s, %s", s.tmpPath, snapshotPath, err, output)
		}
	}
	s.taskFailed(fmt.Sprintf("Reload blkdev failed %s", res))
}
// onResumeSucc finishes the deletion after the guest has been continued.
//
// It removes tmpPath when set (failure only logged) and deletes the snapshot.
// A failed deletion fails the task, as in onStreamDiskComplete, instead of
// reporting success. Otherwise it completes with {"deleted": true}.
func (s *SGuestSnapshotDeleteTask) onResumeSucc(res string) {
	log.Infof("guest do new snapshot task resume succ %s", res)
	if len(s.tmpPath) > 0 {
		output, err := procutils.NewCommand("rm", "-f", s.tmpPath).Output()
		if err != nil {
			log.Errorf("rm %s failed: %s, %s", s.tmpPath, err, output)
		}
	}
	if err := s.disk.DoDeleteSnapshot(s.deleteSnapshot); err != nil {
		hostutils.TaskFailed(s.ctx, err.Error())
		return
	}
	body := jsonutils.NewDict()
	body.Set("deleted", jsonutils.JSONTrue)
	hostutils.TaskComplete(s.ctx, body)
}
/**
* GuestDriveMirrorTask
**/
// SDriveMirrorTask drive-mirrors every guest disk in turn to
// "<nbdUri>:exportname=drive_<i>_backend" (see startMirror).
type SDriveMirrorTask struct {
	*SKVMGuestInstance
	ctx    context.Context
	nbdUri string
	// onSucc, when set, replaces the TaskComplete reply after the last disk
	onSucc func()
	// syncMode is passed as the drive-mirror sync mode
	syncMode string
	// index is the position in Desc.Disks of the next disk to mirror
	index int
	// blockReplication requests block replication when QEMU supports it
	blockReplication bool
}
// NewDriveMirrorTask creates a drive-mirror task for guest s.
func NewDriveMirrorTask(
	ctx context.Context, s *SKVMGuestInstance, nbdUri, syncMode string,
	blockReplication bool, onSucc func(),
) *SDriveMirrorTask {
	task := &SDriveMirrorTask{SKVMGuestInstance: s, ctx: ctx}
	task.nbdUri, task.syncMode = nbdUri, syncMode
	task.onSucc = onSucc
	task.blockReplication = blockReplication
	return task
}
// Start mirrors the first disk; startMirror chains through the rest.
func (s *SDriveMirrorTask) Start() {
	const noPreviousError = ""
	s.startMirror(noPreviousError)
}
// supportBlockReplication asks HMP for "help drive_mirror". It reports whether
// the help text mentions the "[-c]" flag at a position after the start. It
// blocks until the monitor answers.
func (s *SDriveMirrorTask) supportBlockReplication() bool {
	answer := make(chan bool)
	s.Monitor.HumanMonitorCommand("help drive_mirror", func(res string) {
		answer <- strings.Index(res, "[-c]") > 0
	})
	return <-answer
}
// startMirror mirrors the guest disks one at a time. It is its own
// DriveMirror callback.
//
// A non-empty res fails the task. After the last disk, onSucc runs, or the
// task completes when onSucc is nil.
func (s *SDriveMirrorTask) startMirror(res string) {
	log.Infof("drive mirror results:%s", res)
	if len(res) > 0 {
		hostutils.TaskFailed(s.ctx, res)
		return
	}
	if s.index >= len(s.Desc.Disks) {
		if s.onSucc != nil {
			s.onSucc()
		} else {
			hostutils.TaskComplete(s.ctx, nil)
		}
		return
	}
	// Probe for block replication only when a disk is about to be mirrored,
	// so the final callback does not issue an extra HMP command.
	blockReplication := false
	if s.blockReplication && s.supportBlockReplication() {
		blockReplication = true
		log.Infof("mirror block replication supported")
	}
	// Advance before issuing the async command (as mirrorDisks does), so a
	// reply arriving early cannot observe a stale index.
	idx := s.index
	s.index += 1
	target := fmt.Sprintf("%s:exportname=drive_%d_backend", s.nbdUri, idx)
	s.Monitor.DriveMirror(s.startMirror, fmt.Sprintf("drive_%d", idx), target, s.syncMode, "", true, blockReplication, 0)
}
// SDriveBackupTask runs drive-backup for every guest disk in turn to
// "<nbdUri>:exportname=drive_<i>_backend" in raw format (see startBackup).
type SDriveBackupTask struct {
	*SKVMGuestInstance
	ctx    context.Context
	nbdUri string
	// onSucc, when set, replaces the TaskComplete reply after the last disk
	onSucc func()
	// syncMode is passed as the drive-backup sync mode
	syncMode string
	// index is the position in Desc.Disks of the next disk to back up
	index int
}
// NewDriveBackupTask creates a drive-backup task for guest s.
func NewDriveBackupTask(
	ctx context.Context, s *SKVMGuestInstance, nbdUri, syncMode string, onSucc func(),
) *SDriveBackupTask {
	task := &SDriveBackupTask{SKVMGuestInstance: s, ctx: ctx}
	task.nbdUri, task.syncMode, task.onSucc = nbdUri, syncMode, onSucc
	return task
}
// Start backs up the first disk; startBackup chains through the rest.
func (s *SDriveBackupTask) Start() {
	const noPreviousError = ""
	s.startBackup(noPreviousError)
}
// startBackup backs up the guest disks one at a time. It is its own
// DriveBackup callback.
//
// A non-empty res fails the task. After the last disk, onSucc runs, or the
// task completes when onSucc is nil.
func (s *SDriveBackupTask) startBackup(res string) {
	log.Infof("drive backup results:%s", res)
	if len(res) > 0 {
		hostutils.TaskFailed(s.ctx, res)
		return
	}
	if s.index >= len(s.Desc.Disks) {
		if s.onSucc == nil {
			hostutils.TaskComplete(s.ctx, nil)
		} else {
			s.onSucc()
		}
		return
	}
	target := fmt.Sprintf("%s:exportname=drive_%d_backend", s.nbdUri, s.index)
	drive := fmt.Sprintf("drive_%d", s.index)
	s.Monitor.DriveBackup(s.startBackup, drive, target, s.syncMode, "raw")
	s.index += 1
}
// SGuestBlockReplicationTask attaches, for each guest disk, an NBD drive at
// nbdHost:nbdPort (export "drive_<Index>", node "node_<Index>"). It then
// links that node as a child of the disk's drive via x-blockdev-change.
type SGuestBlockReplicationTask struct {
	*SKVMGuestInstance
	ctx     context.Context
	nbdHost string
	nbdPort string
	// onSucc / onFail, when set, replace TaskComplete / TaskFailed
	onSucc func()
	onFail func(string)
	// NOTE(review): syncMode is stored but not used in this part of the file
	syncMode string
	// index is the position in Desc.Disks of the next disk to process
	index int
}
// NewGuestBlockReplicationTask creates a block replication task for guest s.
func NewGuestBlockReplicationTask(
	ctx context.Context, s *SKVMGuestInstance,
	nbdHost, nbdPort, syncMode string, onSucc func(), onFail func(string),
) *SGuestBlockReplicationTask {
	task := &SGuestBlockReplicationTask{SKVMGuestInstance: s, ctx: ctx}
	task.nbdHost, task.nbdPort = nbdHost, nbdPort
	task.syncMode = syncMode
	task.onSucc, task.onFail = onSucc, onFail
	return task
}
// Start processes the first disk; onXBlockdevChange chains through the rest.
func (s *SGuestBlockReplicationTask) Start() {
	const noPreviousError = ""
	s.onXBlockdevChange(noPreviousError)
}
// onXBlockdevChange handles the previous x-blockdev-change reply, then adds
// the NBD drive for the next disk. A non-empty res fails the task (via onFail
// when set). After the last disk, onSucc runs, or the task completes.
func (s *SGuestBlockReplicationTask) onXBlockdevChange(res string) {
	if len(res) > 0 {
		log.Errorf("SGuestBlockReplicationTask onXBlockdevChange %s", res)
		if s.onFail == nil {
			hostutils.TaskFailed(s.ctx, res)
		} else {
			s.onFail(res)
		}
		return
	}
	if s.index >= len(s.Desc.Disks) {
		if s.onSucc == nil {
			hostutils.TaskComplete(s.ctx, nil)
		} else {
			s.onSucc()
		}
		return
	}
	diskIndex := s.Desc.Disks[s.index].Index
	drive, node := fmt.Sprintf("drive_%d", diskIndex), fmt.Sprintf("node_%d", diskIndex)
	nbdOpts := map[string]string{
		"file.driver": "nbd",
		"file.host":   s.nbdHost,
		"file.port":   s.nbdPort,
		"file.export": drive,
		"node-name":   node,
	}
	s.Monitor.DriveAdd("", "buddy", nbdOpts, s.onNbdDriveAddSucc(drive, node))
	s.index += 1
}
// onNbdDriveAddSucc returns the DriveAdd callback for one disk. On success
// it attaches node as a child of parent via x-blockdev-change and continues
// the chain in onXBlockdevChange. On failure it reports through onFail, or
// fails the host task when onFail is nil.
func (s *SGuestBlockReplicationTask) onNbdDriveAddSucc(parent, node string) monitor.StringCallback {
	callback := func(res string) {
		if res == "" {
			s.Monitor.XBlockdevChange(parent, node, "", s.onXBlockdevChange)
			return
		}
		log.Errorf("SGuestBlockReplicationTask onNbdDriveAddSucc %s", res)
		if s.onFail == nil {
			hostutils.TaskFailed(s.ctx, res)
			return
		}
		s.onFail(res)
	}
	return callback
}
// SGuestOnlineResizeDiskTask grows a disk of a running guest to sizeMB
// through the QEMU monitor.
type SGuestOnlineResizeDiskTask struct {
	*SKVMGuestInstance
	ctx    context.Context
	disk   storageman.IDisk
	sizeMB int64
}

// NewGuestOnlineResizeDiskTask prepares an online resize of disk to sizeMB.
func NewGuestOnlineResizeDiskTask(
	ctx context.Context, s *SKVMGuestInstance, disk storageman.IDisk, sizeMB int64,
) *SGuestOnlineResizeDiskTask {
	task := new(SGuestOnlineResizeDiskTask)
	task.SKVMGuestInstance = s
	task.ctx = ctx
	task.disk = disk
	task.sizeMB = sizeMB
	return task
}
// Start asks QEMU for its block devices; the resize continues in
// OnGetBlocksSucc once the list arrives.
func (task *SGuestOnlineResizeDiskTask) Start() {
	onBlocks := task.OnGetBlocksSucc
	task.Monitor.GetBlocks(onBlocks)
}
// OnGetBlocksSucc locates the QEMU block device backing task.disk and issues
// the resize.
//
// A block matches when its inserted file path ends with the disk id, or, for
// "json:" pseudo-protocol files (e.g. RBD), when the embedded "file.image"
// equals the disk id. If nothing matches, the host task fails.
func (task *SGuestOnlineResizeDiskTask) OnGetBlocksSucc(blocks []monitor.QemuBlock) {
	diskId := task.disk.GetId()
	for i := range blocks {
		file := blocks[i].Inserted.File
		rbdImage := ""
		if strings.HasPrefix(file, "json:") {
			// RBD disks report their file like:
			// json:{"driver": "raw", "file": {"pool": "testpool01", "image": "952636e3-...", "driver": "rbd", "=keyvalue-pairs": "[\"mon_host\", \"...\", \"key\", \"...\", ...]"}}
			fileJson, err := jsonutils.ParseString(file[len("json:"):])
			if err != nil {
				hostutils.TaskFailed(task.ctx, fmt.Sprintf("parse file json %s error: %v", file, err))
				return
			}
			rbdImage, _ = fileJson.GetString("file", "image")
		}
		matched := (len(file) > 0 && strings.HasSuffix(file, diskId)) || rbdImage == diskId
		if !matched {
			continue
		}
		if err := task.disk.PreResize(task.ctx, task.sizeMB); err != nil {
			hostutils.TaskFailed(task.ctx, fmt.Sprintf("disk %s preResize failed %s", diskId, err))
			return
		}
		task.Monitor.ResizeDisk(blocks[i].Device, task.sizeMB, task.OnResizeSucc)
		return
	}
	hostutils.TaskFailed(task.ctx, fmt.Sprintf("disk %s not found on this guest", diskId))
}
// OnResizeSucc is the ResizeDisk callback; err is the monitor error text
// (empty on success).
//
// On success it makes a best-effort QgaResizeDisk call through the guest
// agent, but only if the agent answers a ping. Agent errors are only logged.
// It then completes the task with {"disk_size": sizeMB}.
func (task *SGuestOnlineResizeDiskTask) OnResizeSucc(err string) {
	if len(err) > 0 {
		hostutils.TaskFailed(task.ctx, fmt.Sprintf("resize disk %s %dMb error: %v", task.disk.GetId(), task.sizeMB, err))
		return
	}
	if pingErr := task.guestAgent.GuestPing(1); pingErr == nil {
		if qgaErr := task.guestAgent.QgaResizeDisk(task.disk.GetId()); qgaErr != nil {
			log.Errorf("failed qga resize disk %s: %s", task.disk.GetId(), qgaErr)
		}
	}
	result := jsonutils.NewDict()
	result.Add(jsonutils.NewInt(task.sizeMB), "disk_size")
	hostutils.TaskComplete(task.ctx, result)
}
// SGuestHotplugCpuMemTask hot-plugs vCPUs and then memory into a running
// guest through the QEMU monitor, and records what was plugged in the live
// guest description.
type SGuestHotplugCpuMemTask struct {
	*SKVMGuestInstance
	ctx context.Context
	// addCpuCount is the number of vCPUs this task should plug (possibly
	// reduced in NewGuestHotplugCpuMemTask).
	addCpuCount int
	// addMemSize is the memory still to plug, in MB; decremented per slot.
	addMemSize int
	// addMemNodeIndex is the next position in cpuNumaPin scanned for a node
	// with SizeMB > 0 when choosing the size of the next memory slot.
	addMemNodeIndex int
	// originalCpuCount is the vCPU count reported by the monitor (non-NUMA path).
	originalCpuCount int
	// addedCpuCount is the number of vCPUs successfully plugged so far.
	addedCpuCount int
	// addedVcpuIds are the vCPU ids plugged on the NUMA-aware path.
	addedVcpuIds []int
	// cpuNumaPin are caller-supplied pins; a VcpuPin with Vcpu == -1 is a
	// slot waiting for a newly plugged vCPU id.
	cpuNumaPin []*desc.SCpuNumaPin
	// cpuList is the hotpluggable CPU list queried in Start.
	cpuList []monitor.HotpluggableCPU
	// addedMemSize is the memory (MB) plugged by this task.
	addedMemSize int
	// memSlotNewIndex is the index used for the current "memN"/"dimmN" ids.
	memSlotNewIndex *int
	// memSlotNewIndexs collects memSlotNewIndex of every plugged slot.
	memSlotNewIndexs []int
	// memSlots are the plugged slots, appended to the guest desc at the end.
	memSlots []*desc.SMemSlot
}
// NewGuestHotplugCpuMemTask builds a hot-plug task from input.
//
// When input carries a target total (TotalCpuCount / TotalMemSize) and the
// guest desc already exceeds it, the excess is subtracted from the amount to
// add. Presumably this covers a retry after a partially applied earlier
// attempt; confirm against the caller.
func NewGuestHotplugCpuMemTask(
	ctx context.Context, s *SKVMGuestInstance, input *SGuestHotplugCpuMem,
) *SGuestHotplugCpuMemTask {
	task := &SGuestHotplugCpuMemTask{
		SKVMGuestInstance: s,
		ctx:               ctx,
		addCpuCount:       int(input.AddCpuCount),
		addMemSize:        int(input.AddMemSize),
		cpuNumaPin:        input.CpuNumaPin,
	}

	if input.AddCpuCount > 0 && input.TotalCpuCount != nil && s.Desc.Cpu > *input.TotalCpuCount {
		task.addCpuCount -= int(s.Desc.Cpu - *input.TotalCpuCount)
	}
	log.Infof("guest %s add cpu count %d", s.Id, task.addCpuCount)

	if input.AddMemSize > 0 && input.TotalMemSize != nil && s.Desc.Mem > *input.TotalMemSize {
		task.addMemSize -= int(s.Desc.Mem - *input.TotalMemSize)
	}
	log.Infof("guest %s add mem size %d", s.Id, task.addMemSize)

	return task
}
// Start runs the hot-plug: vCPUs first, then memory. With nothing to add the
// task succeeds immediately.
func (task *SGuestHotplugCpuMemTask) Start() {
	switch {
	case task.addCpuCount > 0:
		hotpluggable, err := task.getHotpluggableCPUList()
		if err != nil {
			task.onFail(err.Error())
			return
		}
		task.cpuList = hotpluggable
		task.startAddCpu()
	case task.addMemSize > 0:
		task.startAddMem()
	default:
		task.onSucc()
	}
}
// startAddCpu picks the vCPU hot-plug strategy. Guests with a NUMA memory
// layout (Mem.Cpus set) and existing CPU NUMA pins plug the free vCPU ids of
// that layout. Otherwise vCPUs are appended after the current monitor count.
func (task *SGuestHotplugCpuMemTask) startAddCpu() {
	memDesc := task.Desc.MemDesc.Mem
	numaAware := memDesc != nil && memDesc.Cpus != nil && len(task.Desc.CpuNumaPin) > 0
	if !numaAware {
		task.Monitor.GetCpuCount(task.onGetCpuCount)
		return
	}
	task.buildVcpusMap()
}
// buildVcpusMap computes the free vCPU ids and plugs them. The free set is
// the union of the vCPU sets declared by the memory desc and its per-node
// Mems, minus the vCPUs already present in Desc.CpuNumaPin.
//
// A malformed cpuset string now fails the task. Previously the parse error
// was dropped, which yielded an empty free set and a "successful" hot-plug
// that plugged nothing.
func (task *SGuestHotplugCpuMemTask) buildVcpusMap() {
	memDesc := task.Desc.MemDesc.Mem
	vcpuSet, err := cpuset.Parse(*memDesc.Cpus)
	if err != nil {
		task.onFail(fmt.Sprintf("parse memory desc vcpus %q: %s", *memDesc.Cpus, err))
		return
	}
	for i := range memDesc.Mems {
		if memDesc.Mems[i].Cpus == nil {
			continue
		}
		nodeVcpus, err := cpuset.Parse(*memDesc.Mems[i].Cpus)
		if err != nil {
			task.onFail(fmt.Sprintf("parse memory node %d vcpus %q: %s", i, *memDesc.Mems[i].Cpus, err))
			return
		}
		vcpuSet = vcpuSet.Union(nodeVcpus)
	}

	allocatedVcpus := make([]int, 0)
	for i := range task.Desc.CpuNumaPin {
		for j := range task.Desc.CpuNumaPin[i].VcpuPin {
			allocatedVcpus = append(allocatedVcpus, task.Desc.CpuNumaPin[i].VcpuPin[j].Vcpu)
		}
	}
	vcpuSet = vcpuSet.Difference(cpuset.NewCPUSet(allocatedVcpus...))
	task.startAddCpusWithFreeVcpuSet(vcpuSet.ToSlice())
}
// startAddCpusWithFreeVcpuSet plugs vCPUs with the given free ids, at most
// addCpuCount of them, then moves on to memory hot-plug.
//
// Each plugged vCPU is pinned in one of two ways:
//   - With caller-supplied cpuNumaPin, its id fills the first still-unassigned
//     (Vcpu == -1) slot. Previously every unassigned slot got the first new
//     id, because the search did not stop after a match.
//   - Otherwise a single pCPU is allocated from the host cpuset counter and a
//     new pin is appended to Desc.CpuNumaPin.
func (task *SGuestHotplugCpuMemTask) startAddCpusWithFreeVcpuSet(vcpuSet []int) {
	for _, vcpuId := range vcpuSet {
		// Never plug more than requested, matching the bound used by doAddCpu.
		if task.addedCpuCount >= task.addCpuCount {
			break
		}
		if err := task.AddCpu(task.cpuList, vcpuId); err != nil {
			log.Errorf("failed add cpu %d: %s", vcpuId, err)
			task.onFail(err.Error())
			return
		}
		if len(task.cpuNumaPin) > 0 {
			task.assignFirstUnpinnedVcpu(vcpuId)
		} else {
			task.pinHotpluggedVcpu(vcpuId)
		}
		task.addedVcpuIds = append(task.addedVcpuIds, vcpuId)
		task.addedCpuCount += 1
	}
	task.startAddMem()
}

// assignFirstUnpinnedVcpu stores vcpuId in the first cpuNumaPin slot whose
// Vcpu is still -1; it is a no-op when no such slot is left.
func (task *SGuestHotplugCpuMemTask) assignFirstUnpinnedVcpu(vcpuId int) {
	for i := range task.cpuNumaPin {
		for j := range task.cpuNumaPin[i].VcpuPin {
			if task.cpuNumaPin[i].VcpuPin[j].Vcpu == -1 {
				task.cpuNumaPin[i].VcpuPin[j].Vcpu = vcpuId
				return
			}
		}
	}
}

// pinHotpluggedVcpu allocates one pCPU for vcpuId and records the pin in
// Desc.CpuNumaPin. This is best effort: allocation failures are only logged.
func (task *SGuestHotplugCpuMemTask) pinHotpluggedVcpu(vcpuId int) {
	allocs, err := task.manager.cpuSet.AllocCpuset(1, 0, nil, task.GetId())
	if err != nil {
		log.Errorf("failed alloc cpuset for hotplugged vcpu %d: %s", vcpuId, err)
		return
	}
	for _, alloc := range allocs {
		if len(alloc.Cpuset) == 0 {
			continue
		}
		vcpuPin := make([]desc.SVCpuPin, 1)
		vcpuPin[0].Pcpu = alloc.Cpuset[0]
		vcpuPin[0].Vcpu = vcpuId
		task.Desc.CpuNumaPin = append(task.Desc.CpuNumaPin, &desc.SCpuNumaPin{
			SizeMB:    0,
			VcpuPin:   vcpuPin,
			Unregular: false,
		})
	}
}
// onGetCpuCount records the guest's current vCPU count and starts plugging
// vCPUs sequentially after it (non-NUMA path).
func (task *SGuestHotplugCpuMemTask) onGetCpuCount(count int) {
	task.originalCpuCount = count
	task.doAddCpu()
}

// doAddCpu plugs the next vCPU (id = originalCpuCount + addedCpuCount) until
// addCpuCount vCPUs have been added, then continues with memory.
func (task *SGuestHotplugCpuMemTask) doAddCpu() {
	if task.addedCpuCount >= task.addCpuCount {
		task.startAddMem()
		return
	}
	var reason string
	nextCpuId := task.originalCpuCount + task.addedCpuCount
	if err := task.AddCpu(task.cpuList, nextCpuId); err != nil {
		reason = err.Error()
	}
	task.onAddCpu(reason)
}

// onAddCpu handles one AddCpu result: a non-empty reason fails the task,
// otherwise the counter advances and the next vCPU is plugged.
func (task *SGuestHotplugCpuMemTask) onAddCpu(reason string) {
	if reason != "" {
		log.Errorln(reason)
		task.onFail(reason)
		return
	}
	task.addedCpuCount++
	task.doAddCpu()
}
// startAddMem begins one memory hot-plug round by querying the next memory
// slot index, or finishes the task when no memory remains to be added.
func (task *SGuestHotplugCpuMemTask) startAddMem() {
	if task.addMemSize <= 0 {
		task.onSucc()
		return
	}
	task.Monitor.GeMemtSlotIndex(task.onGetSlotIndex)
}
// onGetSlotIndex is the GeMemtSlotIndex callback of one memory hot-plug
// round. It decides how much memory to plug this round: the SizeMB of the
// next caller-supplied NUMA node with SizeMB > 0, or else all outstanding
// memory. It then creates the backing memory object "mem<N>" via object-add
// and continues in onAddMemObject.
//
// The object id index N is the monitor-reported index plus the number of
// memory nodes already in the guest desc.
func (task *SGuestHotplugCpuMemTask) onGetSlotIndex(index int) {
	newIndex := index
	if task.Desc.MemDesc.Mem != nil {
		newIndex += len(task.Desc.MemDesc.Mem.Mems)
	}
	task.memSlotNewIndex = &newIndex

	addMemSize := task.addMemSize
	var numaNodeDesc *desc.SCpuNumaPin
	for i := task.addMemNodeIndex; i < len(task.cpuNumaPin); i++ {
		if task.cpuNumaPin[i].SizeMB > 0 {
			task.addMemNodeIndex = i + 1
			numaNodeDesc = task.cpuNumaPin[i]
			addMemSize = int(task.cpuNumaPin[i].SizeMB)
			break
		}
	}
	var hostNodes *uint16
	if numaNodeDesc != nil {
		hostNodes = numaNodeDesc.NodeId
	}

	var objType string
	id := fmt.Sprintf("mem%d", *task.memSlotNewIndex)
	var opts map[string]string
	if task.manager.host.IsHugepagesEnabled() {
		// A dedicated hugetlbfs mount sized to exactly this slot backs it.
		// NOTE(review): the path uses the raw monitor index, while
		// updateGuestDesc derives hugepage ids from memSlotNewIndexs; confirm
		// the two naming schemes are meant to differ.
		memPath := fmt.Sprintf("/dev/hugepages/%s-%d", task.GetId(), index)
		err := procutils.NewRemoteCommandAsFarAsPossible("mkdir", "-p", memPath).Run()
		if err != nil {
			reason := fmt.Sprintf("mkdir %s fail: %s", memPath, err)
			log.Errorf("%s", reason)
			task.onFail(reason)
			return
		}
		err = procutils.NewRemoteCommandAsFarAsPossible("mount", "-t", "hugetlbfs", "-o",
			fmt.Sprintf("pagesize=%dK,size=%dM", task.manager.host.HugepageSizeKb(), addMemSize),
			fmt.Sprintf("hugetlbfs-%s-%d", task.GetId(), index),
			memPath,
		).Run()
		if err != nil {
			reason := fmt.Sprintf("mount %s fail: %s", memPath, err)
			log.Errorf("%s", reason)
			task.onFail(reason)
			return
		}
		objType = "memory-backend-file"
		opts = map[string]string{
			"size":     fmt.Sprintf("%dM", addMemSize),
			"mem-path": memPath,
			"share":    "on",
			"prealloc": "on",
		}
		if hostNodes != nil {
			opts["host-nodes"] = fmt.Sprintf("%d", *hostNodes)
			opts["policy"] = "bind"
		}
	} else {
		objType = "memory-backend-ram"
		opts = map[string]string{
			// Size this round's object, not the total outstanding amount.
			// memSlot.SizeMB below records addMemSize; using task.addMemSize
			// here over-allocated whenever memory was split across NUMA nodes.
			"size": fmt.Sprintf("%dM", addMemSize),
		}
	}
	opts["id"] = id

	cb := func(reason string) {
		memObj := desc.NewMemDesc(objType, id, nil, nil)
		memObj.Options = opts
		memSlot := new(desc.SMemSlot)
		memSlot.MemObj = memObj
		memSlot.SizeMB = int64(addMemSize)
		task.onAddMemObject(reason, memSlot)
	}
	task.Monitor.ObjectAdd(objType, opts, cb)
}
// onAddMemFailed logs reason and asks QEMU to delete the "mem<N>" object of
// the current round. The deletion result is only logged. The task is then
// failed with reason.
func (task *SGuestHotplugCpuMemTask) onAddMemFailed(reason string) {
	log.Errorln(reason)
	memObjId := fmt.Sprintf("mem%d", *task.memSlotNewIndex)
	task.Monitor.ObjectDel(memObjId, func(res string) {
		log.Infof("%s", res)
	})
	task.onFail(reason)
}
// onAddMemObject handles the object-add result. On success it plugs a
// pc-dimm "dimm<N>" backed by "mem<N>" and records the device on memSlot once
// device_add succeeds. Either way the outcome is passed to onAddMemDevice.
func (task *SGuestHotplugCpuMemTask) onAddMemObject(reason string, memSlot *desc.SMemSlot) {
	if reason != "" {
		task.onAddMemFailed(reason)
		return
	}
	slotIdx := *task.memSlotNewIndex
	dimmId := fmt.Sprintf("dimm%d", slotIdx)
	dimmParams := map[string]string{
		"id":     dimmId,
		"memdev": fmt.Sprintf("mem%d", slotIdx),
	}
	task.Monitor.DeviceAdd("pc-dimm", dimmParams, func(res string) {
		if res == "" {
			memSlot.MemDev = &desc.SMemDevice{
				Type: "pc-dimm",
				Id:   dimmId,
			}
		}
		task.onAddMemDevice(res, memSlot)
	})
}
// onAddMemDevice handles the device_add result of one memory round. On
// success it records the slot and either starts another round (memory still
// outstanding) or finishes the task.
//
// addedMemSize now accumulates across rounds. Memory split over several
// NUMA nodes is plugged as several slots, and overwriting the value made
// updateGuestDesc record only the last slot's size.
func (task *SGuestHotplugCpuMemTask) onAddMemDevice(reason string, memSlot *desc.SMemSlot) {
	if len(reason) > 0 {
		task.onAddMemFailed(reason)
		return
	}
	task.addedMemSize += int(memSlot.SizeMB)
	task.addMemSize -= int(memSlot.SizeMB)
	task.memSlots = append(task.memSlots, memSlot)
	task.memSlotNewIndexs = append(task.memSlotNewIndexs, *task.memSlotNewIndex)
	if task.addMemSize > 0 {
		task.startAddMem()
		return
	}
	task.onSucc()
}
// updateGuestDesc folds the result of the hot-plug into the guest's live
// description. It is called from both onSucc and onFail. It uses the added*
// counters, memSlots and addedVcpuIds (what was actually plugged) rather
// than the requested amounts, except for cpuNumaPin, which is appended as a
// whole.
func (task *SGuestHotplugCpuMemTask) updateGuestDesc() {
	task.Desc.Cpu += int64(task.addedCpuCount)
	task.Desc.CpuDesc.Cpus += uint(task.addedCpuCount)
	task.Desc.Mem += int64(task.addedMemSize)
	// Caller-supplied pins (vcpu ids filled in while plugging) join the desc.
	// NOTE(review): on failure this also appends pins whose Vcpu is still -1;
	// confirm that is intended.
	if len(task.cpuNumaPin) > 0 {
		task.Desc.CpuNumaPin = append(task.Desc.CpuNumaPin, task.cpuNumaPin...)
	}
	if task.addedMemSize > 0 {
		if task.Desc.MemDesc.MemSlots == nil {
			task.Desc.MemDesc.MemSlots = make([]*desc.SMemSlot, 0)
		}
		task.Desc.MemDesc.MemSlots = append(task.Desc.MemDesc.MemSlots, task.memSlots...)
		// With host-agent NUMA allocation enabled, each new slot's hugepage id
		// ("<origin id>-<slot index>") is passed to validateNumaAllocated;
		// presumably this registers the allocation, but confirm its semantics.
		if task.manager.hostagentNumaAllocate {
			for i := range task.memSlotNewIndexs {
				hugepageId := fmt.Sprintf("%s-%d", task.getOriginId(), task.memSlotNewIndexs[i])
				task.validateNumaAllocated(hugepageId, false, true, nil)
			}
		}
	}
	if len(task.addedVcpuIds) > 0 {
		task.setCgroupCPUSet()
	}
	// A single catch-all VcpuPin entry is widened to cover every vCPU.
	if task.addedCpuCount > 0 && len(task.Desc.VcpuPin) == 1 {
		task.Desc.VcpuPin[0].Vcpus = fmt.Sprintf("0-%d", task.Desc.Cpu-1)
	}
	if task.addedCpuCount > 0 || task.addedMemSize > 0 {
		SaveLiveDesc(task, task.Desc)
	}
	// After a memory change the saved scripts are regenerated with
	// sync_qemu_cmdline set; failures are only logged.
	if task.addedMemSize > 0 {
		vncPort := task.GetVncPort()
		data := jsonutils.NewDict()
		data.Set("vnc_port", jsonutils.NewInt(int64(vncPort)))
		data.Set("sync_qemu_cmdline", jsonutils.JSONTrue)
		if err := task.saveScripts(data); err != nil {
			log.Errorf("failed save script: %s", err)
		}
	}
}
// onFail records whatever was plugged so far and fails the host task.
//
// The body tells the caller which phase failed. "add_cpu_failed" is set with
// the "added_cpu" count when not all vCPUs were plugged. Otherwise
// "add_mem_failed" is set once a memory round had started.
func (task *SGuestHotplugCpuMemTask) onFail(reason string) {
	failInfo := jsonutils.NewDict()
	switch {
	case task.addedCpuCount < task.addCpuCount:
		failInfo.Set("add_cpu_failed", jsonutils.JSONTrue)
		failInfo.Set("added_cpu", jsonutils.NewInt(int64(task.addedCpuCount)))
	case task.memSlotNewIndex != nil:
		failInfo.Set("add_mem_failed", jsonutils.JSONTrue)
	}
	task.updateGuestDesc()
	hostutils.TaskFailed2(task.ctx, reason, failInfo)
}
// onSucc records the plugged resources in the guest desc and completes the
// host task. When caller-supplied NUMA pins were used, the full updated
// Desc.CpuNumaPin is returned as "cpu_numa_pin".
func (task *SGuestHotplugCpuMemTask) onSucc() {
	task.updateGuestDesc()
	result := jsonutils.NewDict()
	if usedNumaPins := len(task.cpuNumaPin) > 0; usedNumaPins {
		result.Set("cpu_numa_pin", jsonutils.Marshal(task.Desc.CpuNumaPin))
	}
	hostutils.TaskComplete(task.ctx, result)
}
// SGuestBlockIoThrottleTask applies each disk's Bps/Iops limits to the
// matching QEMU drive, one disk after another.
type SGuestBlockIoThrottleTask struct {
	*SKVMGuestInstance
	ctx context.Context
}

// Start launches the throttling chain on a new goroutine, beginning with the
// first disk, so the caller is not blocked.
func (task *SGuestBlockIoThrottleTask) Start() {
	const firstDisk = 0
	go task.startDoIoThrottle(firstDisk)
}
// startDoIoThrottle throttles the disk at position idx of Desc.Disks. On
// success the callback recurses to idx+1; the first error aborts the chain.
// Once every disk is done, the task completes.
func (task *SGuestBlockIoThrottleTask) startDoIoThrottle(idx int) {
	if idx >= len(task.Desc.Disks) {
		task.taskComplete(nil)
		return
	}
	onThrottled := func(res string) {
		if res != "" {
			task.taskFail(res)
			return
		}
		task.startDoIoThrottle(idx + 1)
	}
	drive := fmt.Sprintf("drive_%d", task.Desc.Disks[idx].Index)
	bps := int64(task.Desc.Disks[idx].Bps)
	iops := int64(task.Desc.Disks[idx].Iops)
	task.Monitor.BlockIoThrottle(drive, bps, iops, onThrottled)
}
// taskFail fails the host task when ctx carries a task id; without one, the
// throttle was not driven by a task and the reason is only logged.
func (task *SGuestBlockIoThrottleTask) taskFail(reason string) {
	if task.ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID) == nil {
		log.Errorln(reason)
		return
	}
	hostutils.TaskFailed(task.ctx, reason)
}

// taskComplete completes the host task only when ctx carries a task id.
func (task *SGuestBlockIoThrottleTask) taskComplete(data jsonutils.JSONObject) {
	if task.ctx.Value(appctx.APP_CONTEXT_KEY_TASK_ID) == nil {
		return
	}
	hostutils.TaskComplete(task.ctx, data)
}
// CancelBlockReplication detaches the NBD replication children that
// SGuestBlockReplicationTask attached to each disk's drive.
type CancelBlockReplication struct {
	SCancelBlockJobs
}

// NewCancelBlockReplicationTask builds a cancel task for guest.
func NewCancelBlockReplicationTask(ctx context.Context, guest *SKVMGuestInstance) *CancelBlockReplication {
	task := new(CancelBlockReplication)
	task.SCancelBlockJobs = SCancelBlockJobs{SKVMGuestInstance: guest, ctx: ctx}
	return task
}
// Start detaches, for every disk, the quorum child "children.<k>" from
// "drive_<Index>" and then deletes the "node_<Index>" drive.
//
// The monitor commands are fire-and-forget: failures are only logged, and
// the host task (if any) is completed without waiting for them.
// The error log previously printed the drive as the child and the node as
// the parent; it now names both correctly.
func (task *CancelBlockReplication) Start() {
	disks := task.Desc.Disks
	for i := 0; i < len(disks); i++ {
		diskIndex := disks[i].Index
		drive := fmt.Sprintf("drive_%d", diskIndex)
		node := fmt.Sprintf("node_%d", diskIndex)
		child := fmt.Sprintf("children.%d", task.getQuorumChildIndex())
		task.Monitor.XBlockdevChange(drive, "", child, func(res string) {
			if len(res) > 0 {
				log.Errorf("failed remove child %s (%s) for parent %s: %s", child, node, drive, res)
				return
			}
			task.Monitor.DriveDel(node, func(res string) {
				if len(res) > 0 {
					log.Errorf("failed remove drive %s: %s", node, res)
				}
			})
		})
	}
	task.taskComplete()
}
// SCancelBlockJobs cancels the block jobs on every "drive_*" device of a
// guest, one drive at a time. Field order matters: other code builds this
// struct with a positional literal.
type SCancelBlockJobs struct {
	*SKVMGuestInstance
	ctx context.Context
}

// NewCancelBlockJobsTask builds a cancel-block-jobs task. ctx may be nil, in
// which case no host task is completed.
func NewCancelBlockJobsTask(ctx context.Context, guest *SKVMGuestInstance) *SCancelBlockJobs {
	return &SCancelBlockJobs{
		SKVMGuestInstance: guest,
		ctx:               ctx,
	}
}
// Start lists the guest's block devices; cancellation proceeds in
// onBlockDriversSucc.
func (task *SCancelBlockJobs) Start() {
	task.findBlockDevices()
}

// findBlockDevices queries QEMU for its block devices.
func (task *SCancelBlockJobs) findBlockDevices() {
	onBlocks := task.onBlockDriversSucc
	task.Monitor.GetBlocks(onBlocks)
}
// onBlockDriversSucc keeps only devices named "drive_*" and cancels their
// block jobs in order.
func (task *SCancelBlockJobs) onBlockDriversSucc(blocks []monitor.QemuBlock) {
	drivers := make([]string, 0, len(blocks))
	for i := range blocks {
		if !strings.HasPrefix(blocks[i].Device, "drive_") {
			continue
		}
		drivers = append(drivers, blocks[i].Device)
	}
	task.StartCancelBlockJobs(drivers)
}
// StartCancelBlockJobs cancels the block job of the first driver in drivers
// and recurses on the rest from the monitor callback. Errors are logged and
// do not stop the chain. With no drivers left, the task completes.
func (task *SCancelBlockJobs) StartCancelBlockJobs(drivers []string) {
	if len(drivers) == 0 {
		task.taskComplete()
		return
	}
	current, remaining := drivers[0], drivers[1:]
	task.Monitor.CancelBlockJob(current, false, func(res string) {
		if len(res) > 0 {
			log.Errorln(res)
		}
		task.StartCancelBlockJobs(remaining)
	})
}
// taskComplete completes the host task, if this run has one (non-nil ctx).
func (task *SCancelBlockJobs) taskComplete() {
	if task.ctx == nil {
		return
	}
	hostutils.TaskComplete(task.ctx, nil)
}
// SGuestStorageCloneDiskTask clones a guest disk to another storage. For a
// running guest it then mirrors live writes with drive-mirror.
type SGuestStorageCloneDiskTask struct {
	*SGuestBlockProgressBaseTask
	// started is set once drive-mirror has been issued successfully.
	started bool
	// diskIndex is the Index of the source disk (drive_<diskIndex>).
	diskIndex int
	resp      *hostapi.ServerCloneDiskFromStorageResponse
	params    *SStorageCloneDisk
}

// NewGuestStorageCloneDiskTask builds a clone task. The task value is handed
// to NewGuestBlockProgressBaseTask as its third argument, so it must exist
// before the base task is created.
func NewGuestStorageCloneDiskTask(ctx context.Context, guest *SKVMGuestInstance, params *SStorageCloneDisk) *SGuestStorageCloneDiskTask {
	task := new(SGuestStorageCloneDiskTask)
	task.params = params
	task.SGuestBlockProgressBaseTask = NewGuestBlockProgressBaseTask(ctx, guest, task)
	return task
}
func (t *SGuestStorageCloneDiskTask) Start(guestRunning bool) {
var diskIndex = -1
disks := t.Desc.Disks
for diskIndex = 0; diskIndex < len(disks); diskIndex++ {
if disks[diskIndex].DiskId == t.params.SourceDisk.GetId() {
diskIndex = int(disks[diskIndex].Index)
break
}
}
if diskIndex < 0 {
hostutils.TaskFailed(
t.ctx, fmt.Sprintf("failed find disk index %s", t.params.SourceDisk.GetId()),
)
return
}
encryptInfo, err := t.getEncryptKey(t.ctx, auth.AdminCredential())
if err != nil {
hostutils.TaskFailed(
t.ctx, fmt.Sprintf("failed get guest encrypt info %s", t.params.SourceDisk.GetId()),
)
return
}
resp, err := t.params.TargetStorage.CloneDiskFromStorage(t.ctx, t.params.SourceStorage, t.params.SourceDisk, t.params.TargetDiskId, !guestRunning, encryptInfo)
if err != nil {
hostutils.TaskFailed(
t.ctx, fmt.Sprintf("Clone disk %s to storage %s failed %s",
t.params.SourceDisk.GetPath(), t.params.TargetStorage.GetId(), err),
)
return
}
targetDisk, err := t.params.TargetStorage.GetDiskById(t.params.TargetDiskId)
if err != nil {
hostutils.TaskFailed(
t.ctx, fmt.Sprintf("Failed get target disk %s %s", t.params.TargetDiskId, err),
)
return
}
targetDiskFormat, err := targetDisk.GetFormat()
if err != nil {
hostutils.TaskFailed(
t.ctx, fmt.Sprintf("Failed get target disk format %s %s", t.params.TargetDiskId, err),
)
return
}
if !guestRunning {
hostutils.TaskComplete(t.ctx, jsonutils.Marshal(resp))
return
} else {
t.resp = resp
t.diskIndex = diskIndex
}
t.Monitor.DriveMirror(t.onDriveMirror, fmt.Sprintf("drive_%d", diskIndex), targetDisk.GetPath(), "full", targetDiskFormat, true, false, 0)
}
// onDriveMirror is the drive-mirror callback; res is the monitor error text
// (empty on success).
//
// On success it marks the mirror as started and completes the task with the
// clone response. It then starts waiting for the block job in a new
// goroutine and schedules a status sync three seconds later.
func (t *SGuestStorageCloneDiskTask) onDriveMirror(res string) {
	if len(res) > 0 {
		hostutils.TaskFailed(
			t.ctx, fmt.Sprintf("Clone disk %s to storage %s drive mirror failed %s",
				t.params.SourceDisk.GetPath(), t.params.TargetStorage.GetId(), res),
		)
		return
	}
	t.started = true
	hostutils.TaskComplete(t.ctx, jsonutils.Marshal(t.resp))
	go t.startWaitBlockJob(res)
	syncStatus := func() {
		t.SyncStatus("drive mirror started")
	}
	timeutils2.AddTimeout(time.Second*3, syncStatus)
}
// OnGetBlockJobs reacts to the guest's current block jobs.
//
//   - No jobs and the mirror never started: issue drive-mirror again.
//   - No jobs after the mirror started: the job vanished, so the task fails.
//   - The mirror job on drive_<diskIndex> is "ready": stop waiting and report
//     {"block_jobs_ready": true}.
func (t *SGuestStorageCloneDiskTask) OnGetBlockJobs(jobs []monitor.BlockJob) {
	if len(jobs) > 0 {
		mirrorDevice := fmt.Sprintf("drive_%d", t.diskIndex)
		for i := range jobs {
			if jobs[i].Status != "ready" || jobs[i].Device != mirrorDevice {
				continue
			}
			t.cancelWaitBlockJobs()
			readyInfo := jsonutils.NewDict()
			readyInfo.Set("block_jobs_ready", jsonutils.JSONTrue)
			hostutils.TaskComplete(t.ctx, readyInfo)
			return
		}
		return
	}

	if t.started {
		hostutils.TaskFailed(t.ctx, fmt.Sprintf("Disk %s Block job not found", t.params.SourceDisk.GetId()))
		return
	}
	targetDisk, err := t.params.TargetStorage.GetDiskById(t.params.TargetDiskId)
	if err != nil {
		hostutils.TaskFailed(
			t.ctx, fmt.Sprintf("Failed get target disk %s %s", t.params.TargetDiskId, err),
		)
		return
	}
	targetDiskFormat, err := targetDisk.GetFormat()
	if err != nil {
		hostutils.TaskFailed(
			t.ctx, fmt.Sprintf("Failed get target disk format %s %s", t.params.TargetDiskId, err),
		)
		return
	}
	t.Monitor.DriveMirror(t.onDriveMirror, fmt.Sprintf("drive_%d", t.diskIndex), targetDisk.GetPath(), "full", targetDiskFormat, true, false, 0)
}
// StreamingDiskCompletedCount returns the CompletedDiskCount supplied in the
// clone parameters.
func (t *SGuestStorageCloneDiskTask) StreamingDiskCompletedCount() int {
	completed := t.params.CompletedDiskCount
	return completed
}

// StreamingDiskCount returns the CloneDiskCount supplied in the clone
// parameters.
func (t *SGuestStorageCloneDiskTask) StreamingDiskCount() int {
	total := t.params.CloneDiskCount
	return total
}
// SGuestLiveChangeDisk switches a running guest over to a cloned disk. It
// pauses the guest if needed, completes the block job on drive_<diskIndex>,
// resumes the guest, and updates the disk's entry in the live desc.
type SGuestLiveChangeDisk struct {
	*SKVMGuestInstance
	ctx    context.Context
	params *SStorageCloneDisk
	// guestNeedResume is true when this task paused the guest itself and must
	// therefore continue it afterwards.
	guestNeedResume bool
	// diskIndex is the source disk's Index (not its position in Desc.Disks).
	diskIndex  int
	targetDisk storageman.IDisk
}
func NewGuestLiveChangeDiskTask(ctx context.Context, guest *SKVMGuestInstance, params *SStorageCloneDisk) (*SGuestLiveChangeDisk, error) {
disk, err := params.TargetStorage.GetDiskById(params.TargetDiskId)
if err != nil {
return nil, err
}
var diskIndex = -1
disks := guest.Desc.Disks
for diskIndex = 0; diskIndex < len(disks); diskIndex++ {
if disks[diskIndex].DiskId == params.SourceDisk.GetId() {
diskIndex = int(disks[diskIndex].Index)
break
}
}
if diskIndex < 0 {
return nil, fmt.Errorf("failed found disk %s index", params.SourceDisk.GetId())
}
diskFormat, err := params.SourceDisk.GetFormat()
if err != nil {
return nil, errors.Wrap(err, "failed get disk fromat")
}
params.DiskFormat = diskFormat
return &SGuestLiveChangeDisk{
SKVMGuestInstance: guest,
ctx: ctx,
params: params,
guestNeedResume: false,
diskIndex: diskIndex,
targetDisk: disk,
}, nil
}
// Start pauses the guest (unless it is already suspended) before the disk
// switch; the switch continues in onGuestPaused.
//
// guestNeedResume is now set before "stop" is issued. Setting it afterwards
// could race with the asynchronous callback chain that reads it in
// onReopenImageSuccess.
func (t *SGuestLiveChangeDisk) Start() {
	if t.IsSuspend() {
		t.onGuestPaused("")
		return
	}
	t.guestNeedResume = true
	t.Monitor.SimpleCommand("stop", t.onGuestPaused)
}
// onGuestPaused is the "stop" callback. Any result mentioning "error" (case
// insensitive) fails the task. Otherwise the block job on drive_<diskIndex>
// is completed, continuing in onReopenImageSuccess.
func (t *SGuestLiveChangeDisk) onGuestPaused(res string) {
	if lowered := strings.ToLower(res); strings.Contains(lowered, "error") {
		hostutils.TaskFailed(t.ctx, fmt.Sprintf("pause error: %s", res))
		return
	}
	drive := fmt.Sprintf("drive_%d", t.diskIndex)
	t.Monitor.BlockJobComplete(drive, t.onReopenImageSuccess)
}
// onReopenImageSuccess is the BlockJobComplete callback. It resumes the guest
// first if this task paused it, then fails on a non-empty res.
//
// On success, when a target disk desc was provided, it replaces the matching
// disk's entry in the live desc and saves it. Finally it completes the task
// with the target disk's access path.
func (t *SGuestLiveChangeDisk) onReopenImageSuccess(res string) {
	if t.guestNeedResume {
		t.Monitor.SimpleCommand("cont", nil)
	}
	if res != "" {
		hostutils.TaskFailed(t.ctx, fmt.Sprintf("reopen image failed: %s", res))
		return
	}
	if targetDesc := t.params.TargetDiskDesc; targetDesc != nil {
		for i := range t.Desc.Disks {
			if t.Desc.Disks[i].Index != int8(t.diskIndex) {
				continue
			}
			log.Debugf("update guest disk %s desc", t.Desc.Disks[i].DiskId)
			t.Desc.Disks[i].GuestdiskJsonDesc = *targetDesc
			SaveLiveDesc(t, t.Desc)
			break
		}
	}
	hostutils.TaskComplete(t.ctx, jsonutils.Marshal(&hostapi.ServerCloneDiskFromStorageResponse{
		TargetAccessPath: t.targetDisk.GetPath(),
	}))
}