Skip to content

Commit

Permalink
feat: supports to run opsRequest forcibly and queuing based on opsReq…
Browse files Browse the repository at this point in the history
…uest type (apecloud#6828)
  • Loading branch information
wangyelei authored Mar 18, 2024
1 parent bdc693a commit f91a76c
Show file tree
Hide file tree
Showing 34 changed files with 1,251 additions and 274 deletions.
21 changes: 20 additions & 1 deletion apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ type OpsRequestSpec struct {
// +optional
Cancel bool `json:"cancel,omitempty"`

// Indicates if pre-checks should be bypassed, allowing the opsRequest to execute immediately. If set to true, pre-checks are skipped except for 'Start' type.
// Particularly useful when concurrent execution of VerticalScaling and HorizontalScaling opsRequests is required,
// achievable through the use of the Force flag.
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.force"
// +optional
Force bool `json:"force,omitempty"`

// Defines the operation type.
// +kubebuilder:validation:Required
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.type"
Expand Down Expand Up @@ -119,7 +126,6 @@ type OpsRequestSpec struct {
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=componentName
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.expose"
ExposeList []Expose `json:"expose,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Cluster RestoreFrom backup or point in time.
Expand Down Expand Up @@ -786,6 +792,11 @@ type OpsRequestComponentStatus struct {
// +optional
WorkloadType WorkloadType `json:"workloadType,omitempty"`

// Describes the configuration covered by the latest OpsRequest of the same kind.
// when reconciling, this information will be used as a benchmark rather than the 'spec', such as 'Spec.HorizontalScaling'.
// +optional
OverrideBy *OverrideBy `json:"overrideBy,omitempty"`

// Describes the reason for the component phase.
// +kubebuilder:validation:MaxLength=1024
// +optional
Expand All @@ -797,6 +808,14 @@ type OpsRequestComponentStatus struct {
Message string `json:"message,omitempty" protobuf:"bytes,6,opt,name=message"`
}

type OverrideBy struct {
// Indicates the opsRequest name.
// +optional
OpsName string `json:"opsName"`

LastComponentConfiguration `json:",inline"`
}

type PreCheckResult struct {
// Indicates whether the preCheck operation was successful or not.
// +kubebuilder:validation:Required
Expand Down
9 changes: 9 additions & 0 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,22 @@ func (r *OpsRequest) IsComplete(phases ...OpsPhase) bool {
return slices.Contains([]OpsPhase{OpsCancelledPhase, OpsSucceedPhase, OpsFailedPhase}, phases[0])
}

// Force checks if the current opsRequest can be forcibly executed
func (r *OpsRequest) Force() bool {
// ops of type 'Start' do not support force execution.
return r.Spec.Force && r.Spec.Type != StartType
}

// validateClusterPhase validates whether the current cluster state supports the OpsRequest
func (r *OpsRequest) validateClusterPhase(cluster *Cluster) error {
opsBehaviour := OpsRequestBehaviourMapper[r.Spec.Type]
// if the OpsType has no cluster phases, ignore it
if len(opsBehaviour.FromClusterPhases) == 0 {
return nil
}
if r.Force() {
return nil
}
// validate whether existing the same type OpsRequest
var (
opsRequestValue string
Expand Down
2 changes: 2 additions & 0 deletions apis/apps/v1alpha1/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ type OpsRecorder struct {
Type OpsType `json:"type"`
// indicates whether the current opsRequest is in the queue
InQueue bool `json:"inQueue,omitempty"`
// indicates that the operation is queued for execution within its own-type scope.
QueueBySelf bool `json:"queueBySelf,omitempty"`
}

// ProvisionPolicyType defines the policy for creating accounts.
Expand Down
158 changes: 157 additions & 1 deletion config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,15 @@ spec:
x-kubernetes-list-map-keys:
- componentName
x-kubernetes-list-type: map
force:
description: Indicates if pre-checks should be bypassed, allowing
the opsRequest to execute immediately. If set to true, pre-checks
are skipped except for 'Start' type. Particularly useful when concurrent
execution of VerticalScaling and HorizontalScaling opsRequests is
required, achievable through the use of the Force flag.
type: boolean
x-kubernetes-validations:
- message: forbidden to update spec.expose
- message: forbidden to update spec.force
rule: self == oldSelf
horizontalScaling:
description: Defines what component need to horizontal scale the specified
Expand Down Expand Up @@ -1070,6 +1077,155 @@ spec:
about this operation.
maxLength: 32768
type: string
overrideBy:
description: Describes the configuration covered by the latest
OpsRequest of the same kind. when reconciling, this information
will be used as a benchmark rather than the 'spec', such as
'Spec.HorizontalScaling'.
properties:
claims:
description: "Claims lists the names of resources, defined
in spec.resourceClaims, that are used by this container.
\n This is an alpha field and requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable. It can only
be set for containers."
items:
description: ResourceClaim references one entry in PodSpec.ResourceClaims.
properties:
name:
description: Name must match the name of one entry
in pod.spec.resourceClaims of the Pod where this
field is used. It makes that resource available
inside a container.
type: string
required:
- name
type: object
type: array
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
classDefRef:
description: References a class defined in ComponentClassDefinition.
properties:
class:
description: Defines the name of the class that is defined
in the ComponentClassDefinition.
type: string
name:
description: Specifies the name of the ComponentClassDefinition.
maxLength: 63
pattern: ^[a-z0-9]([a-z0-9\.\-]*[a-z0-9])?$
type: string
required:
- class
type: object
limits:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: 'Limits describes the maximum amount of compute
resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object
opsName:
description: Indicates the opsRequest name.
type: string
replicas:
description: Represents the last replicas of the component.
format: int32
type: integer
requests:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: 'Requests describes the minimum amount of compute
resources required. If Requests is omitted for a container,
it defaults to Limits if that is explicitly specified,
otherwise to an implementation-defined value. Requests
cannot exceed Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object
services:
description: Records the last services of the component.
items:
properties:
annotations:
additionalProperties:
type: string
description: 'If ServiceType is LoadBalancer, cloud
provider related parameters can be put here. More
info: https://kubernetes.io/docs/concepts/services-networking/service/#loadbalancer.'
type: object
name:
description: The name of the service.
maxLength: 15
type: string
serviceType:
default: ClusterIP
description: "Determines how the Service is exposed.
Valid options are ClusterIP, NodePort, and LoadBalancer.
\n - `ClusterIP` allocates a cluster-internal IP
address for load-balancing to endpoints. Endpoints
are determined by the selector or if that is not
specified, they are determined by manual construction
of an Endpoints object or EndpointSlice objects.
If clusterIP is \"None\", no virtual IP is allocated
and the endpoints are published as a set of endpoints
rather than a virtual IP. - `NodePort` builds on
ClusterIP and allocates a port on every node which
routes to the same endpoints as the clusterIP. -
`LoadBalancer` builds on NodePort and creates an
external load-balancer (if supported in the current
cloud) which routes to the same endpoints as the
clusterIP. \n More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types."
enum:
- ClusterIP
- NodePort
- LoadBalancer
type: string
x-kubernetes-preserve-unknown-fields: true
required:
- name
type: object
type: array
targetResources:
additionalProperties:
items:
type: string
type: array
description: Records the information about the target resources
affected by the component. The resource key is in the
list of [pods].
type: object
volumeClaimTemplates:
description: Records the last volumeClaimTemplates of the
component.
items:
properties:
name:
description: A reference to the volumeClaimTemplate
name from the cluster components.
type: string
storage:
anyOf:
- type: integer
- type: string
description: Specifies the requested storage size
for the volume.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
required:
- name
- storage
type: object
type: array
type: object
x-kubernetes-preserve-unknown-fields: true
phase:
description: Describes the component phase, referencing Cluster.status.component.phase.
enum:
Expand Down
9 changes: 6 additions & 3 deletions controllers/apps/operations/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (c CustomOpsHandler) checkExpression(reqCtx intctrlutil.RequestCtx,
rule *appsv1alpha1.Rule,
compCustomOSpec appsv1alpha1.CustomOpsComponent) error {
opsSpec := opsRes.OpsRequest.Spec
if opsSpec.Force {
return nil
}
componentObjName := constant.GenerateClusterComponentName(opsSpec.ClusterRef, compCustomOSpec.ComponentName)
comp := &appsv1alpha1.Component{}
if err := cli.Get(reqCtx.Ctx, client.ObjectKey{Name: componentObjName, Namespace: opsRes.OpsRequest.Namespace}, comp); err != nil {
Expand All @@ -141,9 +144,9 @@ func (c CustomOpsHandler) checkExpression(reqCtx intctrlutil.RequestCtx,
// get the built-in objects and covert the json tag
getBuiltInObjs := func() (map[string]interface{}, error) {
b, err := json.Marshal(map[string]interface{}{
"cluster": opsRes.Cluster,
"component": comp,
"params": params,
"cluster": opsRes.Cluster,
"component": comp,
"parameters": params,
})
if err != nil {
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions controllers/apps/operations/expose.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ var _ OpsHandler = ExposeOpsHandler{}
func init() {
// ToClusterPhase is not defined, because 'expose' does not affect the cluster status.
exposeBehavior := OpsBehaviour{
OpsHandler: ExposeOpsHandler{},
OpsHandler: ExposeOpsHandler{},
QueueBySelf: true,
}

opsMgr := GetOpsManager()
Expand All @@ -55,7 +56,6 @@ func (e ExposeOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clien
exposeMap = opsRes.OpsRequest.Spec.ToExposeListToMap()
)
reqCtx.Log.Info("cluster service before action", "clusterService", opsRes.Cluster.Spec.Services)

compMap := make(map[string]appsv1alpha1.ClusterComponentSpec)
for _, comp := range opsRes.Cluster.Spec.ComponentSpecs {
compMap[comp.Name] = comp
Expand Down Expand Up @@ -98,7 +98,6 @@ func (e ExposeOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli cli
oldOpsRequestStatus = opsRequest.Status.DeepCopy()
opsRequestPhase = appsv1alpha1.OpsRunningPhase
)

patch := client.MergeFrom(opsRequest.DeepCopy())

// update component status
Expand Down
12 changes: 11 additions & 1 deletion controllers/apps/operations/horizontal_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func init() {
// TODO: we should add "force" flag for these opsRequest.
FromClusterPhases: appsv1alpha1.GetClusterUpRunningPhases(),
ToClusterPhase: appsv1alpha1.UpdatingClusterPhase,
QueueByCluster: true,
OpsHandler: hsHandler,
CancelFunc: hsHandler.Cancel,
}
Expand Down Expand Up @@ -83,7 +84,7 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request
compStatus *appsv1alpha1.OpsRequestComponentStatus) (int32, int32, error) {
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus, hs.getExpectReplicas)
}
return reconcileActionWithComponentOps(reqCtx, cli, opsRes, "", handleComponentProgress)
return reconcileActionWithComponentOps(reqCtx, cli, opsRes, "", syncOverrideByOpsForScaleReplicas, handleComponentProgress)
}

// SaveLastConfiguration records last configuration to the OpsRequest.status.lastConfiguration
Expand Down Expand Up @@ -116,6 +117,10 @@ func (hs horizontalScalingOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.R
}

func (hs horizontalScalingOpsHandler) getExpectReplicas(opsRequest *appsv1alpha1.OpsRequest, componentName string) *int32 {
compStatus := opsRequest.Status.Components[componentName]
if compStatus.OverrideBy != nil {
return compStatus.OverrideBy.Replicas
}
for _, v := range opsRequest.Spec.HorizontalScalingList {
if v.ComponentName == componentName {
return &v.Replicas
Expand All @@ -140,6 +145,11 @@ func getCompPodNamesBeforeScaleDownReplicas(reqCtx intctrlutil.RequestCtx,

// Cancel this function defines the cancel horizontalScaling action.
func (hs horizontalScalingOpsHandler) Cancel(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
for _, v := range opsRes.OpsRequest.Status.Components {
if v.OverrideBy != nil && v.OverrideBy.OpsName != "" {
return intctrlutil.NewErrorf(intctrlutil.ErrorIgnoreCancel, `can not cancel the opsRequest due to another opsRequest "%s" is running`, v.OverrideBy.OpsName)
}
}
return cancelComponentOps(reqCtx.Ctx, cli, opsRes, func(lastConfig *appsv1alpha1.LastComponentConfiguration, comp *appsv1alpha1.ClusterComponentSpec) error {
if lastConfig.Replicas == nil {
return nil
Expand Down
Loading

0 comments on commit f91a76c

Please sign in to comment.