forked from RedHatQE/openshift-python-wrapper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add resources ReclaimSpaceCronJob and ReclaimSpaceJob (RedHatQE#1805)
* support token creation for service account * updated doc strings etc. * address review comments * Add reclaimspacecronjob and reclaimspacejob resources * updates to address review comments * address tox failure * updates based on review comments * Update ocp_resources/reclaim_space_cron_job.py Co-authored-by: Ruth Netser <[email protected]> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * updates based on reviews * fix precommit failure and wrong argument naming * minor update * fix indentation * fix indentation * Based on review comment make all args optional --------- Co-authored-by: Ruth Netser <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
- Loading branch information
1 parent
c167ff8
commit 1397696
Showing
3 changed files
with
104 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from __future__ import annotations | ||
from typing import Any, Dict, Optional | ||
from ocp_resources.resource import NamespacedResource, MissingRequiredArgumentError | ||
|
||
|
||
class ReclaimSpaceCronJob(NamespacedResource): | ||
""" | ||
https://github.com/csi-addons/kubernetes-csi-addons/blob/main/apis/csiaddons/v1alpha1/reclaimspacecronjob_types.go | ||
""" | ||
|
||
api_group = NamespacedResource.ApiGroup.CSIADDONS_OPENSHIFT_IO | ||
|
||
def __init__( | ||
self, | ||
schedule: Optional[str] = "", | ||
job_template: Optional[Dict[str, Any]] = None, | ||
concurrency_policy: Optional[str] = "", | ||
successful_jobs_history_limit: Optional[int] = None, | ||
failed_jobs_history_limit: Optional[int] = None, | ||
**kwargs: Any, | ||
) -> None: | ||
""" | ||
Args: | ||
schedule (Optional, str): schedule of the reclaim space cron job | ||
job_template (Optional, dict): describes the reclaim space job that would be created when a reclaim space cronjob | ||
would be executed | ||
Example: https://github.com/csi-addons/kubernetes-csi-addons/blob/main/docs/reclaimspace.md | ||
concurrency_policy (str, optional): indicates how to treat concurrent execution of a job | ||
successful_jobs_history_limit (int, optional): number of successful jobs to retain | ||
failed_jobs_history_limit (int, optional): number of failed jobs to retain start at scheduled time | ||
""" | ||
super().__init__( | ||
**kwargs, | ||
) | ||
self.job_template = job_template | ||
self.schedule = schedule | ||
self.concurrency_policy = concurrency_policy | ||
self.successful_jobs_history_limit = successful_jobs_history_limit | ||
self.failed_jobs_history_limit = failed_jobs_history_limit | ||
|
||
def to_dict(self) -> None: | ||
super().to_dict() | ||
if not self.yaml_file: | ||
if not (self.job_template and self.schedule): | ||
raise MissingRequiredArgumentError(argument="'job_template' and 'schedule'") | ||
self.res["spec"] = {"jobTemplate": self.job_template, "schedule": self.schedule} | ||
spec_dict = self.res["spec"] | ||
if self.successful_jobs_history_limit: | ||
spec_dict["successfulJobsHistoryLimit"] = self.successful_jobs_history_limit | ||
if self.failed_jobs_history_limit: | ||
spec_dict["failedJobsHistoryLimit"] = self.failed_jobs_history_limit | ||
if self.concurrency_policy: | ||
spec_dict["concurrencyPolicy"] = self.concurrency_policy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from __future__ import annotations | ||
from typing import Any, Optional | ||
from ocp_resources.resource import NamespacedResource, MissingRequiredArgumentError | ||
|
||
|
||
class ReclaimSpaceJob(NamespacedResource): | ||
""" | ||
https://github.com/csi-addons/kubernetes-csi-addons/blob/main/apis/csiaddons/v1alpha1/reclaimspacejob_types.go | ||
""" | ||
|
||
api_group = NamespacedResource.ApiGroup.CSIADDONS_OPENSHIFT_IO | ||
|
||
def __init__( | ||
self, | ||
backoff_limit: Optional[int] = None, | ||
target: Optional[dict[str, Any]] = None, | ||
retry_deadline_seconds: Optional[int] = None, | ||
timeout_seconds_reclaim_job: int | None = None, | ||
**kwargs: Any, | ||
) -> None: | ||
""" | ||
Args: | ||
backoff_limit (int, Optional): The number of retries for a reclaim space job. | ||
target (Optional, dict): Volume target on which the operation would be performed. | ||
retryDeadlineSeconds (int, Optional): Duration in seconds relative to the start time that the | ||
operation may be retried. | ||
timeout_seconds_reclaim_job (int, Optional): Specifies the timeout in seconds for the grpc request sent to | ||
the CSI driver. | ||
""" | ||
super().__init__( | ||
**kwargs, | ||
) | ||
self.backoff_limit = backoff_limit | ||
self.target = target | ||
self.retry_deadline_seconds = retry_deadline_seconds | ||
self.timeout_seconds_reclaim_job = timeout_seconds_reclaim_job | ||
|
||
def to_dict(self) -> None: | ||
super().to_dict() | ||
if not self.yaml_file: | ||
if not self.target: | ||
raise MissingRequiredArgumentError(argument="target") | ||
self.res["spec"] = {"target": self.target} | ||
spec_dict = self.res["spec"] | ||
if self.retry_deadline_seconds: | ||
spec_dict["retryDeadlineSeconds"] = self.retry_deadline_seconds | ||
if self.timeout_seconds_reclaim_job: | ||
spec_dict["timeout"] = self.timeout_seconds_reclaim_job | ||
if self.backoff_limit: | ||
spec_dict["backOffLimit"] = self.backoff_limit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters