mirror of
https://github.com/yunionio/cloudpods.git
synced 2026-07-01 01:36:11 +08:00
fix(host,host-image): use nbd export images (#22073)
This commit is contained in:
7
Makefile
7
Makefile
@@ -47,11 +47,6 @@ PKGS := go list ./...
|
||||
CGO_CFLAGS_ENV = $(shell go env CGO_CFLAGS)
|
||||
CGO_LDFLAGS_ENV = $(shell go env CGO_LDFLAGS)
|
||||
|
||||
ifdef LIBQEMUIO_PATH
|
||||
X_CGO_CFLAGS := ${CGO_CFLAGS_ENV} -I${LIBQEMUIO_PATH}/src -I${LIBQEMUIO_PATH}/src/include
|
||||
X_CGO_LDFLAGS := ${CGO_LDFLAGS_ENV} -laio -lqemuio -lpthread -lgnutls -lnettle -L ${LIBQEMUIO_PATH}/src
|
||||
endif
|
||||
|
||||
export GOOS ?= linux
|
||||
export GO111MODULE:=on
|
||||
export CGO_CFLAGS = ${X_CGO_CFLAGS}
|
||||
@@ -63,7 +58,7 @@ ifeq ($(UNAME), Linux)
|
||||
XARGS_FLAGS = --no-run-if-empty
|
||||
endif
|
||||
|
||||
cmdTargets:=$(filter-out cmd/host-image,$(wildcard cmd/*))
|
||||
cmdTargets:=$(wildcard cmd/*)
|
||||
rpmTargets:=$(foreach b,$(patsubst cmd/%,%,$(cmdTargets)),$(if $(shell [ -f "$(CURDIR)/build/$(b)/vars" ] && echo 1),rpm/$(b)))
|
||||
debTargets:=$(foreach b,$(patsubst cmd/%,%,$(cmdTargets)),$(if $(shell [ -f "$(CURDIR)/build/$(b)/vars" ] && echo 1),deb/$(b)))
|
||||
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
FROM registry.cn-beijing.aliyuncs.com/yunionio/onecloud-base:v0.3.5-1
|
||||
|
||||
RUN mkdir -p /opt/yunion/bin
|
||||
|
||||
ENV TZ UTC
|
||||
|
||||
ADD ./_output/bin/host-image /opt/yunion/bin/host-image
|
||||
ADD ./_output/bin/.host-image.bin /opt/yunion/bin/.host-image.bin
|
||||
ADD ./_output/bin/bundles/host-image /opt/yunion/bin/bundles/host-image
|
||||
RUN mkdir -p /opt/yunion/bin
|
||||
ADD ./_output/alpine-build/bin/host-image /opt/yunion/bin/host-image
|
||||
|
||||
@@ -20,43 +20,36 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pierrec/lz4/v4"
|
||||
|
||||
execlient "yunion.io/x/executor/client"
|
||||
"yunion.io/x/jsonutils"
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/appctx"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/apis"
|
||||
"yunion.io/x/onecloud/pkg/appsrv"
|
||||
app_common "yunion.io/x/onecloud/pkg/cloudcommon/app"
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
|
||||
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
|
||||
"yunion.io/x/onecloud/pkg/httperrors"
|
||||
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
||||
"yunion.io/x/onecloud/pkg/util/procutils"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
"yunion.io/x/onecloud/pkg/util/seclib2"
|
||||
)
|
||||
|
||||
type SHostImageOptions struct {
|
||||
common_options.CommonOptions
|
||||
LocalImagePath []string `help:"Local Image Paths"`
|
||||
LVMVolumeGroups []string `help:"LVM Volume Groups(vgs)"`
|
||||
SnapshotDirSuffix string `help:"Snapshot dir name equal diskId concat snapshot dir suffix" default:"_snap"`
|
||||
CommonConfigFile string `help:"common config file for container"`
|
||||
StreamChunkSize int `help:"Download stream chunk size KB" default:"4096"`
|
||||
Lz4ChecksumOff bool `help:"Turn off lz4 checksum option"`
|
||||
common_options.HostCommonOptions
|
||||
LocalImagePath []string `help:"Local Image Paths"`
|
||||
LVMVolumeGroups []string `help:"LVM Volume Groups(vgs)"`
|
||||
SnapshotDirSuffix string `help:"Snapshot dir name equal diskId concat snapshot dir suffix" default:"_snap"`
|
||||
HostImageNbdPidDir string `help:"Host-image nbd pid files dir " default:"/var/run/onecloud/host-image"`
|
||||
CommonConfigFile string `help:"common config file for container"`
|
||||
}
|
||||
|
||||
var (
|
||||
HostImageOptions SHostImageOptions
|
||||
streamingWorkerMan *appsrv.SWorkerManager
|
||||
HostImageOptions SHostImageOptions
|
||||
nbdExportManager *SNbdExportManager
|
||||
)
|
||||
|
||||
func init() {
|
||||
streamingWorkerMan = appsrv.NewWorkerManager("streaming_worker", 20, 1024, false)
|
||||
}
|
||||
|
||||
func StartService() {
|
||||
consts.SetServiceType("host-image")
|
||||
common_options.ParseOptions(&HostImageOptions, os.Args, "host.conf", "host-image")
|
||||
@@ -68,6 +61,19 @@ func StartService() {
|
||||
HostImageOptions.CommonOptions = *commonCfg
|
||||
HostImageOptions.BaseOptions.BaseOptions = baseOpt
|
||||
}
|
||||
log.Infof("exec socket path: %s", HostImageOptions.ExecutorSocketPath)
|
||||
if HostImageOptions.EnableRemoteExecutor {
|
||||
execlient.Init(HostImageOptions.ExecutorSocketPath)
|
||||
execlient.SetTimeoutSeconds(HostImageOptions.ExecutorConnectTimeoutSeconds)
|
||||
procutils.SetRemoteExecutor()
|
||||
}
|
||||
|
||||
nbdExportManager = NewNbdExportManager()
|
||||
output, err := procutils.NewCommand("mkdir", "-p", HostImageOptions.HostImageNbdPidDir).Output()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create path %s: %s %s", HostImageOptions.HostImageNbdPidDir, output, err)
|
||||
}
|
||||
|
||||
HostImageOptions.EnableSsl = false
|
||||
HostImageOptions.Port += 40000
|
||||
app_common.InitAuth(&HostImageOptions.CommonOptions, func() {
|
||||
@@ -79,27 +85,23 @@ func StartService() {
|
||||
}
|
||||
|
||||
func initHandlers(app *appsrv.Application, prefix string) {
|
||||
app.AddHandler("GET", fmt.Sprintf("%s/disks/<sid>", prefix), getImage).
|
||||
SetProcessNoTimeout().SetWorkerManager(streamingWorkerMan)
|
||||
app.AddHandler("GET", fmt.Sprintf("%s/snapshots/<diskId>/<sid>", prefix), getImage).
|
||||
SetProcessNoTimeout().SetWorkerManager(streamingWorkerMan)
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/disks/<sid>/nbd-export", prefix), auth.Authenticate(imageNbdExport))
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/snapshots/<diskId>/<sid>/nbd-export", prefix), auth.Authenticate(imageNbdExport))
|
||||
|
||||
app.AddHandler("HEAD", fmt.Sprintf("%s/disks/<sid>", prefix), getImageMeta)
|
||||
app.AddHandler("HEAD", fmt.Sprintf("%s/snapshots/<diskId>/<sid>", prefix), getImageMeta)
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/disks/<sid>", prefix), closeImage)
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/snapshots/<diskId>/<sid>", prefix), closeImage)
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/disks/<sid>/nbd-close", prefix), auth.Authenticate(imageNbdClose))
|
||||
app.AddHandler("POST", fmt.Sprintf("%s/snapshots/<diskId>/<sid>/nbd-close", prefix), auth.Authenticate(imageNbdClose))
|
||||
}
|
||||
|
||||
func getDiskPath(diskId string) string {
|
||||
for _, imagePath := range HostImageOptions.LocalImagePath {
|
||||
diskPath := path.Join(imagePath, diskId)
|
||||
if _, err := os.Stat(diskPath); !os.IsNotExist(err) {
|
||||
if _, err := procutils.RemoteStat(diskPath); err == nil {
|
||||
return diskPath
|
||||
}
|
||||
}
|
||||
for _, vg := range HostImageOptions.LVMVolumeGroups {
|
||||
diskPath := path.Join("/dev", vg, diskId)
|
||||
if _, err := os.Stat(diskPath); !os.IsNotExist(err) {
|
||||
if _, err := procutils.RemoteStat(diskPath); err == nil {
|
||||
return diskPath
|
||||
}
|
||||
}
|
||||
@@ -110,244 +112,79 @@ func getSnapshotPath(diskId, snapshotId string) string {
|
||||
for _, imagePath := range HostImageOptions.LocalImagePath {
|
||||
diskPath := path.Join(imagePath, "snapshots",
|
||||
diskId+HostImageOptions.SnapshotDirSuffix, snapshotId)
|
||||
if _, err := os.Stat(diskPath); !os.IsNotExist(err) {
|
||||
if _, err := procutils.RemoteStat(diskPath); err == nil {
|
||||
return diskPath
|
||||
}
|
||||
}
|
||||
for _, vg := range HostImageOptions.LVMVolumeGroups {
|
||||
diskPath := path.Join("/dev", vg, "snap_"+snapshotId)
|
||||
if _, err := os.Stat(diskPath); !os.IsNotExist(err) {
|
||||
if _, err := procutils.RemoteStat(diskPath); err == nil {
|
||||
return diskPath
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func inputCheck(ctx context.Context) (string, error) {
|
||||
var params = appctx.AppContextParams(ctx)
|
||||
func inputCheck(ctx context.Context, w http.ResponseWriter, r *http.Request) (string, string, error) {
|
||||
params, _, body := appsrv.FetchEnv(ctx, w, r)
|
||||
var sid = params["<sid>"]
|
||||
var imagePath string
|
||||
var remoteDiskId string
|
||||
|
||||
remoteDiskId, _ = body.GetString("disk_id")
|
||||
if remoteDiskId == "" {
|
||||
return "", "", httperrors.NewMissingParameterError("disk_id")
|
||||
}
|
||||
|
||||
if diskId, ok := params["<diskId>"]; ok {
|
||||
imagePath = getSnapshotPath(diskId, sid)
|
||||
} else {
|
||||
imagePath = getDiskPath(sid)
|
||||
}
|
||||
if len(imagePath) == 0 {
|
||||
return "", httperrors.NewNotFoundError("Disk not found")
|
||||
return "", "", httperrors.NewNotFoundError("Disk not found")
|
||||
}
|
||||
return imagePath, nil
|
||||
return imagePath, remoteDiskId, nil
|
||||
}
|
||||
|
||||
func parseRange(reqRange string) (int64, int64, error) {
|
||||
if !strings.HasPrefix(reqRange, "bytes=") {
|
||||
return 0, 0, httperrors.NewInputParameterError("Invalid range header")
|
||||
}
|
||||
reqRange = reqRange[len("bytes="):]
|
||||
ranges := strings.Split(reqRange, "-")
|
||||
if len(ranges) != 2 {
|
||||
return 0, 0, httperrors.NewInputParameterError("Invalid range header")
|
||||
}
|
||||
startPos, err := strconv.ParseInt(ranges[0], 10, 0)
|
||||
if err != nil {
|
||||
return 0, 0, httperrors.NewInputParameterError("Invalid range header")
|
||||
}
|
||||
endPos, err := strconv.ParseInt(ranges[1], 10, 0)
|
||||
if err != nil {
|
||||
return 0, 0, httperrors.NewInputParameterError("Invalid range header")
|
||||
}
|
||||
return startPos, endPos, nil
|
||||
}
|
||||
|
||||
func closeImage(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
imagePath, err := inputCheck(ctx)
|
||||
func imageNbdExport(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
imagePath, targetDiskId, err := inputCheck(ctx, w, r)
|
||||
if err != nil {
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
|
||||
var f IImage
|
||||
if r.Header.Get("X-Read-File") == "true" {
|
||||
f = &SFile{}
|
||||
} else {
|
||||
f = &SQcow2Image{}
|
||||
imageInfo := qemuimg.SImageInfo{
|
||||
Path: imagePath,
|
||||
}
|
||||
err = f.Load(imagePath, true, false)
|
||||
encryptKey := r.Header.Get("X-Encrypt-Key")
|
||||
if len(encryptKey) > 0 {
|
||||
imageInfo.Password = encryptKey
|
||||
imageInfo.EncryptAlg = seclib2.TSymEncAlg(r.Header.Get("X-Encrypt-Alg"))
|
||||
}
|
||||
|
||||
nbdPort, err := nbdExportManager.QemuNbdStartExport(imageInfo, targetDiskId)
|
||||
if err != nil {
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
f.Close()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
log.Infof("Image %s request nbd export with port %d", targetDiskId, nbdPort)
|
||||
|
||||
ret := jsonutils.NewDict()
|
||||
ret.Set("nbd_port", jsonutils.NewInt(int64(nbdPort)))
|
||||
appsrv.SendJSON(w, ret)
|
||||
}
|
||||
|
||||
func getImage(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
imagePath, err := inputCheck(ctx)
|
||||
func imageNbdClose(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
_, targetDiskId, err := inputCheck(ctx, w, r)
|
||||
if err != nil {
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
f IImage
|
||||
startPos, endPos int64
|
||||
rateLimit int64 = -1
|
||||
encryptInfo *apis.SEncryptInfo
|
||||
)
|
||||
|
||||
if r.Header.Get("X-Read-File") == "true" {
|
||||
f = &SFile{}
|
||||
} else {
|
||||
f = &SQcow2Image{}
|
||||
}
|
||||
|
||||
err = f.Load(imagePath, true, true)
|
||||
if err != nil {
|
||||
encryptKey := r.Header.Get("X-Encrypt-Key")
|
||||
if len(encryptKey) > 0 {
|
||||
encryptInfo = new(apis.SEncryptInfo)
|
||||
encryptInfo.Key = encryptKey
|
||||
encryptInfo.Alg = seclib2.TSymEncAlg(r.Header.Get("X-Encrypt-Alg"))
|
||||
}
|
||||
|
||||
if err = f.Open(imagePath, true, encryptInfo); err != nil {
|
||||
log.Errorf("Open image error: %s", err)
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
endPos = f.Length() - 1
|
||||
reqRange := r.Header.Get("Range")
|
||||
if len(reqRange) > 0 {
|
||||
startPos, endPos, err = parseRange(reqRange)
|
||||
if err != nil {
|
||||
log.Errorf("Parse range error: %s", err)
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
strRateLimit := r.Header.Get("X-Rate-Limit-Mbps")
|
||||
if len(strRateLimit) > 0 {
|
||||
rateLimit, err = strconv.ParseInt(strRateLimit, 10, 0)
|
||||
if err != nil {
|
||||
log.Errorf("Parse ratelimit error: %s", err)
|
||||
httperrors.InvalidInputError(ctx, w, "Invaild rate limit header")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
streamHeader(w, f, startPos, endPos)
|
||||
startStream(w, f, startPos, endPos, rateLimit)
|
||||
}
|
||||
|
||||
func streamHeader(w http.ResponseWriter, f IImage, startPos, endPos int64) {
|
||||
var statusCode = http.StatusOK
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
if startPos > 0 || endPos < f.Length()-1 {
|
||||
statusCode = http.StatusPartialContent
|
||||
w.Header().Set("Content-Range",
|
||||
fmt.Sprintf("bytes %d-%d/%d", startPos, endPos, f.Length()))
|
||||
}
|
||||
w.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
func startStream(w http.ResponseWriter, f IImage, startPos, endPos, rateLimit int64) {
|
||||
var CHUNK_SIZE int64 = int64(HostImageOptions.StreamChunkSize * 1024)
|
||||
var readSize int64 = CHUNK_SIZE
|
||||
var sendBytes int64
|
||||
var lz4Writer = lz4.NewWriter(w)
|
||||
var startTime = time.Now()
|
||||
|
||||
opts := []lz4.Option{
|
||||
lz4.BlockSizeOption(lz4.Block4Mb),
|
||||
lz4.ConcurrencyOption(-1),
|
||||
lz4.CompressionLevelOption(lz4.Fast),
|
||||
}
|
||||
if HostImageOptions.Lz4ChecksumOff {
|
||||
log.Infof("Turn off lz4 checksum")
|
||||
opts = append(opts,
|
||||
lz4.BlockChecksumOption(false),
|
||||
lz4.ChecksumOption(false),
|
||||
)
|
||||
}
|
||||
if err := lz4Writer.Apply(opts...); err != nil {
|
||||
log.Errorf("lz4Writer.Apply options error: %v", err)
|
||||
goto fail
|
||||
}
|
||||
|
||||
for startPos < endPos {
|
||||
if endPos-startPos < CHUNK_SIZE {
|
||||
readSize = endPos - startPos + 1
|
||||
}
|
||||
buf, err := f.Read(startPos, readSize)
|
||||
if err != nil {
|
||||
log.Errorf("Read image error: %s", err)
|
||||
goto fail
|
||||
}
|
||||
startPos += readSize
|
||||
wSize, err := lz4Writer.Write(buf)
|
||||
if err != nil {
|
||||
log.Errorf("lz4Write error: %s", err)
|
||||
goto fail
|
||||
}
|
||||
sendBytes += int64(wSize)
|
||||
if rateLimit > 0 {
|
||||
tmDelta := time.Now().Sub(startTime)
|
||||
tms := tmDelta.Seconds()
|
||||
vtmDelta := float64(sendBytes*8) / float64(1024.0*1024.0*rateLimit)
|
||||
if vtmDelta > tms {
|
||||
time.Sleep(time.Duration(vtmDelta - tms))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fail:
|
||||
if err := lz4Writer.Close(); err != nil {
|
||||
log.Errorf("lz4 Close error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func getImageMeta(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
imagePath, err := inputCheck(ctx)
|
||||
err = nbdExportManager.QemuNbdCloseExport(targetDiskId)
|
||||
if err != nil {
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
f IImage
|
||||
encryptInfo *apis.SEncryptInfo
|
||||
)
|
||||
|
||||
if r.Header.Get("X-Read-File") == "true" {
|
||||
f = &SFile{}
|
||||
} else {
|
||||
f = &SQcow2Image{}
|
||||
}
|
||||
|
||||
log.Infof("open image %s", imagePath)
|
||||
err = f.Load(imagePath, true, true)
|
||||
if err != nil {
|
||||
encryptKey := r.Header.Get("X-Encrypt-Key")
|
||||
if len(encryptKey) > 0 {
|
||||
encryptInfo = new(apis.SEncryptInfo)
|
||||
encryptInfo.Key = encryptKey
|
||||
encryptInfo.Alg = seclib2.TSymEncAlg(r.Header.Get("X-Encrypt-Alg"))
|
||||
}
|
||||
|
||||
if err = f.Open(imagePath, true, encryptInfo); err != nil {
|
||||
log.Errorf("Open image error: %s", err)
|
||||
httperrors.GeneralServerError(ctx, w, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
w.Header().Set("Content-Length", fmt.Sprintf("%d", f.Length()))
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Accept-Ranges", "bytes")
|
||||
w.WriteHeader(200)
|
||||
log.Infof("Image %s request nbd close export with port", targetDiskId)
|
||||
appsrv.SendStruct(w, map[string]string{"result": "ok"})
|
||||
}
|
||||
|
||||
@@ -1,266 +0,0 @@
|
||||
// Copyright 2019 Yunion
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package hostimage
|
||||
|
||||
/*
|
||||
#cgo pkg-config: glib-2.0 zlib
|
||||
|
||||
#include "libqemuio.h"
|
||||
#include "qemu/osdep.h"
|
||||
*/
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/errors"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/apis"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
)
|
||||
|
||||
var qemuBlkCache sync.Map
|
||||
|
||||
type QemuioBlkDev struct {
|
||||
imagePath string
|
||||
readonly bool
|
||||
encrypted bool
|
||||
refCount int32
|
||||
|
||||
blk *C.struct_QemuioBlk
|
||||
}
|
||||
|
||||
func init() {
|
||||
C.qemuio_init()
|
||||
}
|
||||
|
||||
func (qb *QemuioBlkDev) ReadQcow2(offset int64, count int64) ([]byte, int64) {
|
||||
if qb.blk == nil || offset < 0 || count < 0 {
|
||||
return nil, -1
|
||||
}
|
||||
var b = make([]byte, count)
|
||||
var total = C.int64_t(0)
|
||||
ret := C.read_qcow2(qb.blk, unsafe.Pointer(&b[0]), C.int64_t(offset), C.int64_t(count), &total)
|
||||
if ret < 0 {
|
||||
return nil, int64(ret)
|
||||
} else {
|
||||
return b, int64(total)
|
||||
}
|
||||
}
|
||||
|
||||
func OpenQcow2(disk *qemuimg.SImageInfo, readonly bool) *QemuioBlkDev {
|
||||
key := fmt.Sprintf("%s_%v", disk.Path, readonly)
|
||||
if blk, ok := qemuBlkCache.Load(key); ok {
|
||||
qb := blk.(*QemuioBlkDev)
|
||||
atomic.AddInt32(&qb.refCount, 1)
|
||||
return qb
|
||||
}
|
||||
|
||||
qb := &QemuioBlkDev{
|
||||
imagePath: disk.Path,
|
||||
readonly: readonly,
|
||||
}
|
||||
diskPath := C.CString(disk.Path)
|
||||
imageOpts := C.CString(disk.ImageOptions())
|
||||
|
||||
// sec options
|
||||
var secretOpts *C.char
|
||||
secOpt := disk.SecretOptions()
|
||||
if len(secOpt) > 0 {
|
||||
secretOpts = C.CString(secOpt)
|
||||
qb.encrypted = true
|
||||
}
|
||||
|
||||
// qemu io open image
|
||||
blk := C.open_qcow2(diskPath, imageOpts, secretOpts, C.bool(readonly))
|
||||
|
||||
C.free(unsafe.Pointer(diskPath))
|
||||
C.free(unsafe.Pointer(imageOpts))
|
||||
|
||||
if secretOpts != nil {
|
||||
C.free(unsafe.Pointer(secretOpts))
|
||||
}
|
||||
|
||||
if blk == nil {
|
||||
// failed open qemu image
|
||||
return nil
|
||||
}
|
||||
|
||||
qb.blk = blk
|
||||
qb.refCount = 1
|
||||
qemuBlkCache.Store(key, qb)
|
||||
return qb
|
||||
}
|
||||
|
||||
func LoadQcow2(imagePath string, readonly bool) *QemuioBlkDev {
|
||||
key := fmt.Sprintf("%s_%v", imagePath, readonly)
|
||||
if blk, ok := qemuBlkCache.Load(key); ok {
|
||||
return blk.(*QemuioBlkDev)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (qb *QemuioBlkDev) Qcow2GetLength() int64 {
|
||||
if qb.blk == nil {
|
||||
return -1
|
||||
} else {
|
||||
return int64(C.qcow2_get_length(qb.blk))
|
||||
}
|
||||
}
|
||||
|
||||
func (qb *QemuioBlkDev) CloseQcow2() {
|
||||
if qb.blk != nil && atomic.AddInt32(&qb.refCount, -1) == 0 {
|
||||
var secId *C.char
|
||||
if qb.encrypted {
|
||||
secId = C.CString(filepath.Base(qb.imagePath))
|
||||
}
|
||||
|
||||
C.close_qcow2(qb.blk, secId)
|
||||
qemuBlkCache.Delete(fmt.Sprintf("%s_%v", qb.imagePath, qb.readonly))
|
||||
|
||||
if secId != nil {
|
||||
C.free(unsafe.Pointer(secId))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type IImage interface {
|
||||
// Open image file and its backing file (if have)
|
||||
Open(imagePath string, readonly bool, encryptInfo *apis.SEncryptInfo) error
|
||||
|
||||
// load opend qcow2 img form qemu blk cache
|
||||
Load(imagePath string, readonly, reference bool) error
|
||||
|
||||
// Close may not really close image file handle, just reudce ref count
|
||||
Close()
|
||||
|
||||
// If return number < 0 indicate read failed
|
||||
Read(offset, count int64) ([]byte, error)
|
||||
|
||||
// Get image file length, not file actual length, it's image virtual size
|
||||
Length() int64
|
||||
}
|
||||
|
||||
type SQcow2Image struct {
|
||||
fd *QemuioBlkDev
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) newQemuImage(imagePath string, encryptInfo *apis.SEncryptInfo) *qemuimg.SImageInfo {
|
||||
info := &qemuimg.SImageInfo{
|
||||
Path: imagePath,
|
||||
}
|
||||
|
||||
if encryptInfo != nil {
|
||||
info.SetSecId(filepath.Base(imagePath))
|
||||
info.Password = encryptInfo.Key
|
||||
info.EncryptAlg = encryptInfo.Alg
|
||||
}
|
||||
return info
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) Open(imagePath string, readonly bool, encryptInfo *apis.SEncryptInfo) error {
|
||||
disk := img.newQemuImage(imagePath, encryptInfo)
|
||||
fd := OpenQcow2(disk, readonly)
|
||||
if fd == nil {
|
||||
return fmt.Errorf("open image %s failed", imagePath)
|
||||
} else {
|
||||
img.fd = fd
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) Load(imagePath string, readonly, reference bool) error {
|
||||
fd := LoadQcow2(imagePath, readonly)
|
||||
if fd == nil {
|
||||
return fmt.Errorf("image %s readonly: %v not found", imagePath, readonly)
|
||||
} else {
|
||||
atomic.AddInt32(&fd.refCount, 1)
|
||||
img.fd = fd
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) Read(offset, count int64) ([]byte, error) {
|
||||
buf, ret := img.fd.ReadQcow2(offset, count)
|
||||
if ret >= 0 {
|
||||
return buf[:ret], nil
|
||||
} else {
|
||||
return nil, errors.Errorf("failed read data %d", ret)
|
||||
}
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) Close() {
|
||||
img.fd.CloseQcow2()
|
||||
}
|
||||
|
||||
func (img *SQcow2Image) Length() int64 {
|
||||
return img.fd.Qcow2GetLength()
|
||||
}
|
||||
|
||||
type SFile struct {
|
||||
fd *os.File
|
||||
}
|
||||
|
||||
func (f *SFile) Open(imagePath string, readonly bool, encryptInfo *apis.SEncryptInfo) error {
|
||||
var mode = os.O_RDWR
|
||||
if readonly {
|
||||
mode = os.O_RDONLY
|
||||
}
|
||||
fd, err := os.OpenFile(imagePath, mode, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
f.fd = fd
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (f *SFile) Load(imagePath string, readonly, reference bool) error {
|
||||
return fmt.Errorf("File don't support load")
|
||||
}
|
||||
|
||||
func (f *SFile) Read(offset, count int64) ([]byte, error) {
|
||||
if _, err := f.fd.Seek(offset, io.SeekStart); err != nil {
|
||||
log.Errorf("seek file %s", err)
|
||||
return nil, errors.Wrap(err, "seek")
|
||||
}
|
||||
buf := make([]byte, count)
|
||||
r := bufio.NewReader(f.fd)
|
||||
n, err := r.Read(buf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "read")
|
||||
}
|
||||
return buf[:n], nil
|
||||
}
|
||||
|
||||
func (f *SFile) Close() {
|
||||
f.fd.Close()
|
||||
}
|
||||
|
||||
func (f *SFile) Length() int64 {
|
||||
stat, e := f.fd.Stat()
|
||||
if e != nil {
|
||||
return -1
|
||||
}
|
||||
return stat.Size()
|
||||
}
|
||||
162
pkg/hostimage/nbd.go
Normal file
162
pkg/hostimage/nbd.go
Normal file
@@ -0,0 +1,162 @@
|
||||
// Copyright 2019 Yunion
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package hostimage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/errors"
|
||||
"yunion.io/x/pkg/util/version"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/util/fileutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/netutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/procutils"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
"yunion.io/x/onecloud/pkg/util/qemutils"
|
||||
)
|
||||
|
||||
var EXPORT_NBD_BASE_PORT = 7777
|
||||
var LAST_USED_NBD_SERVER_PORT = 0
|
||||
|
||||
type SNbdExportManager struct {
|
||||
portsLock *sync.Mutex
|
||||
}
|
||||
|
||||
func NewNbdExportManager() *SNbdExportManager {
|
||||
return &SNbdExportManager{
|
||||
portsLock: new(sync.Mutex),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) GetFreePortByBase(basePort int) int {
|
||||
var port = 1
|
||||
for {
|
||||
if netutils2.IsTcpPortUsed("0.0.0.0", basePort+port) {
|
||||
port += 1
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return port + basePort
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) GetNBDServerFreePort() int {
|
||||
basePort := EXPORT_NBD_BASE_PORT + LAST_USED_NBD_SERVER_PORT
|
||||
var port = 1
|
||||
for {
|
||||
if netutils2.IsTcpPortUsed("0.0.0.0", basePort+port) {
|
||||
port += 1
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
LAST_USED_NBD_SERVER_PORT = port
|
||||
if LAST_USED_NBD_SERVER_PORT > 1000 {
|
||||
LAST_USED_NBD_SERVER_PORT = 0
|
||||
}
|
||||
return port + basePort
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) getQemuNbdVersion() (string, error) {
|
||||
output, err := procutils.NewRemoteCommandAsFarAsPossible(qemutils.GetQemuNbd(), "--version").Output()
|
||||
if err != nil {
|
||||
log.Errorf("qemu-nbd version failed %s %s", output, err.Error())
|
||||
return "", errors.Wrapf(err, "qemu-nbd version failed %s", output)
|
||||
}
|
||||
lines := strings.Split(strings.TrimSpace(string(output)), "\n")
|
||||
if len(lines) > 0 {
|
||||
parts := strings.Split(lines[0], " ")
|
||||
return parts[1], nil
|
||||
}
|
||||
return "", errors.Error("empty version output")
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) QemuNbdStartExport(imageInfo qemuimg.SImageInfo, diskId string) (int, error) {
|
||||
m.portsLock.Lock()
|
||||
defer m.portsLock.Unlock()
|
||||
|
||||
nbdPort := m.GetNBDServerFreePort()
|
||||
pidFilePath := path.Join(HostImageOptions.HostImageNbdPidDir, fmt.Sprintf("nbd_%s.pid", diskId))
|
||||
|
||||
nbdVer, err := m.getQemuNbdVersion()
|
||||
if err != nil {
|
||||
return -1, errors.Wrap(err, "getQemuNbdVersion")
|
||||
}
|
||||
var cmd []string
|
||||
if imageInfo.Encrypted() {
|
||||
cmd = []string{
|
||||
qemutils.GetQemuNbd(),
|
||||
"--read-only", "--persistent", "-x", diskId, "-p", strconv.Itoa(nbdPort),
|
||||
"--object", imageInfo.SecretOptions(),
|
||||
"--image-opts", imageInfo.ImageOptions(),
|
||||
}
|
||||
} else {
|
||||
cmd = []string{
|
||||
qemutils.GetQemuNbd(),
|
||||
"--read-only", "--persistent", "-x", diskId, "-p", strconv.Itoa(nbdPort),
|
||||
imageInfo.Path,
|
||||
}
|
||||
}
|
||||
cmd = append(cmd, "--pid-file", pidFilePath)
|
||||
if version.GE(nbdVer, "4.0.0") {
|
||||
cmd = append(cmd, "--fork")
|
||||
}
|
||||
cmdStr := strings.Join(cmd, " ")
|
||||
err = procutils.NewRemoteCommandAsFarAsPossible("sh", "-c", cmdStr).Run()
|
||||
if err != nil {
|
||||
log.Errorf("qemu-nbd connect failed %s %s", err.Error())
|
||||
return -1, errors.Wrapf(err, "qemu-nbd connect failed")
|
||||
}
|
||||
return nbdPort, nil
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) QemuNbdCloseExport(diskId string) error {
|
||||
pidFilePath := path.Join(HostImageOptions.HostImageNbdPidDir, fmt.Sprintf("nbd_%s.pid", diskId))
|
||||
if !m.nbdProcessExist(diskId) {
|
||||
if fileutils2.Exists(pidFilePath) {
|
||||
if err := os.Remove(pidFilePath); err != nil {
|
||||
log.Errorf("failed remove nbd pid file %s", pidFilePath)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if fileutils2.Exists(pidFilePath) {
|
||||
pid, err := fileutils2.FileGetIntContent(pidFilePath)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed get pid of qemu-nbd process %s", pidFilePath)
|
||||
}
|
||||
out, err := procutils.NewRemoteCommandAsFarAsPossible("kill", "-9", strconv.Itoa(pid)).Output()
|
||||
if err != nil {
|
||||
log.Errorf("failed kill nbd export process %s %s", err, out)
|
||||
return errors.Wrapf(err, "kill nbd export failed: %s", out)
|
||||
}
|
||||
|
||||
if err := os.Remove(pidFilePath); err != nil {
|
||||
log.Errorf("failed remove nbd pid file %s", pidFilePath)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *SNbdExportManager) nbdProcessExist(diskId string) bool {
|
||||
return procutils.NewRemoteCommandAsFarAsPossible("sh", "-c",
|
||||
fmt.Sprintf("ps -ef | grep [q]emu-nbd | grep %s", diskId)).Run() == nil
|
||||
}
|
||||
@@ -140,7 +140,7 @@ func (m *SGuestManager) GuestCreateFromCloudpods(
|
||||
iDisk := storage.CreateDisk(diskId)
|
||||
diskUrl := fmt.Sprintf("http://%s:48885/disks/%s",
|
||||
createConfig.CloudpodsAccessInfo.HostIp, createConfig.CloudpodsAccessInfo.OriginDisksId[i])
|
||||
if err = iDisk.CreateFromImageFuse(ctx, diskUrl, 0, nil); err != nil {
|
||||
if err = iDisk.CreateFromRemoteHostImage(ctx, diskUrl, 0, nil); err != nil {
|
||||
log.Errorf("failed create disk %s from fuse %s", diskUrl, err)
|
||||
break
|
||||
}
|
||||
|
||||
@@ -1640,10 +1640,8 @@ func (s *SGuestResumeTask) onStartRunning() {
|
||||
|
||||
disksIdx := s.GetNeedMergeBackingFileDiskIndexs()
|
||||
if len(disksIdx) > 0 {
|
||||
s.SyncStatus("")
|
||||
timeutils2.AddTimeout(
|
||||
time.Second*time.Duration(options.HostOptions.AutoMergeDelaySeconds),
|
||||
func() { s.startStreamDisks(disksIdx) })
|
||||
s.startStreamDisks(disksIdx)
|
||||
//s.SyncStatus("")
|
||||
} else if options.HostOptions.AutoMergeBackingTemplate {
|
||||
s.SyncStatus("")
|
||||
timeutils2.AddTimeout(
|
||||
|
||||
@@ -794,10 +794,10 @@ func (s *SKVMGuestInstance) asyncScriptStart(ctx context.Context, params interfa
|
||||
encryptInfo = new(apis.SEncryptInfo)
|
||||
data.Unmarshal(encryptInfo, "encrypt_info")
|
||||
}
|
||||
err = s.fuseMount(encryptInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fuse mount")
|
||||
}
|
||||
//err = s.fuseMount(encryptInfo)
|
||||
//if err != nil {
|
||||
// return nil, errors.Wrap(err, "fuse mount")
|
||||
//}
|
||||
|
||||
var vcpuOrder = make([][]int, 0)
|
||||
isMigrate := jsonutils.QueryBoolean(data, "need_migrate", false)
|
||||
@@ -1244,12 +1244,15 @@ func (s *SKVMGuestInstance) eventBlockJobReady(event *monitor.Event) {
|
||||
log.Errorf("failed get disk from index %d", diskIndex)
|
||||
return
|
||||
}
|
||||
var diskId, diskPath string
|
||||
var diskId, diskPath, diskUrl string
|
||||
var mergeSnapshots bool
|
||||
for i := 0; i < len(disks); i++ {
|
||||
index := disks[i].Index
|
||||
if index == int8(diskIndex) {
|
||||
diskId = disks[i].DiskId
|
||||
diskPath = disks[i].Path
|
||||
diskUrl = disks[i].Url
|
||||
mergeSnapshots = disks[i].MergeSnapshot
|
||||
}
|
||||
}
|
||||
if len(diskId) == 0 {
|
||||
@@ -1262,7 +1265,9 @@ func (s *SKVMGuestInstance) eventBlockJobReady(event *monitor.Event) {
|
||||
log.Errorf("eventBlockJobReady failed get disk %s", diskPath)
|
||||
return
|
||||
}
|
||||
disk.PostCreateFromImageFuse()
|
||||
if mergeSnapshots {
|
||||
disk.PostCreateFromRemoteHostImage(diskUrl)
|
||||
}
|
||||
blockJobCount := s.BlockJobsCount()
|
||||
if blockJobCount == 0 {
|
||||
for {
|
||||
@@ -2247,6 +2252,9 @@ func (s *SKVMGuestInstance) delTmpDisks(ctx context.Context, migrated bool) erro
|
||||
return err
|
||||
}
|
||||
}
|
||||
if d != nil && disk.MergeSnapshot {
|
||||
d.PostCreateFromRemoteHostImage(disk.Url)
|
||||
}
|
||||
if migrated {
|
||||
if d != nil && utils.IsInStringArray(d.GetType(), []string{api.STORAGE_SLVM, api.STORAGE_CLVM}) {
|
||||
if err := lvmutils.LVDeactivate(d.GetPath()); err != nil {
|
||||
@@ -2992,11 +3000,12 @@ func (s *SKVMGuestInstance) streamDisksComplete(ctx context.Context) {
|
||||
disks := s.Desc.Disks
|
||||
for i, _ := range disks {
|
||||
d, _ := storageman.GetManager().GetDiskByPath(disks[i].Path)
|
||||
if d != nil {
|
||||
log.Infof("Disk %s do post create from fuse", d.GetId())
|
||||
d.PostCreateFromImageFuse()
|
||||
}
|
||||
if disks[i].MergeSnapshot {
|
||||
if d != nil {
|
||||
log.Infof("Disk %s do post create from fuse", d.GetId())
|
||||
d.PostCreateFromRemoteHostImage(disks[i].Url)
|
||||
}
|
||||
|
||||
disks[i].MergeSnapshot = false
|
||||
s.needSyncStreamDisks = true
|
||||
}
|
||||
|
||||
@@ -17,17 +17,20 @@ package storageman
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"yunion.io/x/jsonutils"
|
||||
"yunion.io/x/log"
|
||||
"yunion.io/x/pkg/errors"
|
||||
"yunion.io/x/pkg/util/httputils"
|
||||
|
||||
"yunion.io/x/onecloud/pkg/apis"
|
||||
api "yunion.io/x/onecloud/pkg/apis/compute"
|
||||
"yunion.io/x/onecloud/pkg/hostman/guestman/desc"
|
||||
deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis"
|
||||
"yunion.io/x/onecloud/pkg/hostman/hostdeployer/deployclient"
|
||||
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
"yunion.io/x/onecloud/pkg/util/seclib2"
|
||||
)
|
||||
@@ -66,10 +69,10 @@ type IDisk interface {
|
||||
CreateFromTemplate(context.Context, string, string, int64, *apis.SEncryptInfo) (jsonutils.JSONObject, error)
|
||||
CreateFromSnapshotLocation(ctx context.Context, location string, size int64, encryptInfo *apis.SEncryptInfo) (jsonutils.JSONObject, error)
|
||||
CreateFromRbdSnapshot(ctx context.Context, snapshotId, srcDiskId, srcPool string) error
|
||||
CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error
|
||||
CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error
|
||||
CreateRaw(ctx context.Context, sizeMb int, diskFormat string, fsFormat string,
|
||||
encryptInfo *apis.SEncryptInfo, diskId string, back string) (jsonutils.JSONObject, error)
|
||||
PostCreateFromImageFuse()
|
||||
PostCreateFromRemoteHostImage(diskUrl string)
|
||||
CreateSnapshot(snapshotId string, encryptKey string, encFormat qemuimg.TEncryptFormat, encAlg seclib2.TSymEncAlg) error
|
||||
DeleteSnapshot(snapshotId, convertSnapshot string, blockStream bool, encryptInfo apis.SEncryptInfo) error
|
||||
DeployGuestFs(diskInfo *deployapi.DiskInfo, guestDesc *desc.SGuestDesc,
|
||||
@@ -126,7 +129,7 @@ func (d *SBaseDisk) CreateFromSnapshotLocation(ctx context.Context, location str
|
||||
return nil, errors.Errorf("Not implemented")
|
||||
}
|
||||
|
||||
func (d *SBaseDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
func (d *SBaseDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
return errors.Errorf("unsupported operation")
|
||||
}
|
||||
|
||||
@@ -174,7 +177,7 @@ func (d *SBaseDisk) RebuildSlaveDisk(diskUri string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *SBaseDisk) PostCreateFromImageFuse() {
|
||||
func (d *SBaseDisk) PostCreateFromRemoteHostImage(string) {
|
||||
}
|
||||
|
||||
func (d *SBaseDisk) GetZoneId() string {
|
||||
@@ -254,3 +257,43 @@ func (d *SBaseDisk) RollbackDiskOnSnapshotFail(snapshotId string) error {
|
||||
func (d *SBaseDisk) GetBackupDir() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (d *SBaseDisk) RequestExportNbdImage(ctx context.Context, url string, encryptInfo *apis.SEncryptInfo) (int64, error) {
|
||||
body := jsonutils.NewDict()
|
||||
body.Set("disk_id", jsonutils.NewString(d.GetId()))
|
||||
header := http.Header{}
|
||||
userCred := auth.AdminCredential()
|
||||
header.Set("X-Auth-Token", userCred.GetTokenString())
|
||||
if encryptInfo != nil {
|
||||
header.Set("X-Encrypt-Key", encryptInfo.Key)
|
||||
header.Set("X-Encrypt-Alg", string(encryptInfo.Alg))
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("%s/%s", url, "nbd-export")
|
||||
httpClient := httputils.GetDefaultClient()
|
||||
_, respBody, err := httputils.JSONRequest(httpClient, ctx, "POST", url, header, body, false)
|
||||
if err != nil {
|
||||
return -1, errors.Wrap(err, "request export image")
|
||||
}
|
||||
nbdPort, err := respBody.Int("nbd_port")
|
||||
if err != nil {
|
||||
return -1, errors.Wrapf(err, "failed get nbd port from %s", url)
|
||||
}
|
||||
return nbdPort, nil
|
||||
}
|
||||
|
||||
func (d *SBaseDisk) RequestCloseNbdImage(ctx context.Context, url string) error {
|
||||
body := jsonutils.NewDict()
|
||||
body.Set("disk_id", jsonutils.NewString(d.GetId()))
|
||||
header := http.Header{}
|
||||
userCred := auth.AdminCredential()
|
||||
header.Set("X-Auth-Token", userCred.GetTokenString())
|
||||
|
||||
url = fmt.Sprintf("%s/%s", url, "nbd-close")
|
||||
httpClient := httputils.GetDefaultClient()
|
||||
_, _, err := httputils.JSONRequest(httpClient, ctx, "POST", url, header, body, false)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "request close image")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func (d *SCLVMDisk) PrepareMigrate(liveMigrate bool) ([]string, string, bool, er
|
||||
return nil, "", false, fmt.Errorf("Not support")
|
||||
}
|
||||
|
||||
func (d *SCLVMDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
func (d *SCLVMDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
return fmt.Errorf("Not support")
|
||||
}
|
||||
|
||||
|
||||
@@ -37,9 +37,8 @@ import (
|
||||
"yunion.io/x/onecloud/pkg/hostman/options"
|
||||
"yunion.io/x/onecloud/pkg/hostman/storageman/remotefile"
|
||||
"yunion.io/x/onecloud/pkg/httperrors"
|
||||
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
||||
"yunion.io/x/onecloud/pkg/util/fileutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/fuseutils"
|
||||
"yunion.io/x/onecloud/pkg/util/netutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/procutils"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
"yunion.io/x/onecloud/pkg/util/seclib2"
|
||||
@@ -201,43 +200,35 @@ func (d *SLocalDisk) Resize(ctx context.Context, params interface{}) (jsonutils.
|
||||
return d.GetDiskDesc(), nil
|
||||
}
|
||||
|
||||
func (d *SLocalDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
log.Infof("Create from image fuse %s", url)
|
||||
func (d *SLocalDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
log.Infof("Create from remote host image %s", url)
|
||||
nbdPort, err := d.RequestExportNbdImage(ctx, url, encryptInfo)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RequestExportNbdImage")
|
||||
}
|
||||
remoteHostIp := netutils2.ParseIpFromUrl(url)
|
||||
nbdImagePath := fmt.Sprintf("nbd://%s:%d/%s", remoteHostIp, nbdPort, d.GetId())
|
||||
log.Infof("remote nbd image exported %s", nbdImagePath)
|
||||
|
||||
localPath := d.Storage.GetFuseTmpPath()
|
||||
mntPath := path.Join(d.Storage.GetFuseMountPath(), d.Id)
|
||||
contentPath := path.Join(mntPath, "content")
|
||||
newImg, err := qemuimg.NewQemuImage(d.getPath())
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("qemuimg.NewQemuImage %s fail: %s", d.getPath(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
if newImg.IsValid() && newImg.IsChained() && newImg.BackFilePath != contentPath {
|
||||
if newImg.IsValid() && newImg.IsChained() && newImg.BackFilePath != nbdImagePath {
|
||||
if err := newImg.Delete(); err != nil {
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !newImg.IsValid() || newImg.IsChained() {
|
||||
if err := fuseutils.MountFusefs(
|
||||
options.HostOptions.FetcherfsPath, url, localPath,
|
||||
auth.GetTokenString(), mntPath, options.HostOptions.FetcherfsBlockSize, encryptInfo,
|
||||
); err != nil {
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
if encryptInfo != nil {
|
||||
err = newImg.CreateQcow2(0, false, nbdImagePath, encryptInfo.Key, qemuimg.EncryptFormatLuks, encryptInfo.Alg)
|
||||
} else {
|
||||
err = newImg.CreateQcow2(0, false, nbdImagePath, "", "", "")
|
||||
}
|
||||
if !newImg.IsValid() {
|
||||
if encryptInfo != nil {
|
||||
err = newImg.CreateQcow2(0, false, contentPath, encryptInfo.Key, qemuimg.EncryptFormatLuks, encryptInfo.Alg)
|
||||
} else {
|
||||
err = newImg.CreateQcow2(0, false, contentPath, "", "", "")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "create from fuse")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "create from remote host image")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -394,21 +385,10 @@ func (d *SLocalDisk) GetDiskSetupScripts(diskIndex int) string {
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (d *SLocalDisk) PostCreateFromImageFuse() {
|
||||
mntPath := path.Join(d.Storage.GetFuseMountPath(), d.Id)
|
||||
if output, err := procutils.NewCommand("umount", mntPath).Output(); err != nil {
|
||||
log.Errorf("umount %s failed: %s, %s", mntPath, err, output)
|
||||
}
|
||||
if output, err := procutils.NewCommand("rm", "-rf", mntPath).Output(); err != nil {
|
||||
log.Errorf("rm %s failed: %s, %s", mntPath, err, output)
|
||||
}
|
||||
tmpPath := d.Storage.GetFuseTmpPath()
|
||||
tmpFiles, err := ioutil.ReadDir(tmpPath)
|
||||
if err != nil {
|
||||
for _, f := range tmpFiles {
|
||||
if strings.HasPrefix(f.Name(), d.Id) {
|
||||
procutils.NewCommand("rm", "-f", path.Join(tmpPath, f.Name()))
|
||||
}
|
||||
func (d *SLocalDisk) PostCreateFromRemoteHostImage(diskUrl string) {
|
||||
if diskUrl != "" {
|
||||
if err := d.RequestCloseNbdImage(context.Background(), diskUrl); err != nil {
|
||||
log.Errorf("failed request close nbd image %s: %s", diskUrl, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -701,7 +681,7 @@ func (d *SLocalDisk) RebuildSlaveDisk(diskUri string) error {
|
||||
return errors.Errorf("failed delete slave top disk file %s %s", output, err)
|
||||
}
|
||||
diskUrl := fmt.Sprintf("%s/%s", diskUri, d.Id)
|
||||
if err := d.CreateFromImageFuse(context.Background(), diskUrl, 0, nil); err != nil {
|
||||
if err := d.CreateFromRemoteHostImage(context.Background(), diskUrl, 0, nil); err != nil {
|
||||
return errors.Wrap(err, "failed create slave disk")
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -17,7 +17,6 @@ package storageman
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
@@ -34,13 +33,12 @@ import (
|
||||
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
|
||||
deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis"
|
||||
"yunion.io/x/onecloud/pkg/hostman/hostutils"
|
||||
"yunion.io/x/onecloud/pkg/hostman/options"
|
||||
"yunion.io/x/onecloud/pkg/hostman/storageman/lvmutils"
|
||||
"yunion.io/x/onecloud/pkg/hostman/storageman/storageutils"
|
||||
"yunion.io/x/onecloud/pkg/mcclient/auth"
|
||||
identity_modules "yunion.io/x/onecloud/pkg/mcclient/modules/identity"
|
||||
"yunion.io/x/onecloud/pkg/util/fileutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/fuseutils"
|
||||
"yunion.io/x/onecloud/pkg/util/netutils2"
|
||||
"yunion.io/x/onecloud/pkg/util/procutils"
|
||||
"yunion.io/x/onecloud/pkg/util/qemuimg"
|
||||
"yunion.io/x/onecloud/pkg/util/seclib2"
|
||||
@@ -158,66 +156,44 @@ func (d *SLVMDisk) Delete(ctx context.Context, params interface{}) (jsonutils.JS
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d *SLVMDisk) PostCreateFromImageFuse() {
|
||||
mntPath := path.Join(d.Storage.GetFuseMountPath(), d.Id)
|
||||
if output, err := procutils.NewCommand("umount", mntPath).Output(); err != nil {
|
||||
log.Errorf("umount %s failed: %s, %s", mntPath, err, output)
|
||||
}
|
||||
if output, err := procutils.NewCommand("rm", "-rf", mntPath).Output(); err != nil {
|
||||
log.Errorf("rm %s failed: %s, %s", mntPath, err, output)
|
||||
}
|
||||
tmpPath := d.Storage.GetFuseTmpPath()
|
||||
tmpFiles, err := ioutil.ReadDir(tmpPath)
|
||||
if err != nil {
|
||||
for _, f := range tmpFiles {
|
||||
if strings.HasPrefix(f.Name(), d.Id) {
|
||||
procutils.NewCommand("rm", "-f", path.Join(tmpPath, f.Name()))
|
||||
}
|
||||
func (d *SLVMDisk) PostCreateFromRemoteHostImage(diskUrl string) {
|
||||
if diskUrl != "" {
|
||||
if err := d.RequestCloseNbdImage(context.Background(), diskUrl); err != nil {
|
||||
log.Errorf("failed request close nbd image %s: %s", diskUrl, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *SLVMDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
log.Infof("Create from image fuse %s", url)
|
||||
func (d *SLVMDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
log.Infof("Create from remote host image %s", url)
|
||||
nbdPort, err := d.RequestExportNbdImage(ctx, url, encryptInfo)
|
||||
remoteHostIp := netutils2.ParseIpFromUrl(url)
|
||||
nbdImagePath := fmt.Sprintf("nbd://%s:%d/%s", remoteHostIp, nbdPort, d.GetId())
|
||||
log.Infof("remote nbd image exported %s", nbdImagePath)
|
||||
|
||||
localPath := d.Storage.GetFuseTmpPath()
|
||||
mntPath := path.Join(d.Storage.GetFuseMountPath(), d.Id)
|
||||
contentPath := path.Join(mntPath, "content")
|
||||
newImg, err := qemuimg.NewQemuImage(d.GetPath())
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("qemuimg.NewQemuImage %s fail: %s", d.GetPath(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
if newImg.IsValid() && newImg.IsChained() && newImg.BackFilePath != contentPath {
|
||||
if newImg.IsValid() && newImg.IsChained() && newImg.BackFilePath != nbdImagePath {
|
||||
if err := lvmutils.LvRemove(d.GetPath()); err != nil {
|
||||
return errors.Wrap(err, "remove disk")
|
||||
}
|
||||
}
|
||||
if !newImg.IsValid() || newImg.IsChained() {
|
||||
if err := fuseutils.MountFusefs(
|
||||
options.HostOptions.FetcherfsPath, url, localPath,
|
||||
auth.GetTokenString(), mntPath, options.HostOptions.FetcherfsBlockSize, encryptInfo,
|
||||
); err != nil {
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
lvSize := lvmutils.GetQcow2LvSize(size)
|
||||
if err := lvmutils.LvCreate(d.Storage.GetPath(), d.Id, lvSize*1024*1024); err != nil {
|
||||
return errors.Wrap(err, "lvcreate")
|
||||
}
|
||||
if !newImg.IsValid() {
|
||||
lvSize := lvmutils.GetQcow2LvSize(size)
|
||||
if err := lvmutils.LvCreate(d.Storage.GetPath(), d.Id, lvSize*1024*1024); err != nil {
|
||||
return errors.Wrap(err, "lvcreate")
|
||||
}
|
||||
|
||||
if encryptInfo != nil {
|
||||
err = newImg.CreateQcow2(0, false, contentPath, encryptInfo.Key, qemuimg.EncryptFormatLuks, encryptInfo.Alg)
|
||||
} else {
|
||||
err = newImg.CreateQcow2(0, false, contentPath, "", "", "")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "create from fuse")
|
||||
}
|
||||
if encryptInfo != nil {
|
||||
err = newImg.CreateQcow2(0, false, nbdImagePath, encryptInfo.Key, qemuimg.EncryptFormatLuks, encryptInfo.Alg)
|
||||
} else {
|
||||
err = newImg.CreateQcow2(0, false, nbdImagePath, "", "", "")
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "create from remote host image")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -677,7 +653,7 @@ func (d *SLVMDisk) RebuildSlaveDisk(diskUri string) error {
|
||||
return errors.Wrap(err, "lvremove")
|
||||
}
|
||||
diskUrl := fmt.Sprintf("%s/%s", diskUri, d.Id)
|
||||
if err := d.CreateFromImageFuse(context.Background(), diskUrl, 0, nil); err != nil {
|
||||
if err := d.CreateFromRemoteHostImage(context.Background(), diskUrl, 0, nil); err != nil {
|
||||
return errors.Wrap(err, "failed create slave disk")
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -46,7 +46,7 @@ func (d *SNasDisk) CreateFromTemplate(ctx context.Context, imageId, format strin
|
||||
return d.SLocalDisk.createFromTemplateAndResize(ctx, imageId, format, imageCacheManager, encryptInfo, size)
|
||||
}
|
||||
|
||||
func (d *SNasDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
func (d *SNasDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
return fmt.Errorf("Not implemented")
|
||||
}
|
||||
|
||||
|
||||
@@ -210,7 +210,7 @@ func (d *SRBDDisk) createFromTemplate(ctx context.Context, imageId, format strin
|
||||
return d.GetDiskDesc(), nil
|
||||
}
|
||||
|
||||
func (d *SRBDDisk) CreateFromImageFuse(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
func (d *SRBDDisk) CreateFromRemoteHostImage(ctx context.Context, url string, size int64, encryptInfo *apis.SEncryptInfo) error {
|
||||
return fmt.Errorf("Not support")
|
||||
}
|
||||
|
||||
@@ -234,8 +234,8 @@ func (d *SRBDDisk) CreateRaw(ctx context.Context, sizeMb int, diskFromat string,
|
||||
return d.GetDiskDesc(), nil
|
||||
}
|
||||
|
||||
func (d *SRBDDisk) PostCreateFromImageFuse() {
|
||||
log.Errorf("Not support PostCreateFromImageFuse")
|
||||
func (d *SRBDDisk) PostCreateFromRemoteHostImage(string) {
|
||||
log.Errorf("Not support PostCreateFromRemoteHostImage")
|
||||
}
|
||||
|
||||
func (d *SRBDDisk) DiskBackup(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
|
||||
|
||||
@@ -223,7 +223,7 @@ func (d *SSLVMDisk) CreateSnapshot(snapshotId string, encryptKey string, encForm
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *SSLVMDisk) PostCreateFromImageFuse() {
|
||||
func (d *SSLVMDisk) PostCreateFromRemoteHostImage(string) {
|
||||
log.Infof("slvm post create from fuse do nothing")
|
||||
}
|
||||
|
||||
|
||||
@@ -961,9 +961,9 @@ func (s *SLocalStorage) CreateDiskFromSnapshot(ctx context.Context, disk IDisk,
|
||||
if info.Encryption {
|
||||
encryptInfo = &info.EncryptInfo
|
||||
}
|
||||
err := disk.CreateFromImageFuse(ctx, info.SnapshotUrl, int64(info.DiskSizeMb), encryptInfo)
|
||||
err := disk.CreateFromRemoteHostImage(ctx, info.SnapshotUrl, int64(info.DiskSizeMb), encryptInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "CreateFromImageFuse")
|
||||
return nil, errors.Wrapf(err, "CreateFromRemoteHostImage")
|
||||
}
|
||||
return disk.GetDiskDesc(), nil
|
||||
}
|
||||
|
||||
@@ -507,9 +507,9 @@ func (s *SLVMStorage) CreateDiskFromSnapshot(ctx context.Context, disk IDisk, in
|
||||
if info.Encryption {
|
||||
encryptInfo = &info.EncryptInfo
|
||||
}
|
||||
err := disk.CreateFromImageFuse(ctx, info.SnapshotUrl, int64(info.DiskSizeMb), encryptInfo)
|
||||
err := disk.CreateFromRemoteHostImage(ctx, info.SnapshotUrl, int64(info.DiskSizeMb), encryptInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "CreateFromImageFuse")
|
||||
return nil, errors.Wrapf(err, "CreateFromRemoteHostImage")
|
||||
}
|
||||
return disk.GetDiskDesc(), nil
|
||||
}
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
package netutils2
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -38,3 +40,16 @@ func GetHttpRequestIp(r *http.Request) string {
|
||||
}
|
||||
return ipStr
|
||||
}
|
||||
|
||||
func ParseIpFromUrl(u string) string {
|
||||
parsedUrl, err := url.Parse(u)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
host := parsedUrl.Hostname()
|
||||
ip := net.ParseIP(host)
|
||||
if ip == nil {
|
||||
return ""
|
||||
}
|
||||
return ip.String()
|
||||
}
|
||||
|
||||
@@ -66,15 +66,6 @@ build_bin() {
|
||||
local BUILD_ARCH=$2
|
||||
local BUILD_CGO=$3
|
||||
case "$1" in
|
||||
host-image)
|
||||
rm -vf _output/bin/$1
|
||||
rm -rvf _output/bin/bundles/$1
|
||||
if [ -z "$LIBQEMUIO_PATH" ]; then
|
||||
echo "Need set \$LIBQEMUIO_PATH env to build host-image"
|
||||
exit 1
|
||||
fi
|
||||
GOOS=linux make cmd/$1
|
||||
;;
|
||||
climc)
|
||||
rm -vf _output/bin/*cli
|
||||
env $BUILD_ARCH $BUILD_CGO make -C "$SRC_DIR" docker-alpine-build F="cmd/$1 cmd/*cli"
|
||||
@@ -88,14 +79,14 @@ build_bin() {
|
||||
esac
|
||||
}
|
||||
|
||||
build_bundle_libraries() {
|
||||
for bundle_component in 'host-image'; do
|
||||
if [ $1 == $bundle_component ]; then
|
||||
$CUR_DIR/bundle_libraries.sh _output/bin/bundles/$1 _output/bin/$1
|
||||
break
|
||||
fi
|
||||
done
|
||||
}
|
||||
# build_bundle_libraries() {
|
||||
# for bundle_component in 'host-image'; do
|
||||
# if [ $1 == $bundle_component ]; then
|
||||
# $CUR_DIR/bundle_libraries.sh _output/bin/bundles/$1 _output/bin/$1
|
||||
# break
|
||||
# fi
|
||||
# done
|
||||
# }
|
||||
|
||||
build_image() {
|
||||
local tag=$1
|
||||
@@ -163,7 +154,7 @@ build_process() {
|
||||
echo "[$(readlink -f ${BASH_SOURCE}):${LINENO} ${FUNCNAME[0]}] return for DRY_RUN"
|
||||
return
|
||||
fi
|
||||
build_bundle_libraries $component
|
||||
# build_bundle_libraries $component
|
||||
|
||||
build_image $img_name $DOCKER_DIR/Dockerfile.$component $SRC_DIR
|
||||
}
|
||||
@@ -249,7 +240,7 @@ show_update_cmd() {
|
||||
local spec=$1
|
||||
local name=$1
|
||||
local tag=${TAG}
|
||||
if [[ "$arch" == arm64 || "$component" == host-image ]]; then
|
||||
if [[ "$arch" == arm64 ]]; then
|
||||
tag="${tag}-$arch"
|
||||
fi
|
||||
|
||||
@@ -294,10 +285,6 @@ for component in $COMPONENTS; do
|
||||
fi
|
||||
|
||||
echo "Start to build component: $component"
|
||||
if [[ $component == host-image ]]; then
|
||||
build_process $component $ARCH "false"
|
||||
continue
|
||||
fi
|
||||
|
||||
case "$ARCH" in
|
||||
all)
|
||||
|
||||
Reference in New Issue
Block a user