Files
cloudpods/pkg/compute/models/storages.go
2026-02-08 12:57:34 +08:00

2177 lines
71 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models
import (
"context"
"database/sql"
"fmt"
"path"
"strconv"
"strings"
"yunion.io/x/cloudmux/pkg/cloudprovider"
"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/gotypes"
"yunion.io/x/pkg/tristate"
"yunion.io/x/pkg/util/compare"
"yunion.io/x/pkg/util/rbacscope"
"yunion.io/x/pkg/utils"
"yunion.io/x/sqlchemy"
"yunion.io/x/onecloud/pkg/apis"
billing_api "yunion.io/x/onecloud/pkg/apis/billing"
api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudcommon/validators"
"yunion.io/x/onecloud/pkg/compute/options"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/util/logclient"
"yunion.io/x/onecloud/pkg/util/rbacutils"
"yunion.io/x/onecloud/pkg/util/stringutils2"
)
type SStorageManager struct {
db.SEnabledStatusInfrasResourceBaseManager
db.SExternalizedResourceBaseManager
SManagedResourceBaseManager
SZoneResourceBaseManager
}
var StorageManager *SStorageManager
func init() {
StorageManager = &SStorageManager{
SEnabledStatusInfrasResourceBaseManager: db.NewEnabledStatusInfrasResourceBaseManager(
SStorage{},
"storages_tbl",
"storage",
"storages",
),
}
StorageManager.SetVirtualObject(StorageManager)
StorageManager.TableSpec().AddIndex(false, "deleted", "status", "enabled", "zone_id", "storagecache_id")
}
type SStorage struct {
db.SEnabledStatusInfrasResourceBase `"status->default":"offline" "status->update":"domain" "enabled->default":"true"`
db.SExternalizedResourceBase
SManagedResourceBase
SZoneResourceBase `update:""`
// 容量大小,单位Mb
Capacity int64 `nullable:"false" list:"user" update:"domain" create:"domain_required"`
// 实际容量大小单位Mb
// we always expect actual capacity great or equal than zero, otherwise something wrong
ActualCapacityUsed int64 `nullable:"true" list:"user" update:"domain" create:"domain_optional"`
// 预留容量大小
Reserved int64 `nullable:"true" default:"0" list:"domain" update:"domain" create:"domain_optional"`
// 存储类型
// example: local
StorageType string `width:"64" charset:"ascii" nullable:"false" list:"user" create:"domain_required"`
// 介质类型
// example: ssd
MediumType string `width:"32" charset:"ascii" nullable:"false" list:"user" update:"domain" create:"domain_required"`
// 超售比
Cmtbound float32 `nullable:"true" list:"domain"`
// 存储配置信息
StorageConf jsonutils.JSONObject `nullable:"true" get:"domain" list:"domain" update:"domain"`
// 存储缓存Id
StoragecacheId string `width:"36" charset:"ascii" nullable:"true" list:"domain" get:"domain" update:"domain" create:"domain_optional"`
// master host id
MasterHost string `width:"36" charset:"ascii" nullable:"true" list:"user" json:"master_host"`
// indicating whether system disk can be allocated in this storage
// 是否可以用作系统盘存储
// example: true
IsSysDiskStore tristate.TriState `default:"true" list:"user" create:"optional" update:"domain"`
}
func (manager *SStorageManager) GetContextManagers() [][]db.IModelManager {
return [][]db.IModelManager{
{ZoneManager},
{StoragecacheManager},
}
}
func (self *SStorage) ValidateUpdateData(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.StorageUpdateInput) (api.StorageUpdateInput, error) {
var err error
input.EnabledStatusInfrasResourceBaseUpdateInput, err = self.SEnabledStatusInfrasResourceBase.ValidateUpdateData(ctx, userCred, query, input.EnabledStatusInfrasResourceBaseUpdateInput)
if err != nil {
return input, err
}
if gotypes.IsNil(input.StorageConf) {
input.StorageConf = jsonutils.NewDict()
}
if self.StorageConf != nil {
confs, _ := self.StorageConf.GetMap()
for k, v := range confs {
if input.StorageConf.Contains(k) {
continue
}
input.StorageConf.Set(k, v)
}
}
if input.MasterHost != "" {
host, err := HostManager.FetchByIdOrName(ctx, userCred, input.MasterHost)
if err != nil {
return input, httperrors.NewInputParameterError("get host %s failed", input.MasterHost)
}
input.MasterHost = host.GetId()
}
driver := GetStorageDriver(self.StorageType)
if driver != nil {
return driver.ValidateUpdateData(ctx, userCred, input)
}
return input, nil
}
func (self *SStorage) PostUpdate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) {
self.SEnabledStatusInfrasResourceBase.PostUpdate(ctx, userCred, query, data)
if err := self.setHardwareInfoByData(ctx, userCred, data); err != nil {
log.Errorf("setHardwareInfo when post update eror: %v", err)
}
if data.Contains("cmtbound") || data.Contains("capacity") {
hosts, _ := self.GetAttachedHosts()
for _, host := range hosts {
if err := host.ClearSchedDescCache(); err != nil {
log.Errorf("clear host %s sched cache failed %v", host.GetName(), err)
}
}
}
if masterHost, _ := data.GetString("master_host"); masterHost != "" {
storageCache := self.GetStoragecache()
if storageCache.MasterHost != masterHost {
_, err := db.Update(storageCache, func() error {
storageCache.MasterHost = masterHost
return nil
})
if err != nil {
log.Errorf("failed update storage master host")
}
}
}
if update, _ := data.Bool("update_storage_conf"); update {
self.StartStorageUpdateTask(ctx, userCred)
}
}
func (self *SStorage) StartStorageUpdateTask(ctx context.Context, userCred mcclient.TokenCredential) error {
task, err := taskman.TaskManager.NewTask(ctx, "StorageUpdateTask", self, userCred, nil, "", "", nil)
if err != nil {
return err
}
task.ScheduleRun(nil)
return nil
}
func (self *SStorage) getFakeDeletedSnapshots() ([]SSnapshot, error) {
q := SnapshotManager.Query().Equals("storage_id", self.Id).IsTrue("fake_deleted")
snapshots := make([]SSnapshot, 0)
err := db.FetchModelObjects(SnapshotManager, q, &snapshots)
if err != nil {
return nil, errors.Wrap(err, "FetchModelObjects")
}
return snapshots, nil
}
func (self *SStorage) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
ok, err := self.IsNeedDeleteStoragecache()
if err != nil {
return err
}
if ok {
cache := self.GetStoragecache()
if cache != nil {
cache.Delete(ctx, userCred)
}
}
if len(self.ManagerId) > 0 {
db.SharedResourceManager.CleanModelShares(ctx, userCred, self.GetIInfrasModel())
return self.purge(ctx, userCred)
}
return self.purge(ctx, userCred)
}
func (self *SStorage) IsNeedDeleteStoragecache() (bool, error) {
q := StorageManager.Query().Equals("storagecache_id", self.StoragecacheId).NotEquals("id", self.Id)
cnt, err := q.CountWithError()
if err != nil {
return false, err
}
return cnt == 0, nil
}
func (manager *SStorageManager) GetStorageTypesByProvider(provider string) ([]string, error) {
q := manager.Query("storage_type")
providers := CloudproviderManager.Query().SubQuery()
q = q.Join(providers, sqlchemy.Equals(q.Field("manager_id"), providers.Field("id"))).
Filter(sqlchemy.Equals(providers.Field("provider"), provider)).Distinct()
storages := []string{}
rows, err := q.Rows()
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var storage string
err = rows.Scan(&storage)
if err != nil {
return nil, errors.Wrap(err, "rows.Scan(&storage)")
}
storages = append(storages, storage)
}
return storages, nil
}
func (self *SStorage) IsNeedDeactivateOnAllHost() bool {
return self.StorageType == api.STORAGE_SLVM
}
func (manager *SStorageManager) GetStorageTypesByHostType(hostType string) ([]string, error) {
q := manager.Query("storage_type")
hosts := HostManager.Query().SubQuery()
hs := HoststorageManager.Query().SubQuery()
q = q.Join(hs, sqlchemy.Equals(q.Field("id"), hs.Field("storage_id"))).
Join(hosts, sqlchemy.Equals(hosts.Field("id"), hs.Field("host_id"))).
Filter(sqlchemy.Equals(hosts.Field("host_type"), hostType)).Distinct()
storages := []string{}
rows, err := q.Rows()
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var storage string
err = rows.Scan(&storage)
if err != nil {
return nil, errors.Wrap(err, "rows.Scan(&storage)")
}
storages = append(storages, storage)
}
return storages, nil
}
func (manager *SStorageManager) ValidateCreateData(
ctx context.Context,
userCred mcclient.TokenCredential,
ownerId mcclient.IIdentityProvider,
query jsonutils.JSONObject,
input api.StorageCreateInput,
) (api.StorageCreateInput, error) {
if !utils.IsInStringArray(input.StorageType, api.STORAGE_TYPES) {
return input, httperrors.NewInputParameterError("Invalid storage type %s", input.StorageType)
}
if len(input.MediumType) == 0 {
input.MediumType = api.DISK_TYPE_SSD
}
if !utils.IsInStringArray(input.MediumType, api.DISK_TYPES) {
return input, httperrors.NewInputParameterError("Invalid medium type %s", input.MediumType)
}
if len(input.ZoneId) == 0 {
return input, httperrors.NewMissingParameterError("zone_id")
}
_, err := validators.ValidateModel(ctx, userCred, ZoneManager, &input.ZoneId)
if err != nil {
return input, err
}
storageDirver := GetStorageDriver(input.StorageType)
if storageDirver == nil {
return input, httperrors.NewUnsupportOperationError("Not support create %s storage", input.StorageType)
}
err = storageDirver.ValidateCreateData(ctx, userCred, &input)
if err != nil {
return input, errors.Wrap(err, "storageDirver.ValidateCreateData")
}
input.EnabledStatusInfrasResourceBaseCreateInput, err = manager.SEnabledStatusInfrasResourceBaseManager.ValidateCreateData(ctx, userCred, ownerId, query, input.EnabledStatusInfrasResourceBaseCreateInput)
if err != nil {
return input, errors.Wrap(err, "SEnabledStatusInfrasResourceBaseManager.ValidateCreateData")
}
return input, nil
}
func (self *SStorage) CustomizeCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
self.SetEnabled(true)
self.SetStatus(ctx, userCred, api.STORAGE_UNMOUNT, "CustomizeCreate")
if err := self.setHardwareInfoByData(ctx, userCred, data); err != nil {
return errors.Wrap(err, "setHardwareInfo")
}
return self.SEnabledStatusInfrasResourceBase.CustomizeCreate(ctx, userCred, ownerId, query, data)
}
func (self *SStorage) setHardwareInfoByData(ctx context.Context, userCred mcclient.TokenCredential, data jsonutils.JSONObject) error {
var hdInfo *api.StorageHardwareInfo = nil
if data.Contains("hardware_info") {
hdInfo = new(api.StorageHardwareInfo)
data.Unmarshal(hdInfo, "hardware_info")
}
if err := self.setHardwareInfo(ctx, userCred, hdInfo); err != nil {
return errors.Wrap(err, "setHardwareInfo")
}
return nil
}
func (self *SStorage) setHardwareInfo(ctx context.Context, userCred mcclient.TokenCredential, info *api.StorageHardwareInfo) error {
if info == nil {
return nil
}
for k, v := range map[string]*string{
api.STORAGE_METADATA_MODEL: info.Model,
api.STORAGE_METADATA_VENDOR: info.Vendor,
} {
if v != nil {
if err := self.SetMetadata(ctx, k, *v, userCred); err != nil {
return errors.Wrapf(err, "set metadata %s = %s", k, *v)
}
}
}
if info.Bandwidth != 0 {
if err := self.SetMetadata(ctx, api.STORAGE_METADATA_BANDWIDTH, info.Bandwidth, userCred); err != nil {
return errors.Wrapf(err, "set metadata %s = %f", api.STORAGE_METADATA_BANDWIDTH, info.Bandwidth)
}
}
return nil
}
func (self *SStorage) ValidateDeleteCondition(ctx context.Context, info api.StorageDetails) error {
if !info.IsZero() {
return httperrors.NewNotEmptyError("storage has resources with %s", jsonutils.Marshal(info.StorageUsage).String())
}
return self.SEnabledStatusInfrasResourceBase.ValidateDeleteCondition(ctx, nil)
}
func (self *SStorage) PostCreate(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, data jsonutils.JSONObject) {
self.SEnabledStatusInfrasResourceBase.PostCreate(ctx, userCred, ownerId, query, data)
storageDriver := GetStorageDriver(self.StorageType)
if storageDriver != nil {
storageDriver.PostCreate(ctx, userCred, self, data)
}
}
func (self *SStorage) SetStatus(ctx context.Context, userCred mcclient.TokenCredential, status string, reason string) error {
if self.Status == status {
return nil
}
oldStatus := self.Status
_, err := db.Update(self, func() error {
self.Status = status
return nil
})
if err != nil {
return err
}
if userCred != nil {
notes := fmt.Sprintf("%s=>%s", oldStatus, status)
if len(reason) > 0 {
notes = fmt.Sprintf("%s: %s", notes, reason)
}
db.OpsLog.LogEvent(self, db.ACT_UPDATE_STATUS, notes, userCred)
// if strings.Contains(notes, "fail") {
// logclient.AddActionLogWithContext(ctx, self, logclient.ACT_VM_SYNC_STATUS, notes, userCred, false)
// }
}
return nil
}
func (self *SStorage) PerformEnable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if self.Enabled.IsFalse() {
_, err := db.Update(self, func() error {
self.Enabled = tristate.True
return nil
})
if err != nil {
log.Errorf("PerformEnable save update fail %s", err)
return nil, err
}
db.OpsLog.LogEvent(self, db.ACT_ENABLE, "", userCred)
logclient.AddSimpleActionLog(self, logclient.ACT_ENABLE, nil, userCred, true)
self.ClearSchedDescCache()
}
return nil, nil
}
func (self *SStorage) PerformDisable(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if self.Enabled.IsTrue() {
_, err := db.Update(self, func() error {
self.Enabled = tristate.False
return nil
})
if err != nil {
log.Errorf("PerformDisable save update fail %s", err)
return nil, err
}
db.OpsLog.LogEvent(self, db.ACT_DISABLE, "", userCred)
logclient.AddSimpleActionLog(self, logclient.ACT_DISABLE, nil, userCred, true)
self.ClearSchedDescCache()
}
return nil, nil
}
func (self *SStorage) PerformOnline(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if self.Status != api.STORAGE_ONLINE {
err := self.SetStatus(ctx, userCred, api.STORAGE_ONLINE, "")
if err != nil {
return nil, err
}
db.OpsLog.LogEvent(self, db.ACT_ONLINE, "", userCred)
self.ClearSchedDescCache()
}
return nil, nil
}
func (self *SStorage) PerformOffline(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
if self.Status != api.STORAGE_OFFLINE {
err := self.SetStatus(ctx, userCred, api.STORAGE_OFFLINE, data.String())
if err != nil {
return nil, err
}
db.OpsLog.LogEvent(self, db.ACT_OFFLINE, "", userCred)
self.ClearSchedDescCache()
}
return nil, nil
}
func (self *SStorage) GetHostCount() (int, error) {
return HoststorageManager.Query().Equals("storage_id", self.Id).CountWithError()
}
func (self *SStorage) GetDiskCount() (int, error) {
return DiskManager.Query().Equals("storage_id", self.Id).CountWithError()
}
func (self *SStorage) GetDisks() []SDisk {
disks := make([]SDisk, 0)
q := DiskManager.Query().Equals("storage_id", self.Id)
err := db.FetchModelObjects(DiskManager, q, &disks)
if err != nil {
log.Errorf("GetDisks fail %s", err)
return nil
}
return disks
}
func (self *SStorage) GetVisibleSnapshotCount() (int, error) {
return SnapshotManager.Query().Equals("storage_id", self.Id).IsFalse("fake_deleted").CountWithError()
}
func (self *SStorage) IsLocal() bool {
return utils.IsInStringArray(self.StorageType, api.HOST_STORAGE_LOCAL_TYPES)
}
func (self *SStorage) GetStorageCachePath(mountPoint, imageCachePath string) string {
if utils.IsInStringArray(self.StorageType, api.SHARED_FILE_STORAGE) {
return path.Join(mountPoint, imageCachePath)
} else if self.StorageType == api.STORAGE_LVM {
return mountPoint
} else {
return imageCachePath
}
}
func (self *SStorage) getStorageCapacity() SStorageCapacity {
capa := SStorageCapacity{}
capa.Capacity = self.GetCapacity()
capa.Used = self.GetUsedCapacity(tristate.True)
capa.Wasted = self.GetUsedCapacity(tristate.False)
capa.VCapacity = int64(float32(self.GetCapacity()) * self.GetOvercommitBound())
capa.ActualUsed = self.ActualCapacityUsed
return capa
}
type sStorageSchedtag struct {
Id string
Name string
StorageId string
DefaultStrategy string
}
func (self *sStorageSchedtag) GetShortDesc() api.SchedtagShortDescDetails {
return api.SchedtagShortDescDetails{
StandaloneResourceShortDescDetail: &apis.StandaloneResourceShortDescDetail{
StandaloneAnonResourceShortDescDetail: apis.StandaloneAnonResourceShortDescDetail{
Id: self.Id,
},
Name: self.Name,
},
Default: self.DefaultStrategy,
}
}
func (sm *SStorageManager) query(manager db.IModelManager, field string, storageIds []string, filter func(*sqlchemy.SQuery) *sqlchemy.SQuery) *sqlchemy.SSubQuery {
q := manager.Query()
if filter != nil {
q = filter(q)
}
sq := q.SubQuery()
return sq.Query(
sq.Field("storage_id"),
sqlchemy.COUNT(field),
).In("storage_id", storageIds).GroupBy(sq.Field("storage_id")).SubQuery()
}
type StorageUsageCount struct {
Id string
api.StorageUsage
}
func (manager *SStorageManager) TotalResourceCount(storageIds []string) (map[string]api.StorageUsage, error) {
ret := map[string]api.StorageUsage{}
hostSQ := manager.query(HoststorageManager, "host_cnt", storageIds, nil)
diskSQ := manager.query(DiskManager, "disk_cnt", storageIds, nil)
diskUsed := DiskManager.Query().Equals("status", api.DISK_READY)
_diskUsedSQ := diskUsed.SubQuery()
diskUsedSQ := _diskUsedSQ.Query(
_diskUsedSQ.Field("storage_id"),
sqlchemy.SUM("disk_used", _diskUsedSQ.Field("disk_size")),
).In("storage_id", storageIds).GroupBy(_diskUsedSQ.Field("storage_id")).SubQuery()
diskWasted := DiskManager.Query().NotEquals("status", api.DISK_READY)
_diskWastedSQ := diskWasted.SubQuery()
diskWastedSQ := _diskWastedSQ.Query(
_diskWastedSQ.Field("storage_id"),
sqlchemy.SUM("disk_wasted", _diskWastedSQ.Field("disk_size")),
).In("storage_id", storageIds).GroupBy(_diskWastedSQ.Field("storage_id")).SubQuery()
snapshotSQ := manager.query(SnapshotManager, "snapshot_cnt", storageIds, func(q *sqlchemy.SQuery) *sqlchemy.SQuery {
return q.IsFalse("fake_deleted")
})
storages := manager.Query().SubQuery()
storageQ := storages.Query(
sqlchemy.SUM("host_count", hostSQ.Field("host_cnt")),
sqlchemy.SUM("disk_count", diskSQ.Field("disk_cnt")),
sqlchemy.SUM("snapshot_count", snapshotSQ.Field("snapshot_cnt")),
sqlchemy.SUM("used", diskUsedSQ.Field("disk_used")),
sqlchemy.SUM("wasted", diskWastedSQ.Field("disk_wasted")),
)
storageQ.AppendField(storageQ.Field("id"))
storageQ = storageQ.LeftJoin(hostSQ, sqlchemy.Equals(storageQ.Field("id"), hostSQ.Field("storage_id")))
storageQ = storageQ.LeftJoin(diskSQ, sqlchemy.Equals(storageQ.Field("id"), diskSQ.Field("storage_id")))
storageQ = storageQ.LeftJoin(snapshotSQ, sqlchemy.Equals(storageQ.Field("id"), snapshotSQ.Field("storage_id")))
storageQ = storageQ.LeftJoin(diskUsedSQ, sqlchemy.Equals(storageQ.Field("id"), diskUsedSQ.Field("storage_id")))
storageQ = storageQ.LeftJoin(diskWastedSQ, sqlchemy.Equals(storageQ.Field("id"), diskWastedSQ.Field("storage_id")))
storageQ = storageQ.Filter(sqlchemy.In(storageQ.Field("id"), storageIds)).GroupBy(storageQ.Field("id"))
counts := []StorageUsageCount{}
err := storageQ.All(&counts)
if err != nil {
return nil, errors.Wrapf(err, "storageQ.All")
}
for i := range counts {
ret[counts[i].Id] = counts[i].StorageUsage
}
return ret, nil
}
func (manager *SStorageManager) FetchCustomizeColumns(
ctx context.Context,
userCred mcclient.TokenCredential,
query jsonutils.JSONObject,
objs []interface{},
fields stringutils2.SSortedStrings,
isList bool,
) []api.StorageDetails {
rows := make([]api.StorageDetails, len(objs))
stdRows := manager.SEnabledStatusInfrasResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
zoneRows := manager.SZoneResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
manageRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
storageIds := make([]string, len(objs))
for i := range rows {
rows[i] = api.StorageDetails{
EnabledStatusInfrasResourceBaseDetails: stdRows[i],
ZoneResourceInfo: zoneRows[i],
ManagedResourceInfo: manageRows[i],
}
storage := objs[i].(*SStorage)
storageIds[i] = storage.Id
if rows[i].ManagerId == "" && rows[i].MasterHost == "" &&
utils.IsInStringArray(storage.StorageType, api.SHARED_STORAGE) {
if host, err := storage.GetMasterHost(); host != nil {
rows[i].MasterHost = host.Id
rows[i].MasterHostName = host.Name
} else {
log.Errorf("storage %s failed get master host %s", storageIds[i], err)
}
}
if rows[i].MasterHost != "" && rows[i].MasterHostName == "" {
if host := HostManager.FetchHostById(rows[i].MasterHost); host != nil {
rows[i].MasterHostName = host.Name
}
}
rows[i].Capacity = storage.GetCapacity()
rows[i].VCapacity = int64(float32(rows[i].Capacity) * storage.GetOvercommitBound())
rows[i].ActualUsed = storage.ActualCapacityUsed
rows[i].CommitBound = storage.GetOvercommitBound()
}
count, err := manager.TotalResourceCount(storageIds)
if err != nil {
log.Errorf("TotalResourceCount error: %v", err)
return rows
}
tags := make([]sStorageSchedtag, 0)
schedtags := SchedtagManager.Query().SubQuery()
storagetags := StorageschedtagManager.Query().IsFalse("deleted").In("storage_id", storageIds).SubQuery()
q := schedtags.Query(
schedtags.Field("id"),
schedtags.Field("name"),
schedtags.Field("default_strategy"),
storagetags.Field("storage_id"),
)
q = q.LeftJoin(storagetags, sqlchemy.Equals(storagetags.Field("schedtag_id"), schedtags.Field("id")))
err = q.All(&tags)
if err != nil {
log.Errorf("tagQ.All error: %v", err)
return rows
}
tagMap := map[string][]api.SchedtagShortDescDetails{}
for i := range tags {
desc := tags[i].GetShortDesc()
_, ok := tagMap[tags[i].StorageId]
if !ok {
tagMap[tags[i].StorageId] = []api.SchedtagShortDescDetails{}
}
tagMap[tags[i].StorageId] = append(tagMap[tags[i].StorageId], desc)
}
sq := HoststorageManager.Query().In("storage_id", storageIds).SubQuery()
hosts := HostManager.Query().SubQuery()
q = sq.Query(
sq.Field("storage_id"),
sq.Field("host_id"),
hosts.Field("name"),
hosts.Field("status"),
hosts.Field("host_status"),
).LeftJoin(hosts, sqlchemy.Equals(sq.Field("host_id"), hosts.Field("id")))
hs := []struct {
StorageId string
HostId string
Name string
Status string
HostStatus string
}{}
err = q.All(&hs)
if err != nil {
log.Errorf("query host error: %v", err)
return rows
}
hoststorages := map[string][]api.StorageHost{}
for _, h := range hs {
_, ok := hoststorages[h.StorageId]
if !ok {
hoststorages[h.StorageId] = []api.StorageHost{}
}
hoststorages[h.StorageId] = append(hoststorages[h.StorageId], api.StorageHost{
Id: h.HostId,
Name: h.Name,
Status: h.Status,
HostStatus: h.HostStatus,
})
}
for i := range rows {
rows[i].Hosts, _ = hoststorages[storageIds[i]]
tags, ok := tagMap[storageIds[i]]
if ok {
rows[i].Schedtags = tags
}
cnt, ok := count[storageIds[i]]
if ok {
rows[i].StorageUsage = cnt
}
capa := SStorageCapacity{
Capacity: rows[i].Capacity,
VCapacity: rows[i].VCapacity,
ActualUsed: rows[i].ActualUsed,
Used: rows[i].Used,
Wasted: rows[i].Wasted,
}
rows[i].SStorageCapacityInfo = capa.toCapacityInfo()
}
return rows
}
func (self *SStorage) GetUsedCapacity(isReady tristate.TriState) int64 {
disks := DiskManager.Query().SubQuery()
q := disks.Query(sqlchemy.SUM("sum", disks.Field("disk_size"))).Equals("storage_id", self.Id)
switch isReady {
case tristate.True:
q = q.Equals("status", api.DISK_READY)
case tristate.False:
q = q.NotEquals("status", api.DISK_READY)
}
row := q.Row()
// sum can be null, deal with null:
// https://github.com/golang/go/wiki/SQLInterface#dealing-with-null
var sum sql.NullInt64
err := row.Scan(&sum)
if err != nil {
log.Errorf("GetUsedCapacity fail: %s", err)
return 0
}
if sum.Valid {
return sum.Int64
} else {
return 0
}
}
func (storage *SStorage) GetOvercommitBound() float32 {
if storage.Cmtbound > 0 {
return storage.Cmtbound
} else {
return options.Options.DefaultStorageOvercommitBound
}
}
func (self *SStorage) GetMasterHost() (*SHost, error) {
if self.MasterHost != "" {
host := HostManager.FetchHostById(self.MasterHost)
if host != nil && host.Enabled.IsTrue() && host.HostStatus == api.HOST_ONLINE {
return host, nil
}
}
hosts := HostManager.Query().SubQuery()
hoststorages := HoststorageManager.Query().SubQuery()
q := hosts.Query().Join(hoststorages, sqlchemy.Equals(hoststorages.Field("host_id"), hosts.Field("id")))
q = q.Filter(sqlchemy.Equals(hoststorages.Field("storage_id"), self.Id))
q = q.IsTrue("enabled")
q = q.Equals("host_status", api.HOST_ONLINE).Asc("id")
host := SHost{}
host.SetModelManager(HostManager, &host)
err := q.First(&host)
if err != nil {
return nil, errors.Wrapf(err, "q.First")
}
if utils.IsInStringArray(self.StorageType, api.SHARED_STORAGE) {
if err := self.UpdateMasterHost(host.Id); err != nil {
log.Errorf("storage %s udpate master host failed %s: %s", self.GetName(), host.Id, err)
}
}
return &host, nil
}
func (self *SStorage) UpdateMasterHost(hostId string) error {
_, err := db.Update(self, func() error {
self.MasterHost = hostId
return nil
})
return err
}
func (self *SStorage) GetZoneId() string {
if len(self.ZoneId) > 0 {
return self.ZoneId
}
host, _ := self.GetMasterHost()
if host != nil {
_, err := db.Update(self, func() error {
self.ZoneId = host.ZoneId
return nil
})
if err != nil {
log.Errorf("%s", err)
return ""
}
return self.ZoneId
} else {
log.Errorf("No mater host for storage")
return ""
}
}
func (self *SStorage) getZone() (*SZone, error) {
zoneId := self.GetZoneId()
if len(zoneId) > 0 {
zone, err := ZoneManager.FetchById(zoneId)
if err != nil {
return nil, errors.Wrapf(err, "GetZone(%s)", zoneId)
}
return zone.(*SZone), nil
}
return nil, fmt.Errorf("empty zoneId for storage %s(%s)", self.Name, self.Id)
}
func (self *SStorage) GetRegion() (*SCloudregion, error) {
zone, err := self.getZone()
if err != nil {
return nil, errors.Wrapf(err, "getZone")
}
return zone.GetRegion()
}
func (self *SStorage) GetReserved() int64 {
return self.Reserved
}
func (self *SStorage) GetCapacity() int64 {
return self.Capacity - self.GetReserved()
}
func (self *SStorage) GetFreeCapacity() int64 {
return int64(float32(self.GetCapacity())*self.GetOvercommitBound()) - self.GetUsedCapacity(tristate.None)
}
func (self *SStorage) GetAttachedHosts() ([]SHost, error) {
hosts := HostManager.Query().SubQuery()
hoststorages := HoststorageManager.Query().SubQuery()
q := hosts.Query()
q = q.Join(hoststorages, sqlchemy.Equals(hoststorages.Field("host_id"), hosts.Field("id")))
q = q.Filter(sqlchemy.Equals(hoststorages.Field("storage_id"), self.Id))
hostList := make([]SHost, 0)
err := db.FetchModelObjects(HostManager, q, &hostList)
if err != nil {
return nil, errors.Wrapf(err, "GetAttachedHosts")
}
return hostList, nil
}
func (self *SStorage) SyncStatusWithHosts(ctx context.Context) {
hosts, err := self.GetAttachedHosts()
if err != nil {
log.Errorf("storage.SyncStatusWithHosts: GetAttachedHosts fail %s", err)
return
}
total := 0
online := 0
offline := 0
for _, h := range hosts {
if h.HostStatus == api.HOST_ONLINE {
online += 1
} else {
offline += 1
}
total += 1
}
var status string
if !self.IsLocal() {
status = self.Status
if online == 0 {
status = api.STORAGE_OFFLINE
} else {
status = api.STORAGE_ONLINE
}
} else if online > 0 {
status = api.STORAGE_ONLINE
} else if offline > 0 {
status = api.STORAGE_OFFLINE
} else {
status = api.STORAGE_OFFLINE
}
if len(hosts) == 0 {
status = api.STORAGE_UNMOUNT
}
if status != self.Status {
log.Infof("Storage %s(%s) status %s expect %s online %d", self.Name, self.Id, self.Status, status, online)
self.SetStatus(ctx, nil, status, "SyncStatusWithHosts")
}
}
func (manager *SStorageManager) getStoragesByZone(zone *SZone, provider *SCloudprovider) ([]SStorage, error) {
storages := make([]SStorage, 0)
q := manager.Query()
if zone != nil {
q = q.Equals("zone_id", zone.Id)
}
if provider != nil {
q = q.Equals("manager_id", provider.Id)
}
err := db.FetchModelObjects(manager, q, &storages)
if err != nil {
log.Errorf("getStoragesByZoneId fail %s", err)
return nil, err
}
return storages, nil
}
func (manager *SStorageManager) GetStorageByStoragecache(storagecacheId string) (*SStorage, error) {
s := SStorage{}
s.SetModelManager(StorageManager, &s)
err := manager.Query().Equals("storagecache_id", storagecacheId).First(&s)
if err != nil {
return nil, errors.Wrap(err, "get storage by storagecache")
}
return &s, nil
}
func (manager *SStorageManager) scanLegacyStorages() error {
storages := make([]SStorage, 0)
table := manager.Query().SubQuery()
q := table.Query().Filter(sqlchemy.OR(sqlchemy.IsNull(table.Field("zone_id")), sqlchemy.IsEmpty(table.Field("zone_id"))))
err := db.FetchModelObjects(manager, q, &storages)
if err != nil {
log.Errorf("getLegacyStoragesByZoneId fail %s", err)
return err
}
for i := 0; i < len(storages); i += 1 {
storages[i].GetZoneId()
}
return nil
}
func (manager *SStorageManager) SyncStorages(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, zone *SZone, storages []cloudprovider.ICloudStorage, xor bool) ([]SStorage, []cloudprovider.ICloudStorage, compare.SyncResult) {
var resId string
if zone != nil {
resId = fmt.Sprintf("%s-%s", provider.Id, zone.Id)
} else {
resId = provider.Id
}
lockman.LockRawObject(ctx, "storages", resId)
defer lockman.ReleaseRawObject(ctx, "storages", resId)
localStorages := make([]SStorage, 0)
remoteStorages := make([]cloudprovider.ICloudStorage, 0)
syncResult := compare.SyncResult{}
err := manager.scanLegacyStorages()
if err != nil {
syncResult.Error(err)
return nil, nil, syncResult
}
dbStorages, err := manager.getStoragesByZone(zone, provider)
if err != nil {
syncResult.Error(err)
return nil, nil, syncResult
}
removed := make([]SStorage, 0)
commondb := make([]SStorage, 0)
commonext := make([]cloudprovider.ICloudStorage, 0)
added := make([]cloudprovider.ICloudStorage, 0)
err = compare.CompareSets(dbStorages, storages, &removed, &commondb, &commonext, &added)
if err != nil {
syncResult.Error(err)
return nil, nil, syncResult
}
for i := 0; i < len(removed); i += 1 {
// may be a fake storage for prepaid recycle host
if removed[i].IsPrepaidRecycleResource() {
continue
}
err = removed[i].syncRemoveCloudStorage(ctx, userCred)
if err != nil {
syncResult.DeleteError(err)
} else {
syncResult.Delete()
}
}
for i := 0; i < len(commondb); i += 1 {
if !xor {
err = commondb[i].syncWithCloudStorage(ctx, userCred, commonext[i], provider)
if err != nil {
syncResult.UpdateError(err)
continue
}
}
localStorages = append(localStorages, commondb[i])
remoteStorages = append(remoteStorages, commonext[i])
syncResult.Update()
}
for i := 0; i < len(added); i += 1 {
storage, err := manager.newFromCloudStorage(ctx, userCred, added[i], provider, zone)
if err != nil {
syncResult.AddError(err)
continue
}
localStorages = append(localStorages, *storage)
remoteStorages = append(remoteStorages, added[i])
syncResult.Add()
}
return localStorages, remoteStorages, syncResult
}
func (self *SStorage) syncRemoveCloudStorage(ctx context.Context, userCred mcclient.TokenCredential) error {
lockman.LockObject(ctx, self)
defer lockman.ReleaseObject(ctx, self)
return self.purge(ctx, userCred)
}
var CapacityUsedCloudStorageProvider = []string{
api.CLOUD_PROVIDER_VMWARE,
}
func (sm *SStorageManager) SyncCapacityUsedForEsxiStorage(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
cpQ := CloudproviderManager.Query("id").Equals("provider", api.CLOUD_PROVIDER_VMWARE)
cloudproviders := make([]SCloudprovider, 0)
err := db.FetchModelObjects(CloudproviderManager, cpQ, &cloudproviders)
if err != nil {
log.Errorf("unable to FetchModelObjects: %v", err)
}
for i := range cloudproviders {
cp := cloudproviders[i]
icp, err := cp.GetProvider(ctx)
if err != nil {
log.Errorf("unable to GetProvider: %v", err)
continue
}
iregion, err := icp.GetOnPremiseIRegion()
if err != nil {
log.Errorf("unable to GetOnPremiseIRegion: %v", err)
continue
}
css, err := iregion.GetIStorages()
if err != nil {
log.Errorf("unable to GetIStorages: %v", err)
continue
}
storageSizeMap := make(map[string]int64, len(css))
for i := range css {
id := css[i].GetGlobalId()
size := css[i].GetCapacityUsedMB()
storageSizeMap[id] = size
}
sQ := sm.Query().Equals("manager_id", cp.GetId())
storages := make([]SStorage, 0, 5)
err = db.FetchModelObjects(sm, sQ, &storages)
if err != nil {
log.Errorf("unable to fetch storages with sql %q: %v", sQ.String(), err)
continue
}
for i := range storages {
s := &storages[i]
newSize, ok := storageSizeMap[s.GetExternalId()]
if !ok {
log.Warningf("can't find usedSize for storage %q", s.GetId())
continue
}
_, err = db.Update(s, func() error {
s.ActualCapacityUsed = newSize
return nil
})
if err != nil {
log.Errorf("unable to udpate storage %q: %v", s.GetId(), err)
}
}
}
}
func (self *SStorage) syncWithCloudStorage(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudStorage, provider *SCloudprovider) error {
diff, err := db.UpdateWithLock(ctx, self, func() error {
// self.Name = extStorage.GetName()
self.Status = ext.GetStatus()
self.StorageType = ext.GetStorageType()
if provider != nil && !utils.IsInStringArray(provider.Provider, strings.Split(options.Options.SkipSyncStorageConfigInfoProviders, ",")) {
self.MediumType = ext.GetMediumType()
if capacity := ext.GetCapacityMB(); capacity != 0 {
self.Capacity = capacity
}
if capacity := ext.GetCapacityUsedMB(); capacity != 0 {
self.ActualCapacityUsed = capacity
}
}
self.StorageConf = ext.GetStorageConf()
self.Enabled = tristate.NewFromBool(ext.GetEnabled())
self.IsEmulated = ext.IsEmulated()
self.IsSysDiskStore = tristate.NewFromBool(ext.IsSysDiskStore())
return nil
})
if err != nil {
log.Errorf("syncWithCloudZone error %s", err)
return err
}
if provider != nil {
SyncCloudDomain(userCred, self, provider.GetOwnerId())
self.SyncShareState(ctx, userCred, provider.getAccountShareInfo())
if account, _ := provider.GetCloudaccount(); account != nil {
syncMetadata(ctx, userCred, self, ext, account.ReadOnly)
}
}
db.OpsLog.LogSyncUpdate(self, diff, userCred)
return nil
}
func (manager *SStorageManager) newFromCloudStorage(ctx context.Context, userCred mcclient.TokenCredential, extStorage cloudprovider.ICloudStorage, provider *SCloudprovider, zone *SZone) (*SStorage, error) {
storage := SStorage{}
storage.SetModelManager(manager, &storage)
storage.Status = extStorage.GetStatus()
storage.ExternalId = extStorage.GetGlobalId()
storage.ZoneId = zone.Id
storage.StorageType = extStorage.GetStorageType()
storage.MediumType = extStorage.GetMediumType()
storage.StorageConf = extStorage.GetStorageConf()
storage.Capacity = extStorage.GetCapacityMB()
storage.ActualCapacityUsed = extStorage.GetCapacityUsedMB()
storage.Cmtbound = 1.0
storage.Enabled = tristate.NewFromBool(extStorage.GetEnabled())
storage.IsEmulated = extStorage.IsEmulated()
storage.ManagerId = provider.Id
storage.IsSysDiskStore = tristate.NewFromBool(extStorage.IsSysDiskStore())
var err = func() error {
lockman.LockRawObject(ctx, manager.Keyword(), "name")
defer lockman.ReleaseRawObject(ctx, manager.Keyword(), "name")
newName, err := db.GenerateName(ctx, manager, userCred, extStorage.GetName())
if err != nil {
return err
}
storage.Name = newName
return manager.TableSpec().Insert(ctx, &storage)
}()
if err != nil {
return nil, errors.Wrapf(err, "Insert")
}
SyncCloudDomain(userCred, &storage, provider.GetOwnerId())
syncMetadata(ctx, userCred, &storage, extStorage, false)
if provider != nil {
storage.SyncShareState(ctx, userCred, provider.getAccountShareInfo())
}
db.OpsLog.LogEvent(&storage, db.ACT_CREATE, storage.GetShortDesc(ctx), userCred)
return &storage, nil
}
type StorageCapacityStat struct {
TotalSize int64
TotalSizeVirtual float64
}
func filterDisksByScope(ctx context.Context, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
q := DiskManager.Query()
switch scope {
case rbacscope.ScopeSystem:
case rbacscope.ScopeDomain:
q = q.Filter(sqlchemy.Equals(q.Field("domain_id"), ownerId.GetProjectDomainId()))
case rbacscope.ScopeProject:
q = q.Filter(sqlchemy.Equals(q.Field("tenant_id"), ownerId.GetProjectId()))
}
if pendingDeleted {
q = q.IsTrue("pending_deleted")
} else {
q = q.IsFalse("pending_deleted")
}
if !includeSystem {
q = q.IsFalse("is_system")
}
return db.ObjectIdQueryWithPolicyResult(ctx, q, DiskManager, policyResult).SubQuery()
}
func (manager *SStorageManager) disksReadyQ(ctx context.Context, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
disks := filterDisksByScope(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
q := disks.Query(
disks.Field("storage_id"),
sqlchemy.SUM("used_capacity", disks.Field("disk_size")),
sqlchemy.COUNT("used_count"),
).Equals("status", api.DISK_READY)
q = q.GroupBy(disks.Field("storage_id"))
return q.SubQuery()
}
func (manager *SStorageManager) diskIsAttachedQ(ctx context.Context, isAttached bool, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
sumKey := "attached_used_capacity"
countKey := "attached_count"
cond := sqlchemy.In
if !isAttached {
sumKey = "detached_used_capacity"
countKey = "detached_count"
cond = sqlchemy.NotIn
}
sq := GuestdiskManager.Query("disk_id").SubQuery()
disks := filterDisksByScope(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
disks = disks.Query().Filter(cond(disks.Field("id"), sq)).SubQuery()
q := disks.Query(
disks.Field("storage_id"),
sqlchemy.SUM(sumKey, disks.Field("disk_size")),
sqlchemy.COUNT(countKey),
).Equals("status", api.DISK_READY).GroupBy(disks.Field("storage_id"))
return q.SubQuery()
}
func (manager *SStorageManager) diskAttachedQ(ctx context.Context, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
return manager.diskIsAttachedQ(ctx, true, scope, ownerId, pendingDeleted, includeSystem, policyResult)
}
func (manager *SStorageManager) diskDetachedQ(ctx context.Context, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
return manager.diskIsAttachedQ(ctx, false, scope, ownerId, pendingDeleted, includeSystem, policyResult)
}
func (manager *SStorageManager) disksFailedQ(ctx context.Context, scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider, pendingDeleted bool, includeSystem bool, policyResult rbacutils.SPolicyResult) *sqlchemy.SSubQuery {
disks := filterDisksByScope(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
q := disks.Query(
disks.Field("storage_id"),
sqlchemy.SUM("failed_capacity", disks.Field("disk_size")),
sqlchemy.COUNT("failed_count"),
).NotEquals("status", api.DISK_READY)
q = q.GroupBy(disks.Field("storage_id"))
return q.SubQuery()
}
func (manager *SStorageManager) totalCapacityQ(
ctx context.Context,
rangeObjs []db.IStandaloneModel, hostTypes []string,
resourceTypes []string,
providers []string, brands []string, cloudEnv string,
scope rbacscope.TRbacScope, ownerId mcclient.IIdentityProvider,
pendingDeleted bool, includeSystem bool,
storageOwnership bool,
policyResult rbacutils.SPolicyResult,
) *sqlchemy.SQuery {
stmt := manager.disksReadyQ(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
stmt2 := manager.disksFailedQ(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
attachedDisks := manager.diskAttachedQ(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
detachedDisks := manager.diskDetachedQ(ctx, scope, ownerId, pendingDeleted, includeSystem, policyResult)
sq := manager.Query()
if len(hostTypes) > 0 || len(resourceTypes) > 0 || len(rangeObjs) > 0 {
hosts := HostManager.Query().SubQuery()
subq := HoststorageManager.Query("storage_id")
subq = subq.Join(hosts, sqlchemy.Equals(hosts.Field("id"), subq.Field("host_id")))
subq = subq.Filter(sqlchemy.IsTrue(hosts.Field("enabled")))
subq = subq.Filter(sqlchemy.Equals(hosts.Field("host_status"), api.HOST_ONLINE))
subq = AttachUsageQuery(subq, hosts, hostTypes, resourceTypes, nil, nil, "", rangeObjs)
sq = sq.Filter(sqlchemy.In(sq.Field("id"), subq.SubQuery()))
}
if len(rangeObjs) > 0 || len(providers) > 0 || len(brands) > 0 || cloudEnv != "" {
sq = CloudProviderFilter(sq, sq.Field("manager_id"), providers, brands, cloudEnv)
sq = RangeObjectsFilter(sq, rangeObjs, nil, sq.Field("zone_id"), sq.Field("manager_id"), nil, sq.Field("id"))
}
if storageOwnership {
switch scope {
case rbacscope.ScopeSystem:
case rbacscope.ScopeDomain, rbacscope.ScopeProject:
sq = sq.Equals("domain_id", ownerId.GetProjectDomainId())
}
}
sq = db.ObjectIdQueryWithPolicyResult(ctx, sq, manager, policyResult)
storages := sq.SubQuery()
q := storages.Query(
storages.Field("capacity"),
storages.Field("reserved"),
storages.Field("cmtbound"),
storages.Field("actual_capacity_used"),
storages.Field("storage_type"),
storages.Field("medium_type"),
stmt.Field("used_capacity"),
stmt.Field("used_count"),
stmt2.Field("failed_capacity"),
stmt2.Field("failed_count"),
attachedDisks.Field("attached_used_capacity"),
attachedDisks.Field("attached_count"),
detachedDisks.Field("detached_used_capacity"),
detachedDisks.Field("detached_count"),
)
q = q.LeftJoin(stmt, sqlchemy.Equals(stmt.Field("storage_id"), storages.Field("id")))
q = q.LeftJoin(stmt2, sqlchemy.Equals(stmt2.Field("storage_id"), storages.Field("id")))
q = q.LeftJoin(attachedDisks, sqlchemy.Equals(attachedDisks.Field("storage_id"), storages.Field("id")))
q = q.LeftJoin(detachedDisks, sqlchemy.Equals(detachedDisks.Field("storage_id"), storages.Field("id")))
return q
}
type StorageStat struct {
Capacity int
Reserved int
Cmtbound float32
ActualCapacityUsed int64
StorageType string
MediumType string
UsedCapacity int
UsedCount int
FailedCapacity int
FailedCount int
AttachedUsedCapacity int
AttachedCount int
DetachedUsedCapacity int
DetachedCount int
}
type StoragesCapacityStat struct {
Capacity int64
CapacityVirtual float64
CapacityUsed int64
ActualCapacityUsed int64
CountUsed int
CapacityUnready int64
CountUnready int
AttachedCapacity int64
CountAttached int
DetachedCapacity int64
CountDetached int
MediumeCapacity map[string]int64
StorageTypeCapacity map[string]int64
MediumeCapacityUsed map[string]int64
StorageTypeCapacityUsed map[string]int64
AttachedMediumeCapacity map[string]int64
AttachedStorageTypeCapacity map[string]int64
DetachedMediumeCapacity map[string]int64
DetachedStorageTypeCapacity map[string]int64
}
func (manager *SStorageManager) calculateCapacity(q *sqlchemy.SQuery) StoragesCapacityStat {
stats := make([]StorageStat, 0)
err := q.All(&stats)
if err != nil {
log.Errorf("calculateCapacity: %v", err)
}
var (
tCapa int64 = 0
tVCapa float64 = 0
tUsed int64 = 0
aUsed int64 = 0
cUsed int = 0
tFailed int64 = 0
cFailed int = 0
atCapa int64 = 0
atCount int = 0
dtCapa int64 = 0
dtCount int = 0
mCapa = map[string]int64{}
sCapa = map[string]int64{}
mFailed = map[string]int64{}
sFailed = map[string]int64{}
matCapa = map[string]int64{}
satCapa = map[string]int64{}
mdtCapa = map[string]int64{}
sdtCapa = map[string]int64{}
mCapaUsed = map[string]int64{}
sCapaUsed = map[string]int64{}
)
var add = func(m, s map[string]int64, mediumType, storageType string, capa int64) (map[string]int64, map[string]int64) {
_, ok := m[mediumType]
if !ok {
m[mediumType] = 0
}
m[mediumType] += capa
_, ok = s[storageType]
if !ok {
s[storageType] = 0
}
s[storageType] += capa
return m, s
}
for _, stat := range stats {
tCapa += int64(stat.Capacity - stat.Reserved)
if stat.Cmtbound == 0 {
stat.Cmtbound = options.Options.DefaultStorageOvercommitBound
}
mCapa, sCapa = add(mCapa, sCapa, stat.MediumType, stat.StorageType, int64(stat.Capacity-stat.Reserved))
tVCapa += float64(stat.Capacity-stat.Reserved) * float64(stat.Cmtbound)
mCapaUsed, sCapaUsed = add(mCapaUsed, sCapaUsed, stat.MediumType, stat.StorageType, int64(stat.UsedCapacity))
tUsed += int64(stat.UsedCapacity)
aUsed += int64(stat.ActualCapacityUsed)
cUsed += stat.UsedCount
tFailed += int64(stat.FailedCapacity)
mFailed, sFailed = add(mFailed, sFailed, stat.MediumType, stat.StorageType, int64(stat.FailedCapacity))
cFailed += stat.FailedCount
atCapa += int64(stat.AttachedUsedCapacity)
matCapa, satCapa = add(matCapa, satCapa, stat.MediumType, stat.StorageType, int64(stat.AttachedUsedCapacity))
atCount += stat.AttachedCount
dtCapa += int64(stat.DetachedUsedCapacity)
mdtCapa, sdtCapa = add(mdtCapa, sdtCapa, stat.MediumType, stat.StorageType, int64(stat.DetachedUsedCapacity))
dtCount += stat.DetachedCount
}
return StoragesCapacityStat{
Capacity: tCapa,
MediumeCapacity: mCapa,
StorageTypeCapacity: sCapa,
CapacityVirtual: tVCapa,
CapacityUsed: tUsed,
ActualCapacityUsed: aUsed,
MediumeCapacityUsed: mCapaUsed,
StorageTypeCapacityUsed: sCapaUsed,
CountUsed: cUsed,
CapacityUnready: tFailed,
CountUnready: cFailed,
AttachedCapacity: atCapa,
AttachedMediumeCapacity: matCapa,
AttachedStorageTypeCapacity: satCapa,
CountAttached: atCount,
DetachedCapacity: dtCapa,
DetachedMediumeCapacity: mdtCapa,
DetachedStorageTypeCapacity: sdtCapa,
CountDetached: dtCount,
}
}
func (manager *SStorageManager) TotalCapacity(
ctx context.Context,
rangeObjs []db.IStandaloneModel,
hostTypes []string,
resourceTypes []string,
providers []string, brands []string, cloudEnv string,
scope rbacscope.TRbacScope,
ownerId mcclient.IIdentityProvider,
pendingDeleted bool, includeSystem bool,
storageOwnership bool,
policyResult rbacutils.SPolicyResult,
) StoragesCapacityStat {
res1 := manager.calculateCapacity(
manager.totalCapacityQ(
ctx,
rangeObjs,
hostTypes,
resourceTypes,
providers, brands, cloudEnv,
scope, ownerId,
pendingDeleted, includeSystem,
storageOwnership,
policyResult,
),
)
return res1
}
func (self *SStorage) createDisk(ctx context.Context, name string, diskConfig *api.DiskConfig, userCred mcclient.TokenCredential,
ownerId mcclient.IIdentityProvider, autoDelete bool, isSystem bool,
billingType billing_api.TBillingType, billingCycle string,
encryptKeyId string,
) (*SDisk, error) {
disk := SDisk{}
disk.SetModelManager(DiskManager, &disk)
disk.Name = name
if err := disk.fetchDiskInfo(diskConfig); err != nil {
return nil, errors.Wrap(err, "fetchDiskInfo")
}
disk.StorageId = self.Id
disk.AutoDelete = autoDelete
disk.ProjectId = ownerId.GetProjectId()
disk.ProjectSrc = string(apis.OWNER_SOURCE_LOCAL)
disk.DomainId = ownerId.GetProjectDomainId()
disk.IsSystem = isSystem
disk.Iops = diskConfig.Iops
disk.Throughput = diskConfig.Throughput
disk.Preallocation = diskConfig.Preallocation
disk.AutoReset = diskConfig.AutoReset
if self.MediumType == api.DISK_TYPE_SSD {
disk.IsSsd = true
} else {
disk.IsSsd = false
}
disk.BillingType = billingType
disk.BillingCycle = billingCycle
disk.EncryptKeyId = encryptKeyId
err := disk.GetModelManager().TableSpec().Insert(ctx, &disk)
if err != nil {
return nil, err
}
db.OpsLog.LogEvent(&disk, db.ACT_CREATE, disk.GetShortDesc(ctx), userCred)
return &disk, nil
}
func (self *SStorage) GetAllAttachingHosts() []SHost {
hosts := HostManager.Query().SubQuery()
hoststorages := HoststorageManager.Query().SubQuery()
q := hosts.Query()
q = q.Join(hoststorages, sqlchemy.Equals(hoststorages.Field("host_id"), hosts.Field("id")))
q = q.Filter(sqlchemy.Equals(hoststorages.Field("storage_id"), self.Id))
q = q.Filter(sqlchemy.IsTrue(hosts.Field("enabled")))
q = q.Filter(sqlchemy.Equals(hosts.Field("host_status"), api.HOST_ONLINE))
ret := make([]SHost, 0)
err := db.FetchModelObjects(HostManager, q, &ret)
if err != nil {
log.Errorf("%s", err)
return nil
}
return ret
}
func (self *SStorage) SetStoragecache(userCred mcclient.TokenCredential, cache *SStoragecache) error {
if self.StoragecacheId == cache.Id {
return nil
}
diff, err := db.Update(self, func() error {
self.StoragecacheId = cache.Id
return nil
})
if err != nil {
return err
}
db.OpsLog.LogEvent(self, db.ACT_UPDATE, diff, userCred)
return nil
}
func (self *SStorage) GetStoragecache() *SStoragecache {
obj, err := StoragecacheManager.FetchById(self.StoragecacheId)
if err != nil {
log.Errorf("cannot find storage cache??? %s", err)
return nil
}
return obj.(*SStoragecache)
}
func (self *SStorage) PerformCacheImage(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.CacheImageInput) (jsonutils.JSONObject, error) {
cache := self.GetStoragecache()
if cache == nil {
return nil, httperrors.NewInternalServerError("storage cache is missing")
}
return cache.PerformCacheImage(ctx, userCred, query, input)
}
func (self *SStorage) PerformUncacheImage(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
cache := self.GetStoragecache()
if cache == nil {
return nil, httperrors.NewInternalServerError("storage cache is missing")
}
return cache.PerformUncacheImage(ctx, userCred, query, data)
}
func (self *SStorage) GetIStorage(ctx context.Context) (cloudprovider.ICloudStorage, error) {
provider, err := self.GetDriver(ctx)
if err != nil {
return nil, errors.Wrap(err, "self.GetDriver")
}
var iRegion cloudprovider.ICloudRegion
if provider.GetFactory().IsOnPremise() {
iRegion, err = provider.GetOnPremiseIRegion()
} else {
region, _ := self.GetRegion()
if region == nil {
msg := "cannot find region for storage???"
log.Errorf("%s", msg)
return nil, fmt.Errorf("%s", msg)
}
iRegion, err = provider.GetIRegionById(region.ExternalId)
}
if err != nil {
return nil, errors.Wrap(err, "provider.GetIRegionById")
}
istore, err := iRegion.GetIStorageById(self.GetExternalId())
if err != nil {
log.Errorf("iRegion.GetIStorageById fail %s", err)
return nil, errors.Wrapf(err, "iRegion.GetIStorageById(%s)", self.GetExternalId())
}
return istore, nil
}
func (manager *SStorageManager) FetchStorageById(storageId string) *SStorage {
obj, err := manager.FetchById(storageId)
if err != nil {
log.Errorf("%s", err)
return nil
}
return obj.(*SStorage)
}
func (manager *SStorageManager) FetchStorageByIds(ids []string) ([]SStorage, error) {
objs := make([]SStorage, 0)
q := manager.Query().In("id", ids)
if err := db.FetchModelObjects(manager, q, &objs); err != nil {
return nil, err
}
return objs, nil
}
func (manager *SStorageManager) InitializeData() error {
storages := make([]SStorage, 0)
q := manager.Query()
q = q.Filter(
sqlchemy.OR(
sqlchemy.IsNullOrEmpty(q.Field("zone_id")),
sqlchemy.AND(
sqlchemy.IsNullOrEmpty(q.Field("storagecache_id")),
sqlchemy.Equals(q.Field("storage_type"), api.STORAGE_RBD),
),
),
)
err := db.FetchModelObjects(manager, q, &storages)
if err != nil {
return err
}
for _, s := range storages {
if len(s.ZoneId) == 0 {
zoneId := ""
hosts, _ := s.GetAttachedHosts()
if hosts != nil && len(hosts) > 0 {
zoneId = hosts[0].ZoneId
} else {
log.Fatalf("Cannot locate zoneId for storage %s", s.Name)
}
db.Update(&s, func() error {
s.ZoneId = zoneId
return nil
})
}
if len(s.StoragecacheId) == 0 && s.StorageType == api.STORAGE_RBD {
storagecache := &SStoragecache{}
storagecache.SetModelManager(StoragecacheManager, storagecache)
storagecache.Name = "rbd-" + s.Id
if pool, err := s.StorageConf.GetString("pool"); err != nil {
log.Fatalf("Get storage %s pool info error", s.Name)
} else {
storagecache.Path = "rbd:" + pool
if err := StoragecacheManager.TableSpec().Insert(context.TODO(), storagecache); err != nil {
log.Fatalf("Cannot Add storagecache for %s", s.Name)
} else {
db.Update(&s, func() error {
s.StoragecacheId = storagecache.Id
return nil
})
}
}
}
}
sq := CloudproviderManager.Query("id").Equals("provider", api.CLOUD_PROVIDER_ALIYUN).SubQuery()
q = manager.Query().NotEquals("medium_type", api.DISK_TYPE_SSD).In("manager_id", sq)
storages = make([]SStorage, 0)
err = db.FetchModelObjects(manager, q, &storages)
if err != nil {
return err
}
for i := range storages {
db.Update(&storages[i], func() error {
storages[i].MediumType = api.DISK_TYPE_SSD
return nil
})
}
return nil
}
func (manager *SStorageManager) IsStorageTypeExist(storageType string) (string, bool) {
storages := []SStorage{}
q := manager.Query().Equals("storage_type", storageType)
if err := db.FetchModelObjects(manager, q, &storages); err != nil {
return "", false
}
if len(storages) == 0 {
return "", false
}
return storages[0].StorageType, true
}
// 块存储列表
func (manager *SStorageManager) ListItemFilter(
ctx context.Context,
q *sqlchemy.SQuery,
userCred mcclient.TokenCredential,
query api.StorageListInput,
) (*sqlchemy.SQuery, error) {
var err error
q, err = manager.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ManagedResourceListInput)
if err != nil {
return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
}
q, err = manager.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ExternalizedResourceBaseListInput)
if err != nil {
return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
}
q, err = manager.SZoneResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ZonalFilterListInput)
if err != nil {
return nil, errors.Wrap(err, "SZoneResourceBaseManager.ListItemFilter")
}
q, err = manager.SEnabledStatusInfrasResourceBaseManager.ListItemFilter(ctx, q, userCred, query.EnabledStatusInfrasResourceBaseListInput)
if err != nil {
return nil, errors.Wrap(err, "SEnabledStatusInfrasResourceBaseManager.ListItemFilter")
}
if query.Share != nil && *query.Share {
q = q.Filter(sqlchemy.NotIn(q.Field("storage_type"), api.STORAGE_LOCAL_TYPES))
}
if query.Local != nil && *query.Local {
q = q.Filter(sqlchemy.In(q.Field("storage_type"), api.STORAGE_LOCAL_TYPES))
}
if len(query.StorageType) > 0 {
q = q.Equals("storage_type", query.StorageType)
}
if len(query.SchedtagId) > 0 {
schedTag, err := SchedtagManager.FetchByIdOrName(ctx, nil, query.SchedtagId)
if err != nil {
if errors.Cause(err) == sql.ErrNoRows {
return nil, httperrors.NewResourceNotFoundError2(SchedtagManager.Keyword(), query.SchedtagId)
}
return nil, httperrors.NewGeneralError(err)
}
sq := StorageschedtagManager.Query("storage_id").Equals("schedtag_id", schedTag.GetId()).SubQuery()
q = q.In("id", sq)
}
if query.Usable != nil && *query.Usable {
hostStorageTable := HoststorageManager.Query().SubQuery()
hostTable := HostManager.Query().SubQuery()
sq1 := hostStorageTable.Query(hostStorageTable.Field("storage_id")).
Join(hostTable, sqlchemy.Equals(hostTable.Field("id"), hostStorageTable.Field("host_id"))).
Filter(sqlchemy.Equals(hostTable.Field("host_status"), api.HOST_ONLINE)).
Filter(sqlchemy.IsTrue(hostTable.Field("enabled"))).
Filter(sqlchemy.IsNullOrEmpty(hostTable.Field("manager_id")))
providerTable := usableCloudProviders().SubQuery()
sq2 := hostStorageTable.Query(hostStorageTable.Field("storage_id")).
Join(hostTable, sqlchemy.Equals(hostTable.Field("id"), hostStorageTable.Field("host_id"))).
Join(providerTable, sqlchemy.Equals(hostTable.Field("manager_id"), providerTable.Field("id")))
q = q.Filter(
sqlchemy.OR(
sqlchemy.In(q.Field("id"), sq1),
sqlchemy.In(q.Field("id"), sq2),
)).
Filter(sqlchemy.In(q.Field("status"), []string{api.STORAGE_ENABLED, api.STORAGE_ONLINE})).
Filter(sqlchemy.IsTrue(q.Field("enabled")))
}
if len(query.HostSchedtagId) > 0 {
schedTagObj, err := SchedtagManager.FetchByIdOrName(ctx, userCred, query.HostSchedtagId)
if err != nil {
if errors.Cause(err) == sql.ErrNoRows {
return nil, errors.Wrapf(httperrors.ErrResourceNotFound, "%s %s", SchedtagManager.Keyword(), query.HostSchedtagId)
} else {
return nil, errors.Wrap(err, "SchedtagManager.FetchByIdOrName")
}
}
subq := HoststorageManager.Query("storage_id")
hostschedtags := HostschedtagManager.Query().Equals("schedtag_id", schedTagObj.GetId()).SubQuery()
subq = subq.Join(hostschedtags, sqlchemy.Equals(hostschedtags.Field("host_id"), subq.Field("host_id")))
q = q.In("id", subq.SubQuery())
}
if len(query.ImageId) > 0 {
image, err := CachedimageManager.getImageInfo(ctx, userCred, query.ImageId, false)
if err != nil {
return nil, errors.Wrap(err, "CachedimageManager.getImageInfo")
}
subq := StorageManager.Query("id")
storagecaches := StoragecachedimageManager.Query("storagecache_id").Equals("cachedimage_id", image.Id).SubQuery()
subq = subq.Join(storagecaches, sqlchemy.Equals(subq.Field("storagecache_id"), storagecaches.Field("storagecache_id")))
q = q.In("id", subq.SubQuery())
}
if len(query.ServerId) > 0 {
guest, err := GuestManager.FetchByIdOrName(ctx, userCred, query.ServerId)
if err != nil {
if errors.Cause(err) == sql.ErrNoRows {
return nil, errors.Wrapf(httperrors.ErrResourceNotFound, "%s %s", GuestManager.Keyword(), query.ServerId)
} else {
return nil, errors.Wrapf(err, "GuestManager.FetchByIdOrName %s", query.ServerId)
}
}
query.HostId = guest.(*SGuest).HostId
}
if len(query.HostId) > 0 {
host, err := HostManager.FetchByIdOrName(ctx, userCred, query.HostId)
if err != nil {
if errors.Cause(err) == sql.ErrNoRows {
return nil, errors.Wrapf(httperrors.ErrResourceNotFound, "%s %s", HostManager.Keyword(), query.HostId)
} else {
return nil, errors.Wrapf(err, "HostManager.FetchByIdOrName %s", query.HostId)
}
}
sq := HoststorageManager.Query("storage_id").Equals("host_id", host.GetId())
q = q.In("id", sq.SubQuery())
}
if query.IsBaremetal != nil {
qf := q.NotEquals
if *query.IsBaremetal {
qf = q.Equals
}
q = qf("storage_type", api.STORAGE_BAREMETAL)
}
return q, err
}
func (manager *SStorageManager) OrderByExtraFields(
ctx context.Context,
q *sqlchemy.SQuery,
userCred mcclient.TokenCredential,
query api.StorageListInput,
) (*sqlchemy.SQuery, error) {
q, err := manager.SEnabledStatusInfrasResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.EnabledStatusInfrasResourceBaseListInput)
if err != nil {
return nil, errors.Wrap(err, "SEnabledStatusInfrasResourceBaseManager.OrderByExtraFields")
}
q, err = manager.SZoneResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ZonalFilterListInput)
if err != nil {
return nil, errors.Wrap(err, "SZoneResourceBaseManager.OrderByExtraFields")
}
q, err = manager.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput)
if err != nil {
return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
}
return q, nil
}
func (manager *SStorageManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
var err error
q, err = manager.SEnabledStatusInfrasResourceBaseManager.QueryDistinctExtraField(q, field)
if err == nil {
return q, nil
}
q, err = manager.SZoneResourceBaseManager.QueryDistinctExtraField(q, field)
if err == nil {
return q, nil
}
q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
if err == nil {
return q, nil
}
return q, httperrors.ErrNotFound
}
func (manager *SStorageManager) QueryDistinctExtraFields(q *sqlchemy.SQuery, resource string, fields []string) (*sqlchemy.SQuery, error) {
var err error
q, err = manager.SManagedResourceBaseManager.QueryDistinctExtraFields(q, resource, fields)
if err == nil {
return q, nil
}
return q, httperrors.ErrNotFound
}
func (self *SStorage) ClearSchedDescCache() error {
hosts := self.GetAllAttachingHosts()
if hosts == nil {
msg := "get attaching host error"
log.Errorf("%s", msg)
return fmt.Errorf("%s", msg)
}
for i := 0; i < len(hosts); i += 1 {
err := hosts[i].ClearSchedDescCache()
if err != nil {
log.Errorf("host CleanHostSchedCache error: %v", err)
return err
}
}
return nil
}
func (self *SStorage) getCloudProviderInfo() SCloudProviderInfo {
var region *SCloudregion
zone, _ := self.getZone()
if zone != nil {
region, _ = zone.GetRegion()
}
provider := self.GetCloudprovider()
return MakeCloudProviderInfo(region, zone, provider)
}
func (self *SStorage) GetShortDesc(ctx context.Context) *jsonutils.JSONDict {
desc := self.SStandaloneResourceBase.GetShortDesc(ctx)
info := self.getCloudProviderInfo()
desc.Update(jsonutils.Marshal(&info))
return desc
}
func (self *SStorage) IsPrepaidRecycleResource() bool {
if !self.IsLocal() {
return false
}
hosts, _ := self.GetAttachedHosts()
if len(hosts) != 1 {
return false
}
return hosts[0].IsPrepaidRecycleResource()
}
func (self *SStorage) GetSchedtags() []SSchedtag {
return GetSchedtags(StorageschedtagManager, self.Id)
}
func (self *SStorage) GetDynamicConditionInput() *jsonutils.JSONDict {
return jsonutils.Marshal(self).(*jsonutils.JSONDict)
}
func (self *SStorage) PerformSetSchedtag(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) (jsonutils.JSONObject, error) {
return PerformSetResourceSchedtag(self, ctx, userCred, query, data)
}
func (self *SStorage) PerformSetCommitBound(
ctx context.Context,
userCred mcclient.TokenCredential,
query jsonutils.JSONObject,
input api.StorageSetCmtBoundInput,
) (jsonutils.JSONObject, error) {
_, err := db.Update(self, func() error {
if input.Cmtbound != nil {
self.Cmtbound = *input.Cmtbound
}
return nil
})
if err != nil {
return nil, err
}
db.OpsLog.LogEvent(self, db.ACT_SET_COMMIT_BOUND, input, userCred)
logclient.AddActionLogWithContext(ctx, self, logclient.ACT_SET_COMMIT_BOUND, input, userCred, true)
return nil, nil
}
func (self *SStorage) GetSchedtagJointManager() ISchedtagJointManager {
return StorageschedtagManager
}
func (self *SStorage) StartDeleteRbdDisks(ctx context.Context, userCred mcclient.TokenCredential, disksId []string) error {
data := jsonutils.NewDict()
data.Add(jsonutils.NewStringArray(disksId), "disks_id")
task, err := taskman.TaskManager.NewTask(ctx, "StorageDeleteRbdDiskTask", self, userCred, data, "", "", nil)
if err != nil {
return err
}
task.ScheduleRun(nil)
return nil
}
func (storage *SStorage) PerformChangeOwner(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformChangeDomainOwnerInput) (jsonutils.JSONObject, error) {
// not allow to perform public for locally connected storage
if storage.IsLocal() {
hosts, _ := storage.GetAttachedHosts()
if len(hosts) > 0 {
return nil, errors.Wrap(httperrors.ErrForbidden, "not allow to change owner for local storage")
}
}
return storage.performChangeOwnerInternal(ctx, userCred, query, input)
}
func (storage *SStorage) performChangeOwnerInternal(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformChangeDomainOwnerInput) (jsonutils.JSONObject, error) {
return storage.SEnabledStatusInfrasResourceBase.PerformChangeOwner(ctx, userCred, query, input)
}
func (storage *SStorage) GetChangeOwnerRequiredDomainIds() []string {
requires := stringutils2.SSortedStrings{}
disks := storage.GetDisks()
for i := range disks {
requires = stringutils2.Append(requires, disks[i].DomainId)
}
return requires
}
func (storage *SStorage) PerformPublic(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformPublicDomainInput) (jsonutils.JSONObject, error) {
// not allow to perform public for locally connected storage
if storage.IsLocal() {
hosts, _ := storage.GetAttachedHosts()
if len(hosts) > 0 {
return nil, errors.Wrap(httperrors.ErrForbidden, "not allow to perform public for local storage")
}
}
return storage.performPublicInternal(ctx, userCred, query, input)
}
func (storage *SStorage) performPublicInternal(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformPublicDomainInput) (jsonutils.JSONObject, error) {
return storage.SEnabledStatusInfrasResourceBase.PerformPublic(ctx, userCred, query, input)
}
func (storage *SStorage) PerformPrivate(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformPrivateInput) (jsonutils.JSONObject, error) {
// not allow to perform private for locally conencted storage
if storage.IsLocal() {
hosts, _ := storage.GetAttachedHosts()
if len(hosts) > 0 {
return nil, errors.Wrap(httperrors.ErrForbidden, "not allow to perform private for local storage")
}
}
return storage.performPrivateInternal(ctx, userCred, query, input)
}
func (storage *SStorage) performPrivateInternal(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformPrivateInput) (jsonutils.JSONObject, error) {
return storage.SEnabledStatusInfrasResourceBase.PerformPrivate(ctx, userCred, query, input)
}
func (manager *SStorageManager) ListItemExportKeys(ctx context.Context,
q *sqlchemy.SQuery,
userCred mcclient.TokenCredential,
keys stringutils2.SSortedStrings,
) (*sqlchemy.SQuery, error) {
q, err := manager.SEnabledStatusInfrasResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
if err != nil {
return nil, errors.Wrap(err, "SEnabledStatusInfrasResourceBaseManager.ListItemExportKeys")
}
if keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...) {
q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
if err != nil {
return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
}
}
if keys.ContainsAny(manager.SZoneResourceBaseManager.GetExportKeys()...) {
q, err = manager.SZoneResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
if err != nil {
return nil, errors.Wrap(err, "SZoneResourceBaseManager.ListItemExportKeys")
}
}
if keys.Contains("schedtag") {
schedtagsQ := SchedtagManager.Query("id", "name").SubQuery()
storageSchedtagQ := StorageschedtagManager.Query("storage_id", "schedtag_id").SubQuery()
subQ := storageSchedtagQ.Query(storageSchedtagQ.Field("storage_id"), sqlchemy.GROUP_CONCAT("schedtag", schedtagsQ.Field("name")))
subQ = subQ.Join(schedtagsQ, sqlchemy.Equals(schedtagsQ.Field("id"), storageSchedtagQ.Field("schedtag_id")))
subQ = subQ.GroupBy(storageSchedtagQ.Field("storage_id"))
subQT := subQ.SubQuery()
q = q.LeftJoin(subQT, sqlchemy.Equals(q.Field("id"), subQT.Field("storage_id")))
q = q.AppendField(subQT.Field("schedtag"))
}
return q, nil
}
func (storage *SStorage) PerformForceDetachHost(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.StorageForceDetachHostInput) (jsonutils.JSONObject, error) {
if storage.Enabled.Bool() {
return nil, httperrors.NewBadRequestError("storage is enabled")
}
iHost, err := HostManager.FetchByIdOrName(ctx, userCred, input.HostId)
if err == sql.ErrNoRows {
return nil, httperrors.NewNotFoundError("host %s not found", input.HostId)
} else if err != nil {
return nil, err
}
host := iHost.(*SHost)
if host.Status == api.HOST_ONLINE {
return nil, httperrors.NewBadRequestError("can't detach host in status online")
}
iHostStorage, err := db.FetchJointByIds(HoststorageManager, host.GetId(), storage.Id, nil)
if err == sql.ErrNoRows {
return nil, httperrors.NewNotFoundError("host %s storage %s not found", input.HostId, storage.Name)
} else if err != nil {
return nil, err
}
hostStorage := iHostStorage.(*SHoststorage)
hostStorage.SetModelManager(HoststorageManager, hostStorage)
err = hostStorage.Delete(ctx, userCred)
if err == nil {
db.OpsLog.LogDetachEvent(ctx, db.JointMaster(hostStorage), db.JointSlave(hostStorage), userCred, jsonutils.NewString("force detach"))
}
return nil, err
}
func (storage *SStorage) GetDetailsHardwareInfo(ctx context.Context, userCred mcclient.TokenCredential, _ jsonutils.JSONObject) (*api.StorageHardwareInfo, error) {
info := new(api.StorageHardwareInfo)
model := storage.GetMetadata(ctx, api.STORAGE_METADATA_MODEL, userCred)
if model != "" {
info.Model = &model
}
vendor := storage.GetMetadata(ctx, api.STORAGE_METADATA_VENDOR, userCred)
if vendor != "" {
info.Vendor = &vendor
}
bw := storage.GetMetadata(ctx, api.STORAGE_METADATA_BANDWIDTH, userCred)
if bw != "" {
bwNum, err := strconv.ParseFloat(bw, 64)
if err != nil {
return nil, errors.Wrapf(err, "parse bandwidth string: %s", bw)
}
info.Bandwidth = bwNum
}
return info, nil
}
func (storage *SStorage) PerformSetHardwareInfo(ctx context.Context, userCred mcclient.TokenCredential, _ jsonutils.JSONObject, data *api.StorageHardwareInfo) (*api.StorageHardwareInfo, error) {
return data, storage.setHardwareInfo(ctx, userCred, data)
}
func StoragesCleanRecycleDiskfiles(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
// get shared storages
q := StorageManager.Query().IsNullOrEmpty("manager_id")
q = q.In("storage_type", api.SHARED_STORAGE)
storages := make([]SStorage, 0)
err := q.All(&storages)
if err != nil {
log.Errorf("StoragesCleanRecycleDiskfiles failed get storages %s", err)
return
}
for i := range storages {
storages[i].SetModelManager(StorageManager, &storages[i])
log.Infof("storage %s start clean recycle diskfiles", storages[i].GetName())
host, err := storages[i].GetMasterHost()
if err != nil {
log.Errorf("StoragesCleanRecycleDiskfiles storage %s failed get master host: %s", storages[i].GetName(), err)
continue
}
url := fmt.Sprintf("/storages/%s/clean-recycle-diskfiles", storages[i].Id)
body := jsonutils.NewDict()
_, err = host.Request(ctx, userCred, "POST", url, mcclient.GetTokenHeaders(userCred), body)
if err != nil {
log.Errorf("StoragesCleanRecycleDiskfiles storage %s request failed %s", storages[i].GetName(), err)
continue
}
}
}