mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-07-01 02:24:47 +08:00
feat(region,host): add disk migrate support
This commit is contained in:
@@ -43,6 +43,7 @@ func init() {
|
||||
cmd := shell.NewResourceCmd(&modules.Disks)
|
||||
cmd.Perform("set-class-metadata", &options.ResourceMetadataOptions{})
|
||||
cmd.Perform("rebuild", &options.ResourceIdOptions{})
|
||||
cmd.Perform("migrate", &compute_options.DiskMigrateOptions{})
|
||||
|
||||
type DiskListOptions struct {
|
||||
options.BaseListOptions
|
||||
|
||||
@@ -318,6 +318,7 @@ type DiskAllocateFromBackupInput struct {
|
||||
type DiskDeleteInput struct {
|
||||
SkipRecycle *bool
|
||||
EsxiFlatFilePath string
|
||||
CleanSnapshots bool
|
||||
}
|
||||
|
||||
type DiskResetInput struct {
|
||||
@@ -325,6 +326,10 @@ type DiskResetInput struct {
|
||||
AutoStart bool `json:"auto_start"`
|
||||
}
|
||||
|
||||
type DiskMigrateInput struct {
|
||||
TargetStorageId string `json:"target_storage_id"`
|
||||
}
|
||||
|
||||
type DiskSnapshotpolicyInput struct {
|
||||
SnapshotpolicyId string `json:"snapshotpolicy_id"`
|
||||
}
|
||||
|
||||
@@ -44,6 +44,8 @@ const (
|
||||
DISK_START_MIGRATE = "start_migrate"
|
||||
DISK_POST_MIGRATE = "post_migrate"
|
||||
DISK_MIGRATING = "migrating"
|
||||
DISK_MIGRATE_FAIL = "migrate_failed"
|
||||
DISK_IMAGE_CACHING = "image_caching" // 缓存镜像中
|
||||
|
||||
DISK_CLONE = "clone"
|
||||
DISK_CLONE_FAIL = "clone_failed"
|
||||
|
||||
@@ -87,7 +87,7 @@ func (self *SBaremetalHostDriver) RequestAllocateDiskOnStorage(ctx context.Conte
|
||||
return fmt.Errorf("not supported")
|
||||
}
|
||||
|
||||
func (self *SBaremetalHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, task taskman.ITask) error {
|
||||
func (self *SBaremetalHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, cleanSnapshots bool, task taskman.ITask) error {
|
||||
return fmt.Errorf("not supported")
|
||||
}
|
||||
|
||||
|
||||
@@ -270,3 +270,11 @@ func (driver *SBaseHostDriver) RequestSyncOnHost(ctx context.Context, host *mode
|
||||
func (driver *SBaseHostDriver) RequestProbeIsolatedDevices(ctx context.Context, userCred mcclient.TokenCredential, host *models.SHost, input jsonutils.JSONObject) (*jsonutils.JSONArray, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (driver *SBaseHostDriver) RequestDiskSrcMigratePrepare(ctx context.Context, host *models.SHost, disk *models.SDisk, task taskman.ITask) (jsonutils.JSONObject, error) {
|
||||
return nil, fmt.Errorf("not supported")
|
||||
}
|
||||
|
||||
func (driver *SBaseHostDriver) RequestDiskMigrate(ctx context.Context, targetHost *models.SHost, targetStorage *models.SStorage, disk *models.SDisk, task taskman.ITask, body *jsonutils.JSONDict) error {
|
||||
return fmt.Errorf("not supported")
|
||||
}
|
||||
|
||||
@@ -321,7 +321,7 @@ func (self *SKVMHostDriver) RequestRebuildDiskOnStorage(ctx context.Context, hos
|
||||
return self.RequestAllocateDiskOnStorage(ctx, task.GetUserCred(), host, storage, disk, task, input)
|
||||
}
|
||||
|
||||
func (self *SKVMHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, task taskman.ITask) error {
|
||||
func (self *SKVMHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, cleanSnapshots bool, task taskman.ITask) error {
|
||||
log.Infof("Deallocating disk on host %s", host.GetName())
|
||||
header := task.GetTaskRequestHeader()
|
||||
|
||||
@@ -330,6 +330,7 @@ func (self *SKVMHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, hos
|
||||
if flatPath := disk.GetMetadata(ctx, api.DISK_META_REMOTE_ACCESS_PATH, nil); flatPath != "" {
|
||||
body.Set("esxi_flat_file_path", jsonutils.NewString(flatPath))
|
||||
}
|
||||
body.Set("clean_snapshots", jsonutils.NewBool(cleanSnapshots))
|
||||
_, err := host.Request(ctx, task.GetUserCred(), "POST", url, header, body)
|
||||
if err != nil {
|
||||
if errors.Cause(err) == cloudprovider.ErrNotFound {
|
||||
@@ -372,6 +373,22 @@ func (self *SKVMHostDriver) RequestResizeDiskOnHost(ctx context.Context, host *m
|
||||
return err
|
||||
}
|
||||
|
||||
func (self *SKVMHostDriver) RequestDiskSrcMigratePrepare(ctx context.Context, host *models.SHost, disk *models.SDisk, task taskman.ITask) (jsonutils.JSONObject, error) {
|
||||
body := jsonutils.NewDict()
|
||||
destUrl := fmt.Sprintf("/disks/%s/src-migrate-prepare/%s", disk.StorageId, disk.Id)
|
||||
|
||||
header := task.GetTaskRequestHeader()
|
||||
return host.Request(ctx, task.GetUserCred(), "POST", destUrl, header, body)
|
||||
}
|
||||
|
||||
func (self *SKVMHostDriver) RequestDiskMigrate(ctx context.Context, targetHost *models.SHost, targetStorage *models.SStorage, disk *models.SDisk, task taskman.ITask, body *jsonutils.JSONDict) error {
|
||||
destUrl := fmt.Sprintf("/disks/%s/migrate/%s", targetStorage.Id, disk.Id)
|
||||
|
||||
header := task.GetTaskRequestHeader()
|
||||
_, err := targetHost.Request(ctx, task.GetUserCred(), "POST", destUrl, header, body)
|
||||
return err
|
||||
}
|
||||
|
||||
func (self *SKVMHostDriver) RequestPrepareSaveDiskOnHost(ctx context.Context, host *models.SHost, disk *models.SDisk, imageId string, task taskman.ITask) error {
|
||||
body := jsonutils.NewDict()
|
||||
body.Add(jsonutils.Marshal(map[string]string{"image_id": imageId}), "disk")
|
||||
|
||||
@@ -287,7 +287,7 @@ func (self *SManagedVirtualizationHostDriver) RequestAllocateDiskOnStorage(ctx c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *SManagedVirtualizationHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, task taskman.ITask) error {
|
||||
func (self *SManagedVirtualizationHostDriver) RequestDeallocateDiskOnHost(ctx context.Context, host *models.SHost, storage *models.SStorage, disk *models.SDisk, cleanSnapshots bool, task taskman.ITask) error {
|
||||
taskman.LocalTaskRun(task, func() (jsonutils.JSONObject, error) {
|
||||
idisk, err := disk.GetIDisk(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -39,6 +39,7 @@ import (
|
||||
billing_api "yunion.io/x/onecloud/pkg/apis/billing"
|
||||
api "yunion.io/x/onecloud/pkg/apis/compute"
|
||||
imageapi "yunion.io/x/onecloud/pkg/apis/image"
|
||||
schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db/quotas"
|
||||
@@ -931,6 +932,75 @@ func (self *SDisk) PerformDiskReset(ctx context.Context, userCred mcclient.Token
|
||||
return nil, self.StartResetDisk(ctx, userCred, snapshot.Id, input.AutoStart, guest, "")
|
||||
}
|
||||
|
||||
func (self *SDisk) validateMigrate(ctx context.Context, userCred mcclient.TokenCredential, input *api.DiskMigrateInput) error {
|
||||
hypervisor := self.getHypervisor()
|
||||
if !utils.IsInStringArray(hypervisor, []string{api.HYPERVISOR_KVM, api.HYPERVISOR_POD}) {
|
||||
return httperrors.NewNotAcceptableError("Not allow for hypervisor %s", hypervisor)
|
||||
}
|
||||
if guest := self.GetGuest(); guest != nil {
|
||||
return httperrors.NewBadRequestError("Disk attached guest, cannot migrate")
|
||||
}
|
||||
if input.TargetStorageId != "" {
|
||||
srcStorage, err := self.GetStorage()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get src storage")
|
||||
}
|
||||
iDstStorage, err := StorageManager.FetchByIdOrName(ctx, userCred, input.TargetStorageId)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get target storage")
|
||||
}
|
||||
if srcStorage.StorageType != iDstStorage.(*SStorage).StorageType {
|
||||
return httperrors.NewBadRequestError("Cannot migrate disk from storage type %s to %s", srcStorage.StorageType, iDstStorage.(*SStorage).StorageType)
|
||||
}
|
||||
input.TargetStorageId = iDstStorage.GetId()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *SDisk) PerformMigrate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input *api.DiskMigrateInput) (jsonutils.JSONObject, error) {
|
||||
err := self.validateMigrate(ctx, userCred, input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
self.SetStatus(ctx, userCred, api.DISK_START_MIGRATE, "")
|
||||
params := jsonutils.NewDict()
|
||||
params.Set("target_storage_id", jsonutils.NewString(input.TargetStorageId))
|
||||
task, err := taskman.TaskManager.NewTask(ctx, "DiskMigrateTask", self, userCred, params, "", "", nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "NewTask")
|
||||
}
|
||||
return nil, task.ScheduleRun(nil)
|
||||
}
|
||||
|
||||
func (self *SDisk) GetSchedMigrateParams(targetStorageId string) (*schedapi.ScheduleInput, error) {
|
||||
diskConfig := self.ToDiskConfig()
|
||||
diskConfig.Medium = ""
|
||||
diskConfig.Storage = targetStorageId
|
||||
input := new(api.DiskCreateInput)
|
||||
input.DiskConfig = diskConfig
|
||||
|
||||
srvInput := input.ToServerCreateInput()
|
||||
ret := new(schedapi.ScheduleInput)
|
||||
err := srvInput.JSON(srvInput).Unmarshal(ret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if targetStorageId == "" {
|
||||
storage, err := self.GetStorage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
host, err := storage.GetMasterHost()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret.HostId = host.Id
|
||||
ret.LiveMigrate = false
|
||||
}
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (self *SDisk) StartResetDisk(
|
||||
ctx context.Context, userCred mcclient.TokenCredential,
|
||||
snapshotId string, autoStart bool, guest *SGuest, parentTaskId string,
|
||||
|
||||
@@ -43,11 +43,13 @@ type IHostDriver interface {
|
||||
RequestRebuildDiskOnStorage(ctx context.Context, host *SHost, storage *SStorage, disk *SDisk, task taskman.ITask, input api.DiskAllocateInput) error
|
||||
|
||||
// delete disk
|
||||
RequestDeallocateDiskOnHost(ctx context.Context, host *SHost, storage *SStorage, disk *SDisk, task taskman.ITask) error
|
||||
RequestDeallocateDiskOnHost(ctx context.Context, host *SHost, storage *SStorage, disk *SDisk, cleanSnapshots bool, task taskman.ITask) error
|
||||
RequestDeallocateBackupDiskOnHost(ctx context.Context, host *SHost, storage *SStorage, disk *SDisk, task taskman.ITask) error
|
||||
|
||||
// resize disk
|
||||
RequestResizeDiskOnHost(ctx context.Context, host *SHost, storage *SStorage, disk *SDisk, size int64, task taskman.ITask) error
|
||||
RequestDiskSrcMigratePrepare(ctx context.Context, host *SHost, disk *SDisk, task taskman.ITask) (jsonutils.JSONObject, error)
|
||||
RequestDiskMigrate(ctx context.Context, targetHost *SHost, targetStorage *SStorage, disk *SDisk, task taskman.ITask, body *jsonutils.JSONDict) error
|
||||
|
||||
RequestDeleteSnapshotsWithStorage(ctx context.Context, host *SHost, snapshot *SSnapshot, task taskman.ITask) error
|
||||
RequestResetDisk(ctx context.Context, host *SHost, disk *SDisk, params *jsonutils.JSONDict, task taskman.ITask) error
|
||||
|
||||
@@ -146,7 +146,7 @@ func (self *DiskDeleteTask) startDeleteDisk(ctx context.Context, disk *models.SD
|
||||
self.OnGuestDiskDeleteCompleteFailed(ctx, disk, jsonutils.NewString("fail to find master host"))
|
||||
return
|
||||
}
|
||||
err = host.GetHostDriver().RequestDeallocateDiskOnHost(ctx, host, storage, disk, self)
|
||||
err = host.GetHostDriver().RequestDeallocateDiskOnHost(ctx, host, storage, disk, false, self)
|
||||
if err != nil {
|
||||
self.OnGuestDiskDeleteCompleteFailed(ctx, disk, jsonutils.NewString(err.Error()))
|
||||
return
|
||||
@@ -161,7 +161,7 @@ func (self *DiskDeleteTask) OnMasterStorageDeleteDiskComplete(ctx context.Contex
|
||||
self.OnGuestDiskDeleteCompleteFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("backup storage %s fail to find master host", disk.BackupStorageId)))
|
||||
return
|
||||
}
|
||||
err = host.GetHostDriver().RequestDeallocateDiskOnHost(ctx, host, storage, disk, self)
|
||||
err = host.GetHostDriver().RequestDeallocateDiskOnHost(ctx, host, storage, disk, false, self)
|
||||
if err != nil {
|
||||
self.OnGuestDiskDeleteCompleteFailed(ctx, disk, jsonutils.NewString(err.Error()))
|
||||
}
|
||||
|
||||
242
pkg/compute/tasks/disk_migrate_task.go
Normal file
242
pkg/compute/tasks/disk_migrate_task.go
Normal file
@@ -0,0 +1,242 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"yunion.io/x/jsonutils"
|
||||
"yunion.io/x/pkg/errors"
|
||||
"yunion.io/x/pkg/utils"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/apis/compute"
|
||||
schedapi "yunion.io/x/onecloud/pkg/apis/scheduler"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
|
||||
"yunion.io/x/onecloud/pkg/compute/models"
|
||||
"yunion.io/x/onecloud/pkg/util/logclient"
|
||||
)
|
||||
|
||||
type DiskMigrateTask struct {
|
||||
SSchedTask
|
||||
}
|
||||
|
||||
func init() {
|
||||
taskman.RegisterTask(DiskMigrateTask{})
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) TaskComplete(ctx context.Context, disk *models.SDisk) {
|
||||
task.SetStageComplete(ctx, nil)
|
||||
db.OpsLog.LogEvent(disk, db.ACT_MIGRATE, "Migrate success", task.UserCred)
|
||||
logclient.AddActionLogWithContext(ctx, disk, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) markFailed(ctx context.Context, disk *models.SDisk, reason jsonutils.JSONObject) {
|
||||
disk.SetStatus(ctx, task.UserCred, compute.DISK_MIGRATE_FAIL, reason.String())
|
||||
db.OpsLog.LogEvent(disk, db.ACT_MIGRATE_FAIL, reason, task.UserCred)
|
||||
logclient.AddActionLogWithContext(ctx, disk, logclient.ACT_MIGRATE, reason, task.UserCred, false)
|
||||
notifyclient.NotifySystemErrorWithCtx(ctx, disk.Id, disk.Name, compute.DISK_MIGRATE_FAIL, reason.String())
|
||||
notifyclient.EventNotify(ctx, task.GetUserCred(), notifyclient.SEventNotifyParam{
|
||||
Obj: disk,
|
||||
Action: notifyclient.ActionMigrate,
|
||||
IsFail: true,
|
||||
})
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) TaskFailed(ctx context.Context, disk *models.SDisk, reason jsonutils.JSONObject) {
|
||||
task.markFailed(ctx, disk, reason)
|
||||
task.SetStageFailed(ctx, reason)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) GetSchedParams() (*schedapi.ScheduleInput, error) {
|
||||
obj := task.GetObject()
|
||||
disk := obj.(*models.SDisk)
|
||||
|
||||
targetStorageId, _ := task.Params.GetString("target_storage_id")
|
||||
return disk.GetSchedMigrateParams(targetStorageId)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
|
||||
StartScheduleObjects(ctx, task, []db.IStandaloneModel{obj})
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnScheduleFailCallback(ctx context.Context, obj IScheduleModel, reason jsonutils.JSONObject, index int) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnScheduleFailed(ctx context.Context, reason jsonutils.JSONObject) {
|
||||
obj := task.GetObject()
|
||||
disk := obj.(*models.SDisk)
|
||||
task.TaskFailed(ctx, disk, reason)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) SaveScheduleResult(ctx context.Context, obj IScheduleModel, candidate *schedapi.CandidateResource, index int) {
|
||||
disk := obj.(*models.SDisk)
|
||||
targetHostId := candidate.HostId
|
||||
storageIds := candidate.Disks[0].StorageIds
|
||||
targetHost := models.HostManager.FetchHostById(targetHostId)
|
||||
if targetHost == nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString("target host not found?"))
|
||||
return
|
||||
}
|
||||
if len(storageIds) == 0 {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString("no target storage found?"))
|
||||
return
|
||||
}
|
||||
var storageId string
|
||||
for i := range storageIds {
|
||||
if storageIds[i] != disk.StorageId {
|
||||
storageId = storageIds[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
storage := models.StorageManager.FetchStorageById(storageId)
|
||||
if storage == nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString("target storage not found?"))
|
||||
return
|
||||
}
|
||||
|
||||
task.Params.Set("target_host_id", jsonutils.NewString(targetHostId))
|
||||
task.Params.Set("target_storage_id", jsonutils.NewString(storage.Id))
|
||||
task.SetStage("OnStorageCacheImage", nil)
|
||||
|
||||
if disk.TemplateId != "" {
|
||||
format, err := disk.GetCacheImageFormat(ctx)
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("disk get cache image format failed %s", err)))
|
||||
return
|
||||
}
|
||||
|
||||
input := compute.CacheImageInput{
|
||||
ImageId: disk.GetTemplateId(),
|
||||
Format: format,
|
||||
ParentTaskId: task.GetTaskId(),
|
||||
}
|
||||
disk.SetStatus(ctx, task.UserCred, compute.DISK_IMAGE_CACHING, "On disk migrate save schedule result")
|
||||
storagecache := storage.GetStoragecache()
|
||||
storagecache.StartImageCacheTask(ctx, task.UserCred, input)
|
||||
} else {
|
||||
task.OnStorageCacheImage(ctx, disk, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnStorageCacheImage(ctx context.Context, disk *models.SDisk, data jsonutils.JSONObject) {
|
||||
disk.SetStatus(ctx, task.UserCred, compute.DISK_MIGRATING, "On disk migrate start migrate")
|
||||
|
||||
storage, err := disk.GetStorage()
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
sourceHost, err := storage.GetMasterHost()
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(err.Error()))
|
||||
return
|
||||
}
|
||||
ret, err := sourceHost.GetHostDriver().RequestDiskSrcMigratePrepare(ctx, sourceHost, disk, task)
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(err.Error()))
|
||||
return
|
||||
}
|
||||
snapshotsUri := fmt.Sprintf("%s/download/snapshots/", sourceHost.ManagerUri)
|
||||
diskUri := fmt.Sprintf("%s/download/disks/", sourceHost.ManagerUri)
|
||||
|
||||
body := jsonutils.NewDict()
|
||||
if ret != nil {
|
||||
body.Update(ret.(*jsonutils.JSONDict))
|
||||
}
|
||||
|
||||
snapChain := []string{}
|
||||
if ret.Contains("disk_snaps_chain") {
|
||||
err = ret.Unmarshal(&snapChain, "disk_snaps_chain")
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(errors.Wrap(err, "unmarshal snap chain").Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
outChainSnapshotIds := jsonutils.NewArray()
|
||||
snapshots := models.SnapshotManager.GetDiskSnapshots(disk.Id)
|
||||
for j := 0; j < len(snapshots); j++ {
|
||||
if !utils.IsInStringArray(snapshots[j].Id, snapChain) {
|
||||
outChainSnapshotIds.Add(jsonutils.NewString(snapshots[j].Id))
|
||||
}
|
||||
}
|
||||
|
||||
body.Set("out_chain_snapshots", outChainSnapshotIds)
|
||||
body.Set("snapshots_uri", jsonutils.NewString(snapshotsUri))
|
||||
body.Set("disk_uri", jsonutils.NewString(diskUri))
|
||||
body.Set("src_storage_id", jsonutils.NewString(disk.StorageId))
|
||||
if disk.TemplateId != "" {
|
||||
body.Set("template_id", jsonutils.NewString(disk.TemplateId))
|
||||
}
|
||||
|
||||
targetHostId, _ := task.Params.GetString("target_host_id")
|
||||
targetHost := models.HostManager.FetchHostById(targetHostId)
|
||||
targetStorageId, _ := task.Params.GetString("target_storage_id")
|
||||
targetStorage := models.StorageManager.FetchStorageById(targetStorageId)
|
||||
|
||||
task.SetStage("OnDiskMigrate", nil)
|
||||
if err = targetHost.GetHostDriver().RequestDiskMigrate(ctx, targetHost, targetStorage, disk, task, body); err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("failed request disk migrate %s", err)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnDiskMigrate(ctx context.Context, disk *models.SDisk, data jsonutils.JSONObject) {
|
||||
srcStorage, _ := disk.GetStorage()
|
||||
srcHost, _ := srcStorage.GetMasterHost()
|
||||
|
||||
diskPath, _ := data.GetString("disk_path")
|
||||
targetStorageId, _ := task.Params.GetString("target_storage_id")
|
||||
_, err := db.Update(disk, func() error {
|
||||
//disk.Status = compute.DISK_READY
|
||||
disk.StorageId = targetStorageId
|
||||
if diskPath != "" {
|
||||
disk.AccessPath = diskPath
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("db failed update disk %s", err)))
|
||||
return
|
||||
}
|
||||
snapshots := models.SnapshotManager.GetDiskSnapshots(disk.Id)
|
||||
for _, snapshot := range snapshots {
|
||||
_, err := db.Update(&snapshot, func() error {
|
||||
snapshot.StorageId = targetStorageId
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("db failed update disk snapshot %s %s", snapshot.Id, err)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
task.SetStage("OnDeallocateSourceDisk", nil)
|
||||
err = srcHost.GetHostDriver().RequestDeallocateDiskOnHost(ctx, srcHost, srcStorage, disk, true, task)
|
||||
if err != nil {
|
||||
task.TaskFailed(ctx, disk, jsonutils.NewString(fmt.Sprintf("failed deallocate disk on src storage %s", err)))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnDiskMigrateFailed(ctx context.Context, disk *models.SDisk, data jsonutils.JSONObject) {
|
||||
task.TaskFailed(ctx, disk, data)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnDeallocateSourceDisk(ctx context.Context, disk *models.SDisk, data jsonutils.JSONObject) {
|
||||
db.Update(disk, func() error {
|
||||
disk.Status = compute.DISK_READY
|
||||
return nil
|
||||
})
|
||||
task.SetStageComplete(ctx, nil)
|
||||
db.OpsLog.LogEvent(disk, db.ACT_MIGRATE, "OnDeallocateSourceDisk", task.UserCred)
|
||||
logclient.AddActionLogWithContext(ctx, disk, logclient.ACT_MIGRATE, task.Params, task.UserCred, true)
|
||||
}
|
||||
|
||||
func (task *DiskMigrateTask) OnDeallocateSourceDiskFailed(ctx context.Context, disk *models.SDisk, data jsonutils.JSONObject) {
|
||||
task.TaskFailed(ctx, disk, data)
|
||||
}
|
||||
@@ -140,6 +140,11 @@ func (d *SLocalDisk) Delete(ctx context.Context, params interface{}) (jsonutils.
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if p.CleanSnapshots {
|
||||
if err := d.DeleteAllSnapshot(p.SkipRecycle != nil && *p.SkipRecycle); err != nil {
|
||||
return nil, errors.Wrap(err, "delete snapshots")
|
||||
}
|
||||
}
|
||||
|
||||
d.Storage.RemoveDisk(d)
|
||||
return nil, nil
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/errors"
|
||||
"yunion.io/x/pkg/util/regutils"
|
||||
"yunion.io/x/pkg/utils"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/apis/compute"
|
||||
"yunion.io/x/onecloud/pkg/appsrv"
|
||||
@@ -41,15 +42,17 @@ var (
|
||||
snapshotKeywords = []string{"snapshots"}
|
||||
|
||||
actionFuncs = map[string]actionFunc{
|
||||
"create": diskCreate,
|
||||
"delete": diskDelete,
|
||||
"resize": diskResize,
|
||||
"save-prepare": diskSavePrepare,
|
||||
"reset": diskReset,
|
||||
"snapshot": diskSnapshot,
|
||||
"delete-snapshot": diskDeleteSnapshot,
|
||||
"cleanup-snapshots": diskCleanupSnapshots,
|
||||
"backup": diskBackup,
|
||||
"create": diskCreate,
|
||||
"delete": diskDelete,
|
||||
"resize": diskResize,
|
||||
"save-prepare": diskSavePrepare,
|
||||
"reset": diskReset,
|
||||
"snapshot": diskSnapshot,
|
||||
"delete-snapshot": diskDeleteSnapshot,
|
||||
"cleanup-snapshots": diskCleanupSnapshots,
|
||||
"backup": diskBackup,
|
||||
"src-migrate-prepare": diskSrcMigratePrepare,
|
||||
"migrate": diskMigrate,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -239,7 +242,7 @@ func performDiskActions(ctx context.Context, w http.ResponseWriter, r *http.Requ
|
||||
var err error
|
||||
|
||||
rebuild, _ := body.Bool("disk", "rebuild")
|
||||
if action != "create" || rebuild {
|
||||
if !utils.IsInStringArray(action, []string{"create", "migrate"}) || rebuild {
|
||||
disk, err = storage.GetDiskById(diskId)
|
||||
if err != nil {
|
||||
hostutils.Response(ctx, w, httperrors.NewGeneralError(errors.Wrapf(err, "GetDiskById(%s)", diskId)))
|
||||
@@ -281,8 +284,14 @@ func diskCreate(ctx context.Context, userCred mcclient.TokenCredential, storage
|
||||
|
||||
func diskDelete(ctx context.Context, userCred mcclient.TokenCredential, storage storageman.IStorage, diskId string, disk storageman.IDisk, body jsonutils.JSONObject) (interface{}, error) {
|
||||
flatPath, _ := body.GetString("esxi_flat_file_path")
|
||||
input := compute.DiskDeleteInput{
|
||||
EsxiFlatFilePath: flatPath,
|
||||
|
||||
// Only local storage support clean snapshots
|
||||
CleanSnapshots: jsonutils.QueryBoolean(body, "clean_snapshots", false),
|
||||
}
|
||||
if disk != nil {
|
||||
hostutils.DelayTask(ctx, disk.Delete, compute.DiskDeleteInput{EsxiFlatFilePath: flatPath})
|
||||
hostutils.DelayTask(ctx, disk.Delete, input)
|
||||
} else {
|
||||
hostutils.DelayTask(ctx, nil, nil)
|
||||
}
|
||||
@@ -327,6 +336,64 @@ func diskReset(ctx context.Context, userCred mcclient.TokenCredential, storage s
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func diskSrcMigratePrepare(ctx context.Context, userCred mcclient.TokenCredential, storage storageman.IStorage, diskId string, disk storageman.IDisk, body jsonutils.JSONObject) (interface{}, error) {
|
||||
snaps, back, hasTemplate, err := disk.PrepareMigrate(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := jsonutils.NewDict()
|
||||
if len(back) > 0 {
|
||||
ret.Set("disk_back", jsonutils.NewString(back))
|
||||
}
|
||||
if len(snaps) > 0 {
|
||||
ret.Set("disk_snaps_chain", jsonutils.NewStringArray(snaps))
|
||||
}
|
||||
if hasTemplate {
|
||||
ret.Set("sys_disk_has_template", jsonutils.JSONTrue)
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func diskMigrate(ctx context.Context, userCred mcclient.TokenCredential, storage storageman.IStorage, diskId string, disk storageman.IDisk, body jsonutils.JSONObject) (interface{}, error) {
|
||||
srcStorageId, _ := body.GetString("src_storage_id")
|
||||
if srcStorageId == "" {
|
||||
return nil, httperrors.NewMissingParameterError("src_storage_id")
|
||||
}
|
||||
snapshotsUri, _ := body.GetString("snapshots_uri")
|
||||
if snapshotsUri == "" {
|
||||
return nil, httperrors.NewMissingParameterError("snapshots_uri")
|
||||
}
|
||||
diskUri, _ := body.GetString("disk_uri")
|
||||
if diskUri == "" {
|
||||
return nil, httperrors.NewMissingParameterError("disk_uri")
|
||||
}
|
||||
|
||||
templateId, _ := body.GetString("template_id")
|
||||
sysDiskHasTemplate := jsonutils.QueryBoolean(body, "sys_disk_has_template", false)
|
||||
diskBackingFile, _ := body.GetString("disk_back")
|
||||
|
||||
outChainSnaps, _ := body.GetArray("out_chain_snapshots")
|
||||
diskSnapsChain, _ := body.GetArray("disk_snaps_chain")
|
||||
|
||||
params := storageman.SDiskMigrate{
|
||||
DiskId: diskId,
|
||||
Disk: disk,
|
||||
Storage: storage,
|
||||
|
||||
DiskUri: diskUri,
|
||||
SnapshotsUri: snapshotsUri,
|
||||
SrcStorageId: srcStorageId,
|
||||
TemplateId: templateId,
|
||||
DiskBackingFile: diskBackingFile,
|
||||
SysDiskHasTemplate: sysDiskHasTemplate,
|
||||
|
||||
OutChainSnaps: outChainSnaps,
|
||||
SnapsChain: diskSnapsChain,
|
||||
}
|
||||
hostutils.DelayTask(ctx, storage.DiskMigrate, ¶ms)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func diskSnapshot(ctx context.Context, userCred mcclient.TokenCredential, storage storageman.IStorage, diskId string, disk storageman.IDisk, body jsonutils.JSONObject) (interface{}, error) {
|
||||
snapshotId, err := body.GetString("snapshot_id")
|
||||
if err != nil {
|
||||
|
||||
@@ -125,6 +125,7 @@ type IStorage interface {
|
||||
CreateDiskFromSnapshot(context.Context, IDisk, *SDiskCreateByDiskinfo) error
|
||||
CreateDiskFromExistingPath(context.Context, IDisk, *SDiskCreateByDiskinfo) error
|
||||
CreateDiskFromBackup(context.Context, IDisk, *SDiskCreateByDiskinfo) error
|
||||
DiskMigrate(context.Context, interface{}) (jsonutils.JSONObject, error)
|
||||
|
||||
// GetCloneTargetDiskPath generate target disk path by target disk id
|
||||
GetCloneTargetDiskPath(ctx context.Context, targetDiskId string) string
|
||||
@@ -481,6 +482,10 @@ func (s *SBaseStorage) CreateDiskFromBackup(ctx context.Context, disk IDisk, inp
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SBaseStorage) DiskMigrate(context.Context, interface{}) (jsonutils.JSONObject, error) {
|
||||
return nil, httperrors.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (s *SBaseStorage) onSaveToGlanceFailed(ctx context.Context, imageId string, reason string) {
|
||||
params := jsonutils.NewDict()
|
||||
params.Set("status", jsonutils.NewString("killed"))
|
||||
|
||||
@@ -789,6 +789,80 @@ func doRebaseDisk(diskPath, newBasePath string, encInfo *apis.SEncryptInfo) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SLocalStorage) DiskMigrate(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
|
||||
input := params.(*SDiskMigrate)
|
||||
|
||||
disk := s.CreateDisk(input.DiskId)
|
||||
snapshots := input.SnapsChain
|
||||
diskOutChainSnaps := input.OutChainSnaps
|
||||
// prepare disk snapshot dir
|
||||
if len(snapshots) > 0 && !fileutils2.Exists(disk.GetSnapshotDir()) {
|
||||
output, err := procutils.NewCommand("mkdir", "-p", disk.GetSnapshotDir()).Output()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "mkdir %s failed: %s", disk.GetSnapshotDir(), output)
|
||||
}
|
||||
}
|
||||
|
||||
baseImagePath := ""
|
||||
templateId := input.TemplateId
|
||||
for i, snapshotId := range snapshots {
|
||||
snapId, _ := snapshotId.GetString()
|
||||
snapshotUrl := fmt.Sprintf("%s/%s/%s/%s",
|
||||
input.SnapshotsUri, input.SrcStorageId, input.DiskId, snapId)
|
||||
snapshotPath := path.Join(disk.GetSnapshotDir(), snapId)
|
||||
log.Infof("Disk %s snapshot %s url: %s", input.DiskId, snapId, snapshotUrl)
|
||||
if err := s.CreateSnapshotFormUrl(ctx, snapshotUrl, input.DiskId, snapshotPath); err != nil {
|
||||
return nil, errors.Wrap(err, "create from snapshot url failed")
|
||||
}
|
||||
if i == 0 && len(templateId) > 0 && input.SysDiskHasTemplate {
|
||||
templatePath := path.Join(storageManager.LocalStorageImagecacheManager.GetPath(), templateId)
|
||||
if err := doRebaseDisk(snapshotPath, templatePath, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if len(baseImagePath) > 0 {
|
||||
if err := doRebaseDisk(snapshotPath, baseImagePath, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
baseImagePath = snapshotPath
|
||||
}
|
||||
|
||||
for _, snapshotId := range diskOutChainSnaps {
|
||||
snapId, _ := snapshotId.GetString()
|
||||
snapshotUrl := fmt.Sprintf("%s/%s/%s/%s",
|
||||
input.SnapshotsUri, input.SrcStorageId, input.DiskId, snapId)
|
||||
snapshotPath := path.Join(disk.GetSnapshotDir(), snapId)
|
||||
log.Infof("Disk %s snapshot %s url: %s", input.DiskId, snapId, snapshotUrl)
|
||||
if err := s.CreateSnapshotFormUrl(ctx, snapshotUrl, input.DiskId, snapshotPath); err != nil {
|
||||
return nil, errors.Wrap(err, "create from snapshot url failed")
|
||||
}
|
||||
}
|
||||
|
||||
// download disk form remote url
|
||||
diskUrl := fmt.Sprintf("%s/%s/%s", input.DiskUri, input.SrcStorageId, input.DiskId)
|
||||
err := disk.CreateFromUrl(ctx, diskUrl, 0, func(progress, progressMbps float64, totalSizeMb int64) {
|
||||
log.Debugf("[%.2f / %d] disk %s create %.2f with speed %.2fMbps",
|
||||
progress*float64(totalSizeMb)/100, totalSizeMb, disk.GetId(), progress, progressMbps)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "CreateFromUrl")
|
||||
}
|
||||
if len(templateId) > 0 && len(baseImagePath) == 0 {
|
||||
templatePath := path.Join(storageManager.LocalStorageImagecacheManager.GetPath(), templateId)
|
||||
if err := doRebaseDisk(disk.GetPath(), templatePath, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if len(baseImagePath) > 0 {
|
||||
if err := doRebaseDisk(disk.GetPath(), baseImagePath, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
res := jsonutils.NewDict()
|
||||
res.Set("disk_path", jsonutils.NewString(disk.GetPath()))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *SLocalStorage) CreateDiskFromSnapshot(
|
||||
ctx context.Context, disk IDisk, input *SDiskCreateByDiskinfo,
|
||||
) error {
|
||||
|
||||
@@ -35,6 +35,21 @@ func (i *SDiskCreateByDiskinfo) String() string {
|
||||
return fmt.Sprintf("disk_id: %s, disk_info: %s", i.DiskId, jsonutils.Marshal(i.DiskInfo))
|
||||
}
|
||||
|
||||
type SDiskMigrate struct {
|
||||
DiskId string
|
||||
Disk IDisk
|
||||
SrcStorageId string
|
||||
TemplateId string
|
||||
OutChainSnaps []jsonutils.JSONObject
|
||||
SnapsChain []jsonutils.JSONObject
|
||||
DiskBackingFile string
|
||||
SnapshotsUri string
|
||||
DiskUri string
|
||||
SysDiskHasTemplate bool
|
||||
|
||||
Storage IStorage
|
||||
}
|
||||
|
||||
type SDiskReset struct {
|
||||
SnapshotId string
|
||||
BackingDiskId string
|
||||
|
||||
@@ -15,8 +15,11 @@
|
||||
package compute
|
||||
|
||||
import (
|
||||
"yunion.io/x/jsonutils"
|
||||
|
||||
api "yunion.io/x/onecloud/pkg/apis/compute"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/cmdline"
|
||||
"yunion.io/x/onecloud/pkg/mcclient/options"
|
||||
)
|
||||
|
||||
type DiskCreateOptions struct {
|
||||
@@ -74,3 +77,17 @@ func (o DiskCreateOptions) Params() (*api.DiskCreateInput, error) {
|
||||
params.BackupId = o.BackupId
|
||||
return params, nil
|
||||
}
|
||||
|
||||
type DiskMigrateOptions struct {
|
||||
ID string `help:"ID of the server" json:"-"`
|
||||
|
||||
TargetStorageId string `help:"Disk migrate target storage id or name" json:"target_storage_id"`
|
||||
}
|
||||
|
||||
func (o *DiskMigrateOptions) GetId() string {
|
||||
return o.ID
|
||||
}
|
||||
|
||||
func (o *DiskMigrateOptions) Params() (jsonutils.JSONObject, error) {
|
||||
return options.StructToParams(o)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user