forked from RedHatQE/openshift-python-wrapper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_source.py
61 lines (51 loc) · 2.04 KB
/
data_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from warnings import warn
from kubernetes.dynamic.exceptions import ResourceNotFoundError
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.resource import MissingRequiredArgumentError, NamespacedResource
from ocp_resources.volume_snapshot import VolumeSnapshot
class DataSource(NamespacedResource):
    """
    CDI DataSource resource.

    API reference:
    https://kubevirt.io/cdi-api-reference/main/definitions.html#_v1beta1_datasource
    """

    api_group = NamespacedResource.ApiGroup.CDI_KUBEVIRT_IO

    def __init__(self, source=None, **kwargs):
        """
        Args:
            source (dict): Value placed under ``spec.source`` when the resource dict is built.
            **kwargs: Passed through unchanged to the parent class initializer.
        """
        super().__init__(**kwargs)
        self._source = source

    def to_dict(self) -> None:
        """
        Run the parent ``to_dict`` and then add ``spec.source`` to ``self.res``.

        The spec is only added when ``self.yaml_file`` is falsy; otherwise nothing
        beyond the parent call happens here.

        Raises:
            MissingRequiredArgumentError: ``yaml_file`` is falsy and ``source`` was not provided.
        """
        super().to_dict()

        # With a yaml file set, this class adds nothing on top of the parent result.
        if self.yaml_file:
            return

        if not self._source:
            raise MissingRequiredArgumentError(argument="source")

        self.res.update({"spec": {"source": self._source}})

    @property
    def pvc(self):
        """
        PersistentVolumeClaim object for the claim referenced by ``spec.source.pvc``.

        Emits a DeprecationWarning on every access; ``source`` is the suggested replacement.

        Returns:
            PersistentVolumeClaim: Built from the referenced name and namespace, or None
                (after logging a warning) if ResourceNotFoundError is raised while building it.
        """
        # stacklevel=2 attributes the warning to the code accessing the property.
        warn("pvc will be deprecated in v4.16, Use source instead", DeprecationWarning, stacklevel=2)

        pvc_ref = self.instance.spec.source.pvc
        claim_name = pvc_ref.name
        claim_namespace = pvc_ref.namespace

        try:
            claim = PersistentVolumeClaim(
                client=self.client,
                name=claim_name,
                namespace=claim_namespace,
            )
        except ResourceNotFoundError:
            self.logger.warning(
                f"dataSource {self.name} is pointing to a non-existing PVC, name:"
                f" {claim_name}, namespace: {claim_namespace}"
            )
            return None

        return claim

    @property
    def source(self):
        """
        Resource object for whatever ``spec.source`` of the live instance references.

        Returns:
            PersistentVolumeClaim or VolumeSnapshot: Chosen by the source type ("pvc" or
                "snapshot") and built from the referenced name and namespace.

        Raises:
            KeyError: The source type is neither "pvc" nor "snapshot".
        """
        spec_source = self.instance.spec.source

        # NOTE(review): presumably iterating spec.source yields (key, value) pairs, which
        # makes this the first key, i.e. the source type - confirm against the client library.
        source_type = list(spec_source)[0][0]

        resource_class = {"pvc": PersistentVolumeClaim, "snapshot": VolumeSnapshot}[source_type]
        return resource_class(
            client=self.client,
            name=spec_source[source_type].name,
            namespace=spec_source[source_type].namespace,
        )