Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 136 additions & 107 deletions pkg/importer/vddk-datasource_amd64.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,17 @@ func (vmware *VMwareClient) FindDiskInRootSnapshotParent(snapshots []types.Virtu
var parent *types.VirtualDeviceFileBackingInfo
switch disk.Backing.(type) {
case *types.VirtualDiskFlatVer1BackingInfo:
parent = &disk.Backing.(*types.VirtualDiskFlatVer1BackingInfo).Parent.VirtualDeviceFileBackingInfo
if info := disk.Backing.(*types.VirtualDiskFlatVer1BackingInfo); info != nil && info.Parent != nil {
parent = &info.Parent.VirtualDeviceFileBackingInfo
}
case *types.VirtualDiskFlatVer2BackingInfo:
parent = &disk.Backing.(*types.VirtualDiskFlatVer2BackingInfo).Parent.VirtualDeviceFileBackingInfo
if info := disk.Backing.(*types.VirtualDiskFlatVer2BackingInfo); info != nil && info.Parent != nil {
parent = &info.VirtualDeviceFileBackingInfo
}
case *types.VirtualDiskRawDiskMappingVer1BackingInfo:
parent = &disk.Backing.(*types.VirtualDiskRawDiskMappingVer1BackingInfo).Parent.VirtualDeviceFileBackingInfo
if info := disk.Backing.(*types.VirtualDiskRawDiskMappingVer1BackingInfo); info != nil && info.Parent != nil {
parent = &info.VirtualDeviceFileBackingInfo
}
}
if parent != nil && parent.FileName == fileName {
return disk
Expand Down Expand Up @@ -807,8 +813,9 @@ func logOnError(err error) {

// VDDKDataSource is the data provider for vddk.
type VDDKDataSource struct {
VMware *VMwareClient
BackingFile string
NbdKit *NbdKitWrapper
ChangedBlocks *types.DiskChangeInfo
CurrentSnapshot string
PreviousSnapshot string
Size uint64
Expand All @@ -835,13 +842,12 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
return nil, errors.New("previous checkpoint set without current")
}

// Log in to VMware, and get everything needed up front
// Log in to VMware to make sure disks and snapshots are present
vmware, err := newVMwareClient(endpoint, accessKey, secKey, thumbprint, uuid)
if err != nil {
klog.Errorf("Unable to log in to VMware: %v", err)
return nil, err
}
defer func() { _ = vmware.Close() }()

// Find disk object for backingFile disk image path
backingFileObject, err := vmware.FindDiskFromName(backingFile)
Expand All @@ -860,80 +866,6 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
}
}

// If this is a warm migration (current and previous checkpoints set),
// then get the list of changed blocks from VMware for a delta copy.
var changed *types.DiskChangeInfo
if currentSnapshot != nil && previousCheckpoint != "" {
disk, err := vmware.FindSnapshotDisk(currentSnapshot, backingFileObject.DiskObjectId)
if err != nil {
klog.Errorf("Could not find matching disk in current snapshot: %v", err)
return nil, err
}
// QueryChangedDiskAreas needs to be called multiple times to get all possible disk changes.
// Experimentation shows it returns maximally 2000 changed blocks. If the disk has more than
// 2000 changed blocks we need to query the next chunk of the blocks starting from previous.
// Loop until QueryChangedDiskAreas starts returning zero-length block lists.
changed = &types.DiskChangeInfo{}
// Check if this is a snapshot or a change ID, and query disk areas as appropriate.
// Change IDs look like: 52 de c0 d9 b9 43 9d 10-61 d5 4c 1b e9 7b 65 63/81
changeIDPattern := `([0-9a-fA-F]{2}\s?)*-([0-9a-fA-F]{2}\s?)*\/([0-9a-fA-F]*)`
if matched, _ := regexp.MatchString(changeIDPattern, previousCheckpoint); matched {
for {
klog.Infof("Querying changed disk areas at offset %d", changed.Length)
request := types.QueryChangedDiskAreas{
ChangeId: previousCheckpoint,
DeviceKey: backingFileObject.Key,
Snapshot: currentSnapshot,
StartOffset: changed.Length,
This: vmware.vm.Reference(),
}
response, err := QueryChangedDiskAreas(vmware.context, vmware.vm.Client(), &request)
if err != nil {
klog.Errorf("Failed to query changed areas: %s", err)
return nil, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(response.Returnval.ChangedArea), changed.Length, response.Returnval.Length)
if len(response.Returnval.ChangedArea) == 0 { // No more changes
break
}
changed.ChangedArea = append(changed.ChangedArea, response.Returnval.ChangedArea...)
changed.Length += response.Returnval.Length
// The start offset should not be the size of the disk otherwise the QueryChangedDiskAreas will fail
if changed.Length >= disk.CapacityInBytes {
klog.Infof("the offset %d is greater or equal to disk capacity %d", changed.Length, disk.CapacityInBytes)
break
}
}
} else { // Previous checkpoint is a snapshot
previousSnapshot, err := vmware.vm.FindSnapshot(vmware.context, previousCheckpoint)
if err != nil {
klog.Errorf("Could not find previous snapshot %s: %v", previousCheckpoint, err)
return nil, err
}
if previousSnapshot != nil {
for {
klog.Infof("Querying changed disk areas at offset %d", changed.Length)
changedAreas, err := vmware.vm.QueryChangedDiskAreas(vmware.context, previousSnapshot, currentSnapshot, backingFileObject, changed.Length)
if err != nil {
klog.Errorf("Unable to query changed areas: %s", err)
return nil, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(changedAreas.ChangedArea), changed.Length, changedAreas.Length)
if len(changedAreas.ChangedArea) == 0 {
break
}
changed.ChangedArea = append(changed.ChangedArea, changedAreas.ChangedArea...)
changed.Length += changedAreas.Length
// The start offset should not be the size of the disk otherwise the QueryChangedDiskAreas will fail
if changed.Length >= disk.CapacityInBytes {
klog.Infof("the offset %d is greater or equal to disk capacity %d", changed.Length, disk.CapacityInBytes)
break
}
}
}
}
}

diskFileName := backingFile // By default, just set the nbdkit file name to the given backingFile path
if currentSnapshot != nil {
// When copying from a snapshot, set the nbdkit file name to the name of the disk in the snapshot
Expand All @@ -953,17 +885,10 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum

// Get the total transfer size of either the disk or the delta
var size uint64
if changed != nil { // Warm migration: get size of the delta
size = 0
for _, change := range changed.ChangedArea {
size += uint64(change.Length)
}
} else { // Cold migration: get size of the whole disk
size, err = nbdkit.Handle.GetSize()
if err != nil {
klog.Errorf("Unable to get source disk size: %v", err)
return nil, err
}
size, err = nbdkit.Handle.GetSize()
if err != nil {
klog.Errorf("Unable to get source disk size: %v", err)
return nil, err
}

MaxPreadLength = MaxPreadLengthESX
Expand All @@ -973,8 +898,9 @@ func createVddkDataSource(endpoint string, accessKey string, secKey string, thum
}

source := &VDDKDataSource{
VMware: vmware,
BackingFile: backingFile,
NbdKit: nbdkit,
ChangedBlocks: changed,
CurrentSnapshot: currentCheckpoint,
PreviousSnapshot: previousCheckpoint,
Size: size,
Expand Down Expand Up @@ -1023,29 +949,35 @@ func (vs *VDDKDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseTransferDataFile, nil
}

// IsWarm returns true if this is a multi-stage transfer.
func (vs *VDDKDataSource) IsWarm() bool {
return vs.CurrentSnapshot != ""
}

// IsDeltaCopy is called to determine if this is a full copy or one delta copy stage
// in a warm migration.
// in a warm migration. This is different from IsWarm because the first step is
// a full copy, and subsequent steps are delta copies.
func (vs *VDDKDataSource) IsDeltaCopy() bool {
result := vs.PreviousSnapshot != "" && vs.CurrentSnapshot != ""
return result
}

// Mockable stat, so unit tests can run through TransferFile
var MockableStat = os.Stat

// TransferFile is called to transfer the data from the source to the file passed in.
func (vs *VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
if !vs.IsDeltaCopy() {
defer func() { _ = vs.VMware.Close() }()

if !vs.IsWarm() {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
}

if vs.ChangedBlocks != nil { // Warm migration pre-checks
if len(vs.ChangedBlocks.ChangedArea) < 1 { // No changes? Immediately return success.
klog.Infof("No changes reported between snapshot %s and snapshot %s, marking transfer complete.", vs.PreviousSnapshot, vs.CurrentSnapshot)
return ProcessingPhaseComplete, nil
}

if vs.IsDeltaCopy() {
// Make sure file exists before applying deltas.
_, err := os.Stat(fileName)
_, err := MockableStat(fileName)
if os.IsNotExist(err) {
klog.Infof("Disk image does not exist, cannot apply deltas for warm migration: %v", err)
return ProcessingPhaseError, err
Expand Down Expand Up @@ -1097,15 +1029,112 @@ func (vs *VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error)
}
}

if vs.ChangedBlocks != nil { // Warm migration delta copy
for _, extent := range vs.ChangedBlocks.ChangedArea {
blocks := GetBlockStatus(vs.NbdKit.Handle, extent)
for _, block := range blocks {
err := CopyRange(vs.NbdKit.Handle, sink, block, updateProgress)
if vs.IsDeltaCopy() { // Warm migration delta copy
// Find disk object for backingFile disk image path
backingFileObject, err := vs.VMware.FindDiskFromName(vs.BackingFile)
if err != nil {
klog.Errorf("Could not find VM disk %s: %v", vs.BackingFile, err)
return ProcessingPhaseError, err
}

// Find current snapshot object if requested
var currentSnapshot *types.ManagedObjectReference
if vs.CurrentSnapshot != "" {
currentSnapshot, err = vs.VMware.vm.FindSnapshot(vs.VMware.context, vs.CurrentSnapshot)
if err != nil {
klog.Errorf("Could not find current snapshot %s: %v", vs.CurrentSnapshot, err)
return ProcessingPhaseError, err
}
}

disk, err := vs.VMware.FindSnapshotDisk(currentSnapshot, backingFileObject.DiskObjectId)
if err != nil {
klog.Errorf("Could not find matching disk in current snapshot: %v", err)
return ProcessingPhaseError, err
}

// Check if this is a snapshot or a change ID, and query disk areas as appropriate.
// Change IDs look like: 52 de c0 d9 b9 43 9d 10-61 d5 4c 1b e9 7b 65 63/81
changeIDPattern := `([0-9a-fA-F]{2}\s?)*-([0-9a-fA-F]{2}\s?)*\/([0-9a-fA-F]*)`
isChangeID, _ := regexp.MatchString(changeIDPattern, vs.PreviousSnapshot)
var previousSnapshot *types.ManagedObjectReference
if !isChangeID {
previousSnapshot, err = vs.VMware.vm.FindSnapshot(vs.VMware.context, vs.PreviousSnapshot)
if err != nil {
klog.Errorf("Could not find previous snapshot %s: %v", vs.PreviousSnapshot, err)
return ProcessingPhaseError, err
}
if previousSnapshot == nil {
return ProcessingPhaseError, fmt.Errorf("failed to find previous snapshot %s", vs.PreviousSnapshot)
}
}

// QueryChangedDiskAreas needs to be called multiple times to get all possible disk changes.
// Experimentation shows it returns maximally 2000 changed blocks. If the disk has more than
// 2000 changed blocks we need to query the next chunk of the blocks starting from previous.
// Loop until QueryChangedDiskAreas starts returning zero-length block lists.
offset := int64(0) // Next offset to query
for {
klog.Infof("Querying changed disk areas at offset %d", offset)
var changed types.DiskChangeInfo
if isChangeID { // Previous checkpoint is a change ID
request := types.QueryChangedDiskAreas{
ChangeId: vs.PreviousSnapshot,
DeviceKey: backingFileObject.Key,
Snapshot: currentSnapshot,
StartOffset: offset,
This: vs.VMware.vm.Reference(),
}
response, err := QueryChangedDiskAreas(vs.VMware.context, vs.VMware.vm.Client(), &request)
if err != nil {
klog.Errorf("Unable to copy block at offset %d: %v", block.Offset, err)
klog.Errorf("Failed to query changed areas: %s", err)
return ProcessingPhaseError, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(response.Returnval.ChangedArea), response.Returnval.StartOffset, response.Returnval.Length)
if len(response.Returnval.ChangedArea) == 0 { // No more changes
break
}
changed.ChangedArea = response.Returnval.ChangedArea
changed.StartOffset = response.Returnval.StartOffset
changed.Length = response.Returnval.Length
} else { // Previous checkpoint is a snapshot
changedAreas, err := vs.VMware.vm.QueryChangedDiskAreas(vs.VMware.context, previousSnapshot, currentSnapshot, backingFileObject, changed.Length)
if err != nil {
klog.Errorf("Unable to query changed areas: %s", err)
return ProcessingPhaseError, err
}
klog.Infof("%d changed areas reported at offset %d with data length %d", len(changedAreas.ChangedArea), changedAreas.StartOffset, changedAreas.Length)
if len(changedAreas.ChangedArea) == 0 {
break
}
changed.ChangedArea = changedAreas.ChangedArea
changed.StartOffset = changedAreas.StartOffset
changed.Length = changedAreas.Length
}

// No changes? Immediately return success.
if len(changed.ChangedArea) < 1 {
klog.Infof("No changes reported between snapshot %s and snapshot %s, marking transfer complete.", vs.PreviousSnapshot, vs.CurrentSnapshot)
return ProcessingPhaseComplete, nil
}

// Copy actual data from query ranges to destination
for _, extent := range changed.ChangedArea {
blocks := GetBlockStatus(vs.NbdKit.Handle, extent)
for _, block := range blocks {
err := CopyRange(vs.NbdKit.Handle, sink, block, updateProgress)
if err != nil {
klog.Errorf("Unable to copy block at offset %d: %v", block.Offset, err)
return ProcessingPhaseError, err
}
}
}

// The start offset should not be the size of the disk otherwise the next QueryChangedDiskAreas will fail
offset = changed.StartOffset + changed.Length
if offset >= disk.CapacityInBytes {
klog.Infof("The offset %d is greater or equal to disk capacity %d, assuming no further changes", offset, disk.CapacityInBytes)
break
}
}
} else { // Cold migration full copy
Expand Down
Loading