Skip to content

Commit

Permalink
Set volume status when migrating
Browse files Browse the repository at this point in the history
Resolves ZSTAC-6476

Signed-off-by: AlanJager <[email protected]>
  • Loading branch information
AlanJager committed Mar 13, 2018
1 parent 8788bb2 commit 61977c6
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.zstack.header.volume;

import org.zstack.header.message.NeedReplyMessage;

/**
* Created by kayo on 2018/3/9.
*/
public class ChangeVolumeStatusMsg extends NeedReplyMessage implements VolumeMessage {
private String volumeUuid;
private VolumeStatus status;

@Override
public String getVolumeUuid() {
return volumeUuid;
}

public void setVolumeUuid(String volumeUuid) {
this.volumeUuid = volumeUuid;
}

public VolumeStatus getStatus() {
return status;
}

public void setStatus(VolumeStatus status) {
this.status = status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.zstack.header.volume;

import org.zstack.header.message.MessageReply;

/**
* Created by kayo on 2018/3/9.
*/
public class ChangeVolumeStatusReply extends MessageReply {
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.zstack.core.thread.SyncTaskChain;
import org.zstack.core.workflow.FlowChainBuilder;
import org.zstack.core.workflow.ShareFlow;
import org.zstack.core.workflow.SimpleFlowChain;
import org.zstack.header.apimediator.ApiMessageInterceptionException;
import org.zstack.header.cluster.ClusterInventory;
import org.zstack.header.cluster.ClusterVO;
Expand All @@ -34,6 +35,7 @@
import org.zstack.header.message.APIMessage;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.OverlayMessage;
import org.zstack.header.storage.primary.*;
import org.zstack.header.storage.primary.VolumeSnapshotCapability.VolumeSnapshotArrangementType;
import org.zstack.header.storage.snapshot.*;
Expand Down Expand Up @@ -260,64 +262,219 @@ private void handle(final APILocalStorageMigrateVolumeMsg msg) {
return;
}

MigrateVolumeOnLocalStorageMsg mmsg = new MigrateVolumeOnLocalStorageMsg();
mmsg.setPrimaryStorageUuid(msg.getPrimaryStorageUuid());
mmsg.setDestHostUuid(msg.getDestHostUuid());
mmsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(mmsg, PrimaryStorageConstant.SERVICE_ID, self.getUuid());

MigrateVolumeOverlayMsg omsg = new MigrateVolumeOverlayMsg();
omsg.setMessage(mmsg);
omsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(omsg, VolumeConstant.SERVICE_ID, msg.getVolumeUuid());

MigrateRootVolumeVmOverlayMsg vmsg = new MigrateRootVolumeVmOverlayMsg();

Tuple t = Q.New(VmInstanceVO.class).select(VmInstanceVO_.uuid, VmInstanceVO_.state)
.eq(VmInstanceVO_.rootVolumeUuid, msg.getVolumeUuid())
.findTuple();
String vmUuid = t == null ? null : t.get(0, String.class);
String originStateEvent = t == null ? null : t.get(1, VmInstanceState.class).getDrivenEvent().toString();

if(vmUuid != null){
ChangeVmStateMsg cmsg = new ChangeVmStateMsg();
cmsg.setStateEvent(VmInstanceStateEvent.volumeMigrating.toString());
cmsg.setVmInstanceUuid(vmUuid);
bus.makeTargetServiceIdByResourceUuid(cmsg, VmInstanceConstant.SERVICE_ID, vmUuid);
MessageReply reply = bus.call(cmsg);
if(!reply.isSuccess()){
evt.setError(reply.getError());
bus.publish(evt);
return;
class MigrateStruct {
private boolean isRootVolume = false;
private OverlayMessage message;
private String vmUuid;
private boolean isMigrated = false;

public String getVmOriginState() {
return vmOriginState;
}

public void setVmOriginState(String vmOriginState) {
this.vmOriginState = vmOriginState;
}

private String vmOriginState;

public boolean isRootVolume() {
return isRootVolume;
}

public void setRootVolume(boolean rootVolume) {
isRootVolume = rootVolume;
}

public OverlayMessage getMessage() {
return message;
}

public void setMessage(OverlayMessage message) {
this.message = message;
}

public String getVmUuid() {
return vmUuid;
}

public void setVmUuid(String vmUuid) {
this.vmUuid = vmUuid;
}

public boolean isMigrated() {
return isMigrated;
}

public void setMigrated(boolean migrated) {
isMigrated = migrated;
}
vmsg.setMessage(omsg);
vmsg.setVmInstanceUuid(vmUuid);
bus.makeTargetServiceIdByResourceUuid(vmsg, VmInstanceConstant.SERVICE_ID, vmUuid);
}

bus.send(vmUuid == null ? omsg : vmsg, new CloudBusCallBack(msg) {
MigrateStruct struct = new MigrateStruct();
VolumeStatus originStatus = Q.New(VolumeVO.class).select(VolumeVO_.status).eq(VolumeVO_.uuid, msg.getVolumeUuid()).findValue();
FlowChain chain = new SimpleFlowChain();
chain.setName(String.format("local-storage-%s-migrate-volume-%s-to-host-%s", msg.getPrimaryStorageUuid(), msg.getVolumeUuid(), msg.getDestHostUuid()));
chain.then(new NoRollbackFlow() {
@Override
public void run(MessageReply reply) {
if(vmUuid != null){
ChangeVmStateMsg cmsg = new ChangeVmStateMsg();
cmsg.setStateEvent(reply.isSuccess() ? VmInstanceStateEvent.volumeMigrated.toString() : originStateEvent);
cmsg.setVmInstanceUuid(vmUuid);
bus.makeTargetServiceIdByResourceUuid(cmsg, VmInstanceConstant.SERVICE_ID, vmUuid);
// if fail, host ping task will sync it state
bus.call(cmsg);
public void run(FlowTrigger trigger, Map data) {
String __name__ = "change-volume-status-to-migrating";

ChangeVolumeStatusMsg changeVolumeStatusMsg = new ChangeVolumeStatusMsg();
changeVolumeStatusMsg.setStatus(VolumeStatus.Migrating);
changeVolumeStatusMsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(changeVolumeStatusMsg, VolumeConstant.SERVICE_ID, msg.getVolumeUuid());
bus.send(changeVolumeStatusMsg, new CloudBusCallBack(changeVolumeStatusMsg) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
trigger.fail(reply.getError());
return;
}

trigger.next();
}
});
}
}).then(new NoRollbackFlow() {
@Override
public void run(FlowTrigger trigger, Map data) {
String __name__ = "change-vm-state-to-volume-migrating";

Tuple t = Q.New(VmInstanceVO.class).select(VmInstanceVO_.uuid, VmInstanceVO_.state)
.eq(VmInstanceVO_.rootVolumeUuid, msg.getVolumeUuid())
.findTuple();
String vmUuid = t == null ? null : t.get(0, String.class);
String originStateEvent = t == null ? null : t.get(1, VmInstanceState.class).getDrivenEvent().toString();

if (vmUuid == null) {
trigger.next();
return;
}

if (!reply.isSuccess()) {
evt.setError(reply.getError());
bus.publish(evt);
struct.setRootVolume(true);
struct.setVmUuid(vmUuid);
struct.setVmOriginState(originStateEvent);

ChangeVmStateMsg cmsg = new ChangeVmStateMsg();
cmsg.setStateEvent(VmInstanceStateEvent.volumeMigrating.toString());
cmsg.setVmInstanceUuid(struct.getVmUuid());
bus.makeTargetServiceIdByResourceUuid(cmsg, VmInstanceConstant.SERVICE_ID, struct.getVmUuid());
bus.send(cmsg, new CloudBusCallBack(cmsg) {
@Override
public void run(MessageReply reply) {
if(!reply.isSuccess()){
trigger.fail(reply.getError());
return;
}

trigger.next();
}
});
}
}).then(new NoRollbackFlow() {
@Override
public void run(FlowTrigger trigger, Map data) {
String __name__ = "migrate-volume-on-local-storage";

MigrateVolumeOnLocalStorageMsg mmsg = new MigrateVolumeOnLocalStorageMsg();
mmsg.setPrimaryStorageUuid(msg.getPrimaryStorageUuid());
mmsg.setDestHostUuid(msg.getDestHostUuid());
mmsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(mmsg, PrimaryStorageConstant.SERVICE_ID, self.getUuid());

MigrateVolumeOverlayMsg omsg = new MigrateVolumeOverlayMsg();
omsg.setMessage(mmsg);
omsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(omsg, VolumeConstant.SERVICE_ID, msg.getVolumeUuid());

struct.setMessage(omsg);

if (struct.isRootVolume) {
MigrateRootVolumeVmOverlayMsg vmsg = new MigrateRootVolumeVmOverlayMsg();
vmsg.setMessage(omsg);
vmsg.setVmInstanceUuid(struct.getVmUuid());
bus.makeTargetServiceIdByResourceUuid(vmsg, VmInstanceConstant.SERVICE_ID, struct.getVmUuid());

struct.setMessage(vmsg);
struct.setRootVolume(true);
}

bus.send(struct.getMessage(), new CloudBusCallBack(struct.getMessage()) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
trigger.fail(reply.getError());
return;
}

MigrateVolumeOnLocalStorageReply mr = reply.castReply();
evt.setInventory(mr.getInventory());
struct.setMigrated(true);
trigger.next();
}
});
}
}).then(new NoRollbackFlow() {
@Override
public void run(FlowTrigger trigger, Map data) {
String __name__ = "change-vm-state-to-volume-migrated";

if (!struct.isRootVolume) {
trigger.next();
return;
}

MigrateVolumeOnLocalStorageReply mr = reply.castReply();
evt.setInventory(mr.getInventory());
ChangeVmStateMsg cmsg = new ChangeVmStateMsg();
cmsg.setStateEvent(VmInstanceStateEvent.volumeMigrated.toString());
cmsg.setVmInstanceUuid(struct.getVmUuid());
bus.makeTargetServiceIdByResourceUuid(cmsg, VmInstanceConstant.SERVICE_ID, struct.getVmUuid());
// if fail, host ping task will sync it state
bus.send(cmsg, new CloudBusCallBack(cmsg) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
trigger.fail(reply.getError());
return;
}

trigger.next();
}
});
}
}).then(new NoRollbackFlow() {
@Override
public void run(FlowTrigger trigger, Map data) {
String __name__ = "change-volume-status-to-origin";

ChangeVolumeStatusMsg changeVolumeStatusMsg = new ChangeVolumeStatusMsg();
changeVolumeStatusMsg.setStatus(originStatus);
changeVolumeStatusMsg.setVolumeUuid(msg.getVolumeUuid());
bus.makeTargetServiceIdByResourceUuid(changeVolumeStatusMsg, VolumeConstant.SERVICE_ID, msg.getVolumeUuid());
bus.send(changeVolumeStatusMsg, new CloudBusCallBack(changeVolumeStatusMsg) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
trigger.fail(reply.getError());
return;
}

trigger.next();
}
});
}
}).done(new FlowDoneHandler(msg) {
@Override
public void handle(Map data) {
bus.publish(evt);
}
});
}).error(new FlowErrorHandler(msg) {
@Override
public void handle(ErrorCode errCode, Map data) {
evt.setError(errCode);
bus.publish(evt);
}
}).start();
}

private void handle(final MigrateVolumeOnLocalStorageMsg msg) {
Expand Down Expand Up @@ -379,6 +536,7 @@ private void migrateVolume(MigrateVolumeOnLocalStorageMsg msg, NoErrorCompletion
VolumeVO volume;
MigrateBitsStruct struct = new MigrateBitsStruct();
LocalStorageHypervisorBackend bkd;
VolumeStatus originVolumeStatus;

{
SimpleQuery<LocalStorageResourceRefVO> q = dbf.createQuery(LocalStorageResourceRefVO.class);
Expand Down Expand Up @@ -442,6 +600,10 @@ public LocalStorageResourceRefInventory call(LocalStorageResourceRefVO arg) {

LocalStorageHypervisorFactory f = getHypervisorBackendFactoryByHostUuid(msg.getDestHostUuid());
bkd = f.getHypervisorBackend(self);

originVolumeStatus = volume.getStatus();
volume.setStatus(VolumeStatus.Migrating);
SQL.New(VolumeVO.class).set(VolumeVO_.status, VolumeStatus.Migrating).eq(VolumeVO_.uuid, volume.getUuid()).update();
}

@Override
Expand Down Expand Up @@ -568,6 +730,10 @@ protected void scripts() {
}
}

sql(VolumeVO.class)
.eq(VolumeVO_.uuid, volumeRefVO.getResourceUuid())
.set(VolumeVO_.status, originVolumeStatus)
.update();

LocalStorageResourceRefVO vo = Q.New(LocalStorageResourceRefVO.class)
.eq(LocalStorageResourceRefVO_.resourceUuid, volumeRefVO.getResourceUuid())
Expand Down
Loading

0 comments on commit 61977c6

Please sign in to comment.