Refactoring of k8s manager and tests
Changes:
- Official kubernetes python lib
- Rewrite k8s api wrapper in OOP manner
- Use the API where possible instead of the CLI
- Remove virtlet code because it can be replaced with the pod API
- Remove unused/outdated manager code
- Remove bug workaround in k8s upgrade template
- Remove obsolete netchecker code
- Remove unfinished test_rbd_flexvolume_driver
Change-Id: I446a240123282196a6ba54f588aea84791f175ba
Related-PROD: PROD-21700
diff --git a/tcp_tests/managers/k8s/__init__.py b/tcp_tests/managers/k8s/__init__.py
index bd76aa7..9a5c573 100644
--- a/tcp_tests/managers/k8s/__init__.py
+++ b/tcp_tests/managers/k8s/__init__.py
@@ -11,10 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
-import urllib3
-
from tcp_tests.managers.k8s.cluster import K8sCluster
+from tcp_tests.managers.k8s.base import read_yaml_file, read_yaml_str, \
+    read_yaml_url
-__all__ = ['K8sCluster']
-
-urllib3.disable_warnings() # Supress https insecure warning
+__all__ = ['K8sCluster', 'read_yaml_file', 'read_yaml_str', 'read_yaml_url']
diff --git a/tcp_tests/managers/k8s/base.py b/tcp_tests/managers/k8s/base.py
index 7adb41a..6ca6a88 100644
--- a/tcp_tests/managers/k8s/base.py
+++ b/tcp_tests/managers/k8s/base.py
@@ -12,97 +12,140 @@
# License for the specific language governing permissions and limitations
-class K8sBaseResource(object):
- """docstring for K8sBaseResource"""
+import yaml
+import requests
+import os
- def __init__(self, manager, data):
+from tcp_tests import logger
+
+LOG = logger.logger
+
+
+class K8sBaseResource(object):
+ resource_type = None
+
+ def __init__(self, manager, name=None, namespace=None, data=None):
self._manager = manager
- self._add_details(data)
+ self._name = name
+ self._namespace = namespace
+ self._read_cache = None
+ if data is not None:
+ self._update_cache(data)
def __repr__(self):
- reprkeys = sorted(k
- for k in self.__dict__.keys()
- if k[0] != '_' and
- k not in ['manager'])
- info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
- return "<%s %s>" % (self.__class__.__name__, info)
+ uid = 'unknown-uid'
+ if self._read_cache is not None:
+ uid = self.uid
+ return "<{0} name='{1}' namespace='{2}' uuid='{3}'>".format(
+ self.__class__.__name__, self.name, self.namespace, uid)
@property
- def api_version(self):
- return self._data.api_version
-
- def _add_details(self, data):
- self._data = data
- for k in [k for k in dir(data)
- if not any((k.startswith('_'), k in ('to_dict', 'to_str')))]:
- try:
- setattr(self, k, getattr(data, k))
- except AttributeError:
- # In this case we already defined the attribute on the class
- pass
-
- def __eq__(self, other):
- if not isinstance(other, K8sBaseResource):
- return NotImplemented
- # two resources of different types are not equal
- if not isinstance(other, self.__class__):
- return False
- return self._info == other._info
-
-
-class K8sBaseManager(object):
-
- resource_class = None
-
- def __init__(self, api, namespace):
- self._api = api
- self._namespace = namespace
- self._raw = None
-
- @property
- def api(self):
- return self._api
+ def name(self):
+ return self._name
@property
def namespace(self):
return self._namespace
- def get(self, *args, **kwargs):
- if not hasattr(self, '_get'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_get'))
+ @property
+ def uid(self):
+ return self.read(cached=True).metadata.uid
- return self.resource_class(self, self._get(*args, **kwargs))
+ def _update_cache(self, data):
+ self._read_cache = data
+ self._namespace = data.metadata.namespace
+ self._name = data.metadata.name
- def list(self, *args, **kwargs):
- if not hasattr(self, '_list'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_list'))
+ def read(self, cached=False, **kwargs):
+ if not cached:
+ self._update_cache(self._read(**kwargs))
+ return self._read_cache
- lst = self._list(*args, **kwargs)
+ def create(self, body, **kwargs):
+ LOG.info("K8S API Creating {0} with body:\n{1}".format(
+ self.resource_type, body))
- return [self.resource_class(self, item) for item in lst.items]
+ self._update_cache(self._create(body, **kwargs))
+ return self
- def create(self, *args, **kwargs):
- if not hasattr(self, '_create'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_create'))
- return self.resource_class(self, self._create(*args, **kwargs))
+ def patch(self, body, **kwargs):
- def replace(self, *args, **kwargs):
- if not hasattr(self, '_replace'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_replace'))
- return self._replace(*args, **kwargs)
+ LOG.info("K8S API Patching {0} name={1} ns={2} with body:\n{3}".format(
+ self.resource_type, self.name, self.namespace, body))
- def delete(self, *args, **kwargs):
- if not hasattr(self, '_delete'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_delete'))
- return self._delete(*args, **kwargs)
+ self._update_cache(self._patch(body, **kwargs))
+ return self
- def deletecollection(self, *args, **kwargs):
- if not hasattr(self, '_deletecollection'):
- raise NotImplementedError(
- '{} does not have {}'.format(self, '_deletecollection'))
- return self._deletecollection(*args, **kwargs)
+ def replace(self, body, **kwargs):
+ self._update_cache(self._replace(body, **kwargs))
+ return self
+
+ def delete(self, **kwargs):
+ self._delete(**kwargs)
+ return self
+
+ def __eq__(self, other):
+ if not isinstance(other, K8sBaseResource):
+ return NotImplemented
+
+ if not isinstance(other, self.__class__):
+ return False
+
+ return self.uid == other.uid
+
+
+class K8sBaseManager(object):
+ resource_class = None
+
+ def __init__(self, cluster):
+ self._cluster = cluster
+
+ @property
+ def resource_type(self):
+ return self.resource_class.resource_type
+
+ def get(self, name=None, namespace=None, data=None):
+ namespace = namespace or self._cluster.default_namespace
+ return self.resource_class(self, name, namespace, data)
+
+ def create(self, name=None, namespace=None, body=None, **kwargs):
+ return self.get(name=name, namespace=namespace).create(body, **kwargs)
+
+ def __resource_from_data(self, data):
+ return self.resource_class(self, data=data)
+
+ def __list_filter(self, items, name_prefix=None):
+ if name_prefix is not None:
+ items = [item for item in items if
+ item.metadata.name.startswith(name_prefix)]
+ return items
+
+ def __list_to_resource(self, items):
+ return [self.__resource_from_data(item) for item in items]
+
+ def list(self, namespace=None, name_prefix=None, **kwargs):
+ namespace = namespace or self._cluster.default_namespace
+ items = self._list(namespace=namespace, **kwargs).items
+ items = self.__list_filter(items, name_prefix=name_prefix)
+ return self.__list_to_resource(items)
+
+ def list_all(self, name_prefix=None, **kwargs):
+ items = self._list_all(**kwargs).items
+ items = self.__list_filter(items, name_prefix=name_prefix)
+ return self.__list_to_resource(items)
+
+
+def read_yaml_str(yaml_str):
+ """ load yaml from string helper """
+ return yaml.safe_load(yaml_str)
+
+
+def read_yaml_file(file_path, *args):
+ """ load yaml from joined file_path and *args helper """
+ with open(os.path.join(file_path, *args)) as f:
+ return yaml.safe_load(f)
+
+
+def read_yaml_url(yaml_file_url):
+ """ load yaml from url helper """
+ return yaml.safe_load(requests.get(yaml_file_url).text)
diff --git a/tcp_tests/managers/k8s/cluster.py b/tcp_tests/managers/k8s/cluster.py
index 4bda03f..8ffb4d1 100644
--- a/tcp_tests/managers/k8s/cluster.py
+++ b/tcp_tests/managers/k8s/cluster.py
@@ -12,19 +12,14 @@
# License for the specific language governing permissions and limitations
-import base64
-import ssl
-
-from k8sclient.client import api_client
-from k8sclient.client.apis import apiv_api
-from k8sclient.client.apis import apisextensionsvbeta_api
-from k8sclient.client.apis import apisbatchv_api
+import kubernetes
+from kubernetes import client
from tcp_tests.managers.k8s.componentstatuses import \
K8sComponentStatusManager
from tcp_tests.managers.k8s.daemonsets import K8sDaemonSetManager
from tcp_tests.managers.k8s.deployments import K8sDeploymentManager
-from tcp_tests.managers.k8s.endpoints import K8sEndpointManager
+from tcp_tests.managers.k8s.endpoints import K8sEndpointsManager
from tcp_tests.managers.k8s.events import K8sEventManager
from tcp_tests.managers.k8s.horizontalpodautoscalers import \
K8sHorizontalPodAutoscalerManager
@@ -46,65 +41,73 @@
K8sServiceAccountManager
from tcp_tests.managers.k8s.services import K8sServiceManager
from tcp_tests.managers.k8s.replicasets import K8sReplicaSetManager
+from tcp_tests.managers.k8s.networkpolicies import K8sNetworkPolicyManager
class K8sCluster(object):
- """docstring for K8sCluster"""
-
- def __init__(self, schema="https", user=None, password=None,
+ def __init__(self, schema="https", user=None, password=None, ca=None,
host='localhost', port='443', default_namespace='default'):
- if user and password:
- auth_string = '%s:%s' % (user, password)
- auth = base64.encodestring(auth_string.encode()).decode()[:-1]
- auth = "Basic {}".format(auth)
- self._client = api_client.ApiClient(
- '{schema}://{host}:{port}'.format(
- schema=schema, host=host, port=port))
- self._client.set_default_header('Authorization', auth)
- restcli_impl = self._client.RESTClient.IMPL
- restcli_impl.ssl_pool_manager.connection_pool_kw['cert_reqs'] = \
- ssl.CERT_NONE
+ self.default_namespace = default_namespace
- else:
- self._client = api_client.ApiClient(
- '{schema}://{host}:{port}'.format(
- schema=schema, host=host, port=port))
- self._api = apiv_api.ApivApi(self._client)
- self._bapi = apisbatchv_api.ApisbatchvApi(self._client)
- self._eapi = apisextensionsvbeta_api.ApisextensionsvbetaApi(
- self._client)
- self._default_namespace = default_namespace
+ api_server = '{0}://{1}:{2}'.format(schema, host, port)
- self.nodes = K8sNodeManager(self._api, self._default_namespace)
- self.pods = K8sPodManager(self._api, self._default_namespace)
- self.endpoints = K8sEndpointManager(self._api, self._default_namespace)
- self.namespaces = K8sNamespaceManager(self._api,
- self._default_namespace)
- self.services = K8sServiceManager(self._api, self._default_namespace)
- self.serviceaccounts = K8sServiceAccountManager(
- self._api, self._default_namespace)
- self.secrets = K8sSecretManager(self._api, self._default_namespace)
- self.events = K8sEventManager(self._api, self._default_namespace)
- self.limitranges = K8sLimitRangeManager(self._api,
- self._default_namespace)
- self.jobs = K8sJobManager(self._bapi, self._default_namespace)
- self.daemonsets = K8sDaemonSetManager(self._eapi,
- self._default_namespace)
- self.ingresses = K8sIngressManager(self._eapi, self._default_namespace)
- self.deployments = K8sDeploymentManager(self._eapi,
- self._default_namespace)
- self.horizontalpodautoscalers = K8sHorizontalPodAutoscalerManager(
- self._eapi, self._default_namespace)
- self.componentstatuses = K8sComponentStatusManager(
- self._api, self._default_namespace)
- self.resourcequotas = K8sResourceQuotaManager(
- self._api, self._default_namespace)
- self.replicationcontrollers = K8sReplicationControllerManager(
- self._api, self._default_namespace)
- self.pvolumeclaims = K8sPersistentVolumeClaimManager(
- self._api, self._default_namespace)
- self.pvolumes = K8sPersistentVolumeManager(
- self._api, self._default_namespace)
- self.replicasets = K8sReplicaSetManager(
- self._eapi, self._default_namespace
- )
+ config_data = {
+ 'apiVersion': 'v1',
+ 'kind': 'Config',
+ 'preferences': {},
+ 'current-context': 'cluster-remote',
+ 'clusters': [{
+ 'name': 'cluster',
+ 'cluster': {
+ 'server': api_server,
+ 'certificate-authority-data': ca,
+ },
+ }],
+ 'users': [{
+ 'name': 'remote',
+ 'user': {
+ 'password': password,
+ 'username': user,
+ },
+ }],
+ 'contexts': [{
+ 'name': 'cluster-remote',
+ 'context': {
+ 'cluster': 'cluster',
+ 'user': 'remote',
+ },
+ }],
+ }
+
+ configuration = type.__call__(client.Configuration)
+ loader = kubernetes.config.kube_config.KubeConfigLoader(config_data)
+ loader.load_and_set(configuration)
+ api_client = client.ApiClient(configuration=configuration)
+
+ self.api_core = client.CoreV1Api(api_client)
+ self.api_apps = client.AppsV1Api(api_client)
+ self.api_extensions = client.ExtensionsV1beta1Api(api_client)
+ self.api_autoscaling = client.AutoscalingV1Api(api_client)
+ self.api_batch = client.BatchV1Api(api_client)
+
+ self.nodes = K8sNodeManager(self)
+ self.pods = K8sPodManager(self)
+ self.endpoints = K8sEndpointsManager(self)
+ self.namespaces = K8sNamespaceManager(self)
+ self.services = K8sServiceManager(self)
+ self.serviceaccounts = K8sServiceAccountManager(self)
+ self.secrets = K8sSecretManager(self)
+ self.events = K8sEventManager(self)
+ self.limitranges = K8sLimitRangeManager(self)
+ self.jobs = K8sJobManager(self)
+ self.daemonsets = K8sDaemonSetManager(self)
+ self.ingresses = K8sIngressManager(self)
+ self.deployments = K8sDeploymentManager(self)
+ self.horizontalpodautoscalers = K8sHorizontalPodAutoscalerManager(self)
+ self.componentstatuses = K8sComponentStatusManager(self)
+ self.resourcequotas = K8sResourceQuotaManager(self)
+ self.replicationcontrollers = K8sReplicationControllerManager(self)
+ self.pvolumeclaims = K8sPersistentVolumeClaimManager(self)
+ self.pvolumes = K8sPersistentVolumeManager(self)
+ self.replicasets = K8sReplicaSetManager(self)
+ self.networkpolicies = K8sNetworkPolicyManager(self)
diff --git a/tcp_tests/managers/k8s/componentstatuses.py b/tcp_tests/managers/k8s/componentstatuses.py
index a991576..7a9c27a 100644
--- a/tcp_tests/managers/k8s/componentstatuses.py
+++ b/tcp_tests/managers/k8s/componentstatuses.py
@@ -17,23 +17,21 @@
class K8sComponentStatus(K8sBaseResource):
- """docstring for K8sComponentStatus"""
+ resource_type = 'componentstatus'
- def __repr__(self):
- return "<K8sComponentStatus: %s>" % self.name
-
- @property
- def name(self):
- return self.metadata.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_component_status(self.name, **kwargs)
class K8sComponentStatusManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sComponentStatus
- def _get(self, name, **kwargs):
- return self.api.read_namespaced_component_status(name=name, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, **kwargs):
- return self.api.list_namespaced_component_status(**kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_component_status(**kwargs)
+
+ def _list_all(self, **kwargs):
+ return self._list(None, **kwargs)
diff --git a/tcp_tests/managers/k8s/daemonsets.py b/tcp_tests/managers/k8s/daemonsets.py
index dc2c5f7..15e6076 100644
--- a/tcp_tests/managers/k8s/daemonsets.py
+++ b/tcp_tests/managers/k8s/daemonsets.py
@@ -12,68 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sDaemonSet(K8sBaseResource):
- """docstring for K8sDaemonSet"""
+ resource_type = 'daemonset'
- def __repr__(self):
- return "<K8sDaemonSet: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_daemon_set(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_daemon_set(
+ self.namespace, body, **kwargs)
- @property
- def namespace(self):
- return self.metadata.namespace
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_daemon_set(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_daemon_set(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_daemon_set(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sDaemonSetManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sDaemonSet
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_daemon_set(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_apps
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_daemon_set(
- namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_daemon_set(namespace, **kwargs)
- def _full_list(self, **kwargs):
- return self.api.list_daemon_set(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_daemon_set(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_daemon_set(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_daemon_set(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_daemon_set(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def update(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.patch_namespaced_daemon_set(
- body=body, name=name, namespace=namespace, **kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_daemon_set_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/deployments.py b/tcp_tests/managers/k8s/deployments.py
index 5d47d70..3894ac4 100644
--- a/tcp_tests/managers/k8s/deployments.py
+++ b/tcp_tests/managers/k8s/deployments.py
@@ -12,63 +12,56 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
+from devops.helpers import helpers
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sDeployment(K8sBaseResource):
- """docstring for K8sDeployment"""
+ resource_type = 'deployment'
- def __repr__(self):
- return "<K8sDeployment: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_deployment(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_deployment(
+ self.namespace, body, **kwargs)
- @property
- def namespace(self):
- return self.metadata.namespace
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_deployment(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_deployment(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_deployment(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
+
+ def is_ready(self):
+ dep = self.read()
+ return dep.status.available_replicas == dep.status.replicas
+
+ def wait_ready(self, timeout=120, interval=5):
+ helpers.wait(lambda: self.is_ready(),
+ timeout=timeout, interval=interval)
+ return self
class K8sDeploymentManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sDeployment
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_deployment(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_apps
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_deployment(
- namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_deployment(namespace, **kwargs)
- def _full_list(self, **kwargs):
- return self.api.list_deployment(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_deployment(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_deployment(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_deployment(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_deployment(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
+ def _list_all(self, **kwargs):
+ return self.api.list_deployment_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/endpoints.py b/tcp_tests/managers/k8s/endpoints.py
index ed1066e..0ddb8ae 100644
--- a/tcp_tests/managers/k8s/endpoints.py
+++ b/tcp_tests/managers/k8s/endpoints.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
-class K8sEndpoint(K8sBaseResource):
- """docstring for K8sEndpoint"""
+class K8sEndpoints(K8sBaseResource):
+ resource_type = 'endpoints'
- def __repr__(self):
- return "<K8sEndpoint: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_endpoints(
+ self.name, self.namespace, **kwargs)
+
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_endpoints(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_endpoints(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_endpoints(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_endpoints(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
+
+
+class K8sEndpointsManager(K8sBaseManager):
+ resource_class = K8sEndpoints
@property
- def name(self):
- return self.metadata.name
+ def api(self):
+ return self._cluster.api_core
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_endpoints(namespace, **kwargs)
-class K8sEndpointManager(K8sBaseManager):
- """docstring for ClassName"""
-
- resource_class = K8sEndpoint
-
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_endpoints(
- name=name, namespace=namespace, **kwargs)
-
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_endpoints(
- namespace=namespace, **kwargs)
-
- def _full_list(self, **kwargs):
- return self.api.list_endpoints(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_endpoints(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_endpoints(
- body=body, name=body, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_endpoints(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_endpoints(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
+ def _list_all(self, **kwargs):
+ return self.api.list_endpoints_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/events.py b/tcp_tests/managers/k8s/events.py
index 099e9e4..a0d865a 100644
--- a/tcp_tests/managers/k8s/events.py
+++ b/tcp_tests/managers/k8s/events.py
@@ -12,58 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sEvent(K8sBaseResource):
- """docstring for K8sEvent"""
+ resource_type = 'event'
- def __repr__(self):
- return "<K8sEvent: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_event(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_event(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_event(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_event(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_event(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sEventManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sEvent
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_event(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_event(namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_event(namespace, **kwargs)
- def _full_list(self, **kwargs):
- return self.api.list_event(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_event(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_event(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_event(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_event(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
+ def _list_all(self, **kwargs):
+ return self.api.list_event_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/horizontalpodautoscalers.py b/tcp_tests/managers/k8s/horizontalpodautoscalers.py
index 6ce78e7..b7842ac 100644
--- a/tcp_tests/managers/k8s/horizontalpodautoscalers.py
+++ b/tcp_tests/managers/k8s/horizontalpodautoscalers.py
@@ -12,59 +12,47 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sHorizontalPodAutoscaler(K8sBaseResource):
- """docstring for K8sHorizontalPodAutoscaler"""
+ resource_type = 'horizontalpodautoscaler'
- def __repr__(self):
- return "<K8sHorizontalPodAutoscaler: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_horizontal_pod_autoscaler(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_horizontal_pod_autoscaler(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_horizontal_pod_autoscaler(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_horizontal_pod_autoscaler(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_horizontal_pod_autoscaler(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sHorizontalPodAutoscalerManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sHorizontalPodAutoscaler
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_horizontal_pod_autoscaler(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_autoscaling
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
+ def _list(self, namespace, **kwargs):
return self.api.list_namespaced_horizontal_pod_autoscaler(
- namespace=namespace, **kwargs)
+ namespace, **kwargs)
- def _full_list(self, **kwargs):
- return self.api.list_horizontal_pod_autoscaler(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_horizontal_pod_autoscaler(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_horizontal_pod_autoscaler(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_horizontal_pod_autoscaler(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_horizontal_pod_autoscaler(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
+ def _list_all(self, **kwargs):
+ return self.api.list_horizontal_pod_autoscaler_for_all_namespaces(
+ **kwargs)
diff --git a/tcp_tests/managers/k8s/ingresses.py b/tcp_tests/managers/k8s/ingresses.py
index 81b240e..906dc31 100644
--- a/tcp_tests/managers/k8s/ingresses.py
+++ b/tcp_tests/managers/k8s/ingresses.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sIngress(K8sBaseResource):
- """docstring for K8sIngress"""
+ resource_type = 'ingress'
- def __repr__(self):
- return "<K8sIngress: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_ingress(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_ingress(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_ingress(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_ingress(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_ingress(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sIngressManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sIngress
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_ingress(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_extensions
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_ingress(
- namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_ingress(namespace, **kwargs)
- def _full_list(self, **kwargs):
- return self.api.list_ingress(**kwargs)
-
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_ingress(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_ingress(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_ingress(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_ingress(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
+ def _list_all(self, **kwargs):
+ return self.api.list_ingress_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/jobs.py b/tcp_tests/managers/k8s/jobs.py
index a2dbb81..740a662 100644
--- a/tcp_tests/managers/k8s/jobs.py
+++ b/tcp_tests/managers/k8s/jobs.py
@@ -12,58 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sJob(K8sBaseResource):
- """docstring for K8sJob"""
+ resource_type = 'job'
- def __repr__(self):
- return "<K8sJob: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_job(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_job(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_job(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_job(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_job(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sJobManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sJob
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_job(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_batch
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_job(namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_job(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_job(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_job(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_job(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_job(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_job(**kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_job_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/limitranges.py b/tcp_tests/managers/k8s/limitranges.py
index 7136300..3a9b2de 100644
--- a/tcp_tests/managers/k8s/limitranges.py
+++ b/tcp_tests/managers/k8s/limitranges.py
@@ -12,52 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sLimitRange(K8sBaseResource):
- """docstring for K8sLimitRange"""
+ resource_type = 'limitrange'
- def __repr__(self):
- return "<K8sLimitRange: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_limit_range(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_limit_range(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_limit_range(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_limit_range(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_limit_range(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sLimitRangeManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sLimitRange
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_limit_range(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_limit_range(
- namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_limit_range(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_limit_range(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_limit_range(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_limit_range(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_limit_range(
- namespace=namespace, **kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_limit_range_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/namespaces.py b/tcp_tests/managers/k8s/namespaces.py
index 8c36302..224ae33 100644
--- a/tcp_tests/managers/k8s/namespaces.py
+++ b/tcp_tests/managers/k8s/namespaces.py
@@ -12,40 +12,41 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sNamespace(K8sBaseResource):
- """docstring for ClassName"""
+ resource_type = 'namespace'
- def __repr__(self):
- return "<K8sNamespace: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespace(self.name, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespace(body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespace(self.name, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespace(self.name, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespace(
+ self.name, client.V1DeleteOptions(), **kwargs)
class K8sNamespaceManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sNamespace
- def _get(self, name, **kwargs):
- return self.api.read_namespaced_namespace(name, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, **kwargs):
- return self.api.list_namespaced_namespace(**kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespace(**kwargs)
- def _create(self, body, **kwargs):
- return self.api.create_namespaced_namespace(body, **kwargs)
-
- def _replace(self, body, name, **kwargs):
- return self.api.replace_namespaced_namespace(body, name, **kwargs)
-
- def _delete(self, body, name, **kwargs):
- return self.api.delete_namespaced_namespace(body, name, **kwargs)
-
- def _deletecollection(self, **kwargs):
- return self.api.deletecollection_namespaced_namespace(**kwargs)
+ def _list_all(self, **kwargs):
+ return self._list(None, **kwargs)
diff --git a/tcp_tests/managers/k8s/networkpolicies.py b/tcp_tests/managers/k8s/networkpolicies.py
new file mode 100644
index 0000000..eb92a12
--- /dev/null
+++ b/tcp_tests/managers/k8s/networkpolicies.py
@@ -0,0 +1,56 @@
+# Copyright 2017 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+
+
+from kubernetes import client
+
+from tcp_tests.managers.k8s.base import K8sBaseResource
+from tcp_tests.managers.k8s.base import K8sBaseManager
+
+
+class K8sNetworkPolicy(K8sBaseResource):
+ resource_type = 'networkpolicy'
+
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_network_policy(
+ self.name, self.namespace, **kwargs)
+
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_network_policy(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_network_policy(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_network_policy(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_network_policy(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
+
+
+class K8sNetworkPolicyManager(K8sBaseManager):
+ resource_class = K8sNetworkPolicy
+
+ @property
+ def api(self):
+ return self._cluster.api_extensions
+
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_network_policy(namespace, **kwargs)
+
+ def _list_all(self, **kwargs):
+ return self.api.list_network_policy_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/nodes.py b/tcp_tests/managers/k8s/nodes.py
index c6d4dbe..4b0451e 100644
--- a/tcp_tests/managers/k8s/nodes.py
+++ b/tcp_tests/managers/k8s/nodes.py
@@ -12,70 +12,41 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sNode(K8sBaseResource):
- """docstring for ClassName"""
+ resource_type = 'node'
- def __repr__(self):
- return "<K8sNode: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_node(self.name, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_node(body, **kwargs)
- @property
- def labels(self):
- return self.metadata.labels
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_node(self.name, body, **kwargs)
- @labels.setter
- def labels(self, labels):
- current_labels = {
- label: None for label in self.labels
- }
- current_labels.update(labels)
- self.add_labels(labels=current_labels)
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_node(self.name, body, **kwargs)
- def add_labels(self, labels):
- if not isinstance(labels, dict):
- raise TypeError("labels must be a dict!")
- body = {
- "metadata":
- {
- "labels": labels
- }
- }
- self._add_details(self._manager.update(body=body, name=self.name))
-
- def remove_labels(self, list_labels):
- labels = {label: None for label in list_labels}
- self.add_labels(labels=labels)
+ def _delete(self, **kwargs):
+ self._manager.api.delete_node(
+ self.name, client.V1DeleteOptions(), **kwargs)
class K8sNodeManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sNode
- def _get(self, name, **kwargs):
- return self.api.read_namespaced_node(name=name, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, **kwargs):
- return self.api.list_namespaced_node(**kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_node(**kwargs)
- def _create(self, body, **kwargs):
- return self.api.create_namespaced_node(body=body, **kwargs)
-
- def _replace(self, body, name, **kwargs):
- return self.api.replace_namespaced_node(body=body, name=name, **kwargs)
-
- def _delete(self, body, name, **kwargs):
- return self.api.delete_namespaced_node(body=body, name=name, **kwargs)
-
- def _deletecollection(self, **kwargs):
- return self.api.deletecollection_namespaced_node(**kwargs)
-
- def update(self, body, name, **kwargs):
- return self.api.patch_namespaced_node(body=body, name=name, **kwargs)
+ def _list_all(self, **kwargs):
+ return self._list(None, **kwargs)
diff --git a/tcp_tests/managers/k8s/persistentvolumeclaims.py b/tcp_tests/managers/k8s/persistentvolumeclaims.py
index f28f622..b28b869 100644
--- a/tcp_tests/managers/k8s/persistentvolumeclaims.py
+++ b/tcp_tests/managers/k8s/persistentvolumeclaims.py
@@ -12,52 +12,47 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sPersistentVolumeClaim(K8sBaseResource):
- """docstring for K8sPersistentVolumeClaim"""
+ resource_type = 'persistentvolumeclaim'
- def __repr__(self):
- return "<K8sPersistentVolumeClaim: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_persistent_volume_claim(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_persistent_volume_claim(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_persistent_volume_claim(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_persistent_volume_claim(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_persistent_volume_claim(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sPersistentVolumeClaimManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sPersistentVolumeClaim
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_persistent_volume_claim(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
+ def _list(self, namespace, **kwargs):
return self.api.list_namespaced_persistent_volume_claim(
- namespace=namespace, **kwargs)
+ namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_persistent_volume_claim(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_persistent_volume_claim(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_persistent_volume_claim(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_persistent_volume_claim(
- namespace=namespace, **kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_persistent_volume_claim_for_all_namespaces(
+ **kwargs)
diff --git a/tcp_tests/managers/k8s/persistentvolumes.py b/tcp_tests/managers/k8s/persistentvolumes.py
index 8ab7ec2..7424935 100644
--- a/tcp_tests/managers/k8s/persistentvolumes.py
+++ b/tcp_tests/managers/k8s/persistentvolumes.py
@@ -12,46 +12,43 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sPersistentVolume(K8sBaseResource):
- """docstring for K8sPersistentVolume"""
+ resource_type = 'persistentvolume'
- def __repr__(self):
- return "<K8sPersistentVolume: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_persistent_volume(self.name, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_persistent_volume(body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_persistent_volume(
+ self.name, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_persistent_volume(
+ self.name, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_persistent_volume(
+ self.name, client.V1DeleteOptions(), **kwargs)
class K8sPersistentVolumeManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sPersistentVolume
- def _get(self, name, **kwargs):
- return self.api.read_namespaced_persistent_volume(
- name=name, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, **kwargs):
- return self.api.list_namespaced_persistent_volume(
- **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_persistent_volume(**kwargs)
- def _create(self, body, **kwargs):
- return self.api.create_namespaced_persistent_volume(
- body, **kwargs)
-
- def _replace(self, body, name, **kwargs):
- return self.api.replace_namespaced_persistent_volume(
- body=body, name=name, **kwargs)
-
- def _delete(self, body, name, **kwargs):
- return self.api.delete_namespaced_persistent_volume(
- body=body, name=name, **kwargs)
-
- def _deletecollection(self, **kwargs):
- return self.api.deletecollection_namespaced_persistent_volume(
- **kwargs)
+ def _list_all(self, **kwargs):
+ return self._list(None, **kwargs)
diff --git a/tcp_tests/managers/k8s/pods.py b/tcp_tests/managers/k8s/pods.py
index 94abc20..98192a6 100644
--- a/tcp_tests/managers/k8s/pods.py
+++ b/tcp_tests/managers/k8s/pods.py
@@ -11,6 +11,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
+
+from kubernetes import client
+
from devops.helpers import helpers
from tcp_tests.managers.k8s.base import K8sBaseResource
@@ -18,91 +21,51 @@
class K8sPod(K8sBaseResource):
- """docstring for K8sPod"""
+ resource_type = 'pod'
- def __repr__(self):
- return "<K8sPod: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_pod(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_pod(
+ self.namespace, body, **kwargs)
- @property
- def phase(self):
- return self.status.phase
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_pod(
+ self.name, self.namespace, body, **kwargs)
- @property
- def namespace(self):
- return self.metadata.namespace
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_pod(
+ self.name, self.namespace, body, **kwargs)
- def wait_phase(self, phase, timeout=60, interval=5):
- """Wait phase of pod_name from namespace while timeout
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_pod(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
- :param list or str: phase
- :param int: timeout
+ def wait_phase(self, phases, timeout=60, interval=3):
+ if isinstance(phases, str):
+ phases = [phases]
- :rtype: None
- """
- if isinstance(phase, str):
- phase = [phase]
+ helpers.wait(lambda: self.read().status.phase in phases,
+ timeout=timeout, interval=interval,
+ timeout_msg='Timeout waiting, pod {0} phase is not in '
+ '"{1}"'.format(self.name, phases))
+ return self
- def check():
- self._add_details(self._manager.get(name=self.name,
- namespace=self.namespace))
- return self.phase in phase
-
- helpers.wait(check, timeout=timeout, interval=interval,
- timeout_msg='Timeout waiting({timeout}s), pod {pod_name} '
- 'is not in "{phase}" phase'.format(
- timeout=timeout,
- pod_name=self.name,
- phase=phase))
-
- def wait_running(self, timeout=60, interval=5):
- self.wait_phase(['Running'], timeout=timeout, interval=interval)
+ def wait_running(self, timeout=240, interval=3):
+ return self.wait_phase('Running', timeout=timeout, interval=interval)
class K8sPodManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sPod
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_pod(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_pod(namespace=namespace, **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_pod(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_pod(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_pod(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- # NOTE: the following two lines should be deleted after
- # serialization is fixed in python-k8sclient
- if isinstance(body, self.resource_class):
- body = body.swagger_types
- return self.api.delete_namespaced_pod(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_pod(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_pod(**kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_pod_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/replicasets.py b/tcp_tests/managers/k8s/replicasets.py
index 31b6db6..7132157 100644
--- a/tcp_tests/managers/k8s/replicasets.py
+++ b/tcp_tests/managers/k8s/replicasets.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sReplicaSet(K8sBaseResource):
- """docstring for K8sPod"""
+ resource_type = 'replicaset'
- def __repr__(self):
- return "<K8sPod: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_replica_set(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_replica_set(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_replica_set(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_replica_set(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_replica_set(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sReplicaSetManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sReplicaSet
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_replica_set(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_apps
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_replica_set(namespace=namespace,
- **kwargs)
+ def _list(self, namespace, **kwargs):
+ return self.api.list_namespaced_replica_set(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_replica_set(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_replica_set(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_replica_set(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_replica_set(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_replica_set(**kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_replica_set_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/replicationcontrollers.py b/tcp_tests/managers/k8s/replicationcontrollers.py
index 6cf7da4..aabb092 100644
--- a/tcp_tests/managers/k8s/replicationcontrollers.py
+++ b/tcp_tests/managers/k8s/replicationcontrollers.py
@@ -12,59 +12,47 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sReplicationController(K8sBaseResource):
- """docstring for K8sReplicationController"""
+ resource_type = 'replicationcontroller'
- def __repr__(self):
- return "<K8sReplicationController: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_replication_controller(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_replication_controller(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_replication_controller(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_replication_controller(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_replication_controller(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sReplicationControllerManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sReplicationController
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_replication_controller(
- name=name, namespace=namespace, **kwargs)
+ @property
+ def api(self):
+ return self._cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
+ def _list(self, namespace, **kwargs):
return self.api.list_namespaced_replication_controller(
- namespace=namespace, **kwargs)
+ namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_replication_controller(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_replication_controller(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_replication_controller(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_replication_controller(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_replication_controller(**kwargs)
+ def _list_all(self, **kwargs):
+ return self.api.list_replication_controller_for_all_namespaces(
+ **kwargs)
diff --git a/tcp_tests/managers/k8s/resourcequotas.py b/tcp_tests/managers/k8s/resourcequotas.py
index 49d81d5..2edf00d 100644
--- a/tcp_tests/managers/k8s/resourcequotas.py
+++ b/tcp_tests/managers/k8s/resourcequotas.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sResourceQuota(K8sBaseResource):
- """docstring for K8sResourceQuota"""
+ resource_type = 'resourcequota'
- def __repr__(self):
- return "<K8sResourceQuota: %s>" % self.name
+ def _read(self, **kwargs):
+ return self._manager.api.read_namespaced_resource_quota(
+ self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+ def _create(self, body, **kwargs):
+ return self._manager.api.create_namespaced_resource_quota(
+ self.namespace, body, **kwargs)
+
+ def _patch(self, body, **kwargs):
+ return self._manager.api.patch_namespaced_resource_quota(
+ self.name, self.namespace, body, **kwargs)
+
+ def _replace(self, body, **kwargs):
+ return self._manager.api.replace_namespaced_resource_quota(
+ self.name, self.namespace, body, **kwargs)
+
+ def _delete(self, **kwargs):
+ self._manager.api.delete_namespaced_resource_quota(
+ self.name, self.namespace, client.V1DeleteOptions(), **kwargs)
class K8sResourceQuotaManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sResourceQuota
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_resource_quota(
- name=name, namespace=namespace, **kwargs)
+    @property
+    def api(self):
+        """Return the ``api_core`` attribute of ``self._cluster``."""
+        cluster = self._cluster
+        return cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_resource_quota(
- namespace=namespace, **kwargs)
+    def _list(self, namespace, **kwargs):
+        """List resource quotas in ``namespace``.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_namespaced_resource_quota(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_resource_quota(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_resource_quota(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_resource_quota(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_resource_quota(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_resourse_quota(**kwargs)
+    def _list_all(self, **kwargs):
+        """List resource quotas across all namespaces.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_resource_quota_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/secrets.py b/tcp_tests/managers/k8s/secrets.py
index 355c884..474c119 100644
--- a/tcp_tests/managers/k8s/secrets.py
+++ b/tcp_tests/managers/k8s/secrets.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sSecret(K8sBaseResource):
- """docstring for K8sSecret"""
+ resource_type = 'secret'
- def __repr__(self):
- return "<K8sSecret: %s>" % self.name
+    def _read(self, **kwargs):
+        """Read this secret by its name and namespace.
+
+        Extra keyword arguments are passed through to
+        ``read_namespaced_secret``; its result is returned.
+        """
+        api = self._manager.api
+        return api.read_namespaced_secret(
+            self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+    def _create(self, body, **kwargs):
+        """Create a secret from ``body`` in ``self.namespace``.
+
+        Extra keyword arguments are passed through to
+        ``create_namespaced_secret``; its result is returned.
+        """
+        api = self._manager.api
+        return api.create_namespaced_secret(
+            self.namespace, body, **kwargs)
+
+    def _patch(self, body, **kwargs):
+        """Patch this secret with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to ``patch_namespaced_secret``.
+        """
+        api = self._manager.api
+        return api.patch_namespaced_secret(
+            self.name, self.namespace, body, **kwargs)
+
+    def _replace(self, body, **kwargs):
+        """Replace this secret with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to ``replace_namespaced_secret``.
+        """
+        api = self._manager.api
+        return api.replace_namespaced_secret(
+            self.name, self.namespace, body, **kwargs)
+
+    def _delete(self, **kwargs):
+        """Delete this secret from its namespace.
+
+        A default ``V1DeleteOptions`` is sent as the request body unless the
+        caller supplies its own ``body`` keyword argument. Remaining keyword
+        arguments are forwarded to ``delete_namespaced_secret``.
+        The API response is not returned.
+        """
+        # The options go in by keyword: kubernetes client releases differ on
+        # whether ``body`` is a positional parameter or an optional keyword,
+        # and only the keyword form is accepted by both.
+        kwargs.setdefault('body', client.V1DeleteOptions())
+        self._manager.api.delete_namespaced_secret(
+            self.name, self.namespace, **kwargs)
class K8sSecretManager(K8sBaseManager):
- """docstring for ClassName"""
-
resource_class = K8sSecret
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_secret(
- name=name, namespace=namespace, **kwargs)
+    @property
+    def api(self):
+        """Return the ``api_core`` attribute of ``self._cluster``."""
+        cluster = self._cluster
+        return cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_secret(
- namespace=namespace, **kwargs)
+    def _list(self, namespace, **kwargs):
+        """List secrets in ``namespace``.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_namespaced_secret(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_secret(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_secret(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_secret(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_secret(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_secret(**kwargs)
+    def _list_all(self, **kwargs):
+        """List secrets across all namespaces.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_secret_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/serviceaccounts.py b/tcp_tests/managers/k8s/serviceaccounts.py
index bf58b4c..3b779eb 100644
--- a/tcp_tests/managers/k8s/serviceaccounts.py
+++ b/tcp_tests/managers/k8s/serviceaccounts.py
@@ -12,59 +12,45 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sServiceAccount(K8sBaseResource):
- """docstring for K8sServiceAccount"""
+ resource_type = 'serviceaccount'
- def __repr__(self):
- return "<K8sServiceAccount: %s>" % self.name
+    def _read(self, **kwargs):
+        """Read this service account by its name and namespace.
+
+        Extra keyword arguments are passed through to
+        ``read_namespaced_service_account``; its result is returned.
+        """
+        api = self._manager.api
+        return api.read_namespaced_service_account(
+            self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+    def _create(self, body, **kwargs):
+        """Create a service account from ``body`` in ``self.namespace``.
+
+        Extra keyword arguments are passed through to
+        ``create_namespaced_service_account``; its result is returned.
+        """
+        api = self._manager.api
+        return api.create_namespaced_service_account(
+            self.namespace, body, **kwargs)
+
+    def _patch(self, body, **kwargs):
+        """Patch this service account with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to ``patch_namespaced_service_account``.
+        """
+        api = self._manager.api
+        return api.patch_namespaced_service_account(
+            self.name, self.namespace, body, **kwargs)
+
+    def _replace(self, body, **kwargs):
+        """Replace this service account with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to
+        ``replace_namespaced_service_account``.
+        """
+        api = self._manager.api
+        return api.replace_namespaced_service_account(
+            self.name, self.namespace, body, **kwargs)
+
+    def _delete(self, **kwargs):
+        """Delete this service account from its namespace.
+
+        A default ``V1DeleteOptions`` is sent as the request body unless the
+        caller supplies its own ``body`` keyword argument. Remaining keyword
+        arguments are forwarded to ``delete_namespaced_service_account``.
+        The API response is not returned.
+        """
+        # The options go in by keyword: kubernetes client releases differ on
+        # whether ``body`` is a positional parameter or an optional keyword,
+        # and only the keyword form is accepted by both.
+        kwargs.setdefault('body', client.V1DeleteOptions())
+        self._manager.api.delete_namespaced_service_account(
+            self.name, self.namespace, **kwargs)
class K8sServiceAccountManager(K8sBaseManager):
- """docstring for K8sServiceAccountManager"""
-
resource_class = K8sServiceAccount
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_service_account(
- name=name, namespace=namespace, **kwargs)
+    @property
+    def api(self):
+        """Return the ``api_core`` attribute of ``self._cluster``."""
+        cluster = self._cluster
+        return cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_service_account(
- namespace=namespace, **kwargs)
+    def _list(self, namespace, **kwargs):
+        """List service accounts in ``namespace``.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_namespaced_service_account(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_service_account(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_service_account(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_service_account(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _deletecollection(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.deletecollection_namespaced_service_account(
- namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_service_account(**kwargs)
+    def _list_all(self, **kwargs):
+        """List service accounts across all namespaces.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_service_account_for_all_namespaces(**kwargs)
diff --git a/tcp_tests/managers/k8s/services.py b/tcp_tests/managers/k8s/services.py
index 97a6be7..6c4a22b 100644
--- a/tcp_tests/managers/k8s/services.py
+++ b/tcp_tests/managers/k8s/services.py
@@ -12,57 +12,51 @@
# License for the specific language governing permissions and limitations
+from kubernetes import client
+
from tcp_tests.managers.k8s.base import K8sBaseResource
from tcp_tests.managers.k8s.base import K8sBaseManager
class K8sService(K8sBaseResource):
- """docstring for K8sService"""
+ resource_type = 'service'
- def __repr__(self):
- return "<K8sService: %s>" % self.name
+    def _read(self, **kwargs):
+        """Read this service by its name and namespace.
+
+        Extra keyword arguments are passed through to
+        ``read_namespaced_service``; its result is returned.
+        """
+        api = self._manager.api
+        return api.read_namespaced_service(
+            self.name, self.namespace, **kwargs)
- @property
- def name(self):
- return self.metadata.name
+    def _create(self, body, **kwargs):
+        """Create a service from ``body`` in ``self.namespace``.
+
+        Extra keyword arguments are passed through to
+        ``create_namespaced_service``; its result is returned.
+        """
+        api = self._manager.api
+        return api.create_namespaced_service(
+            self.namespace, body, **kwargs)
- @property
- def namespace(self):
- return self.metadata.namespace
+    def _patch(self, body, **kwargs):
+        """Patch this service with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to ``patch_namespaced_service``.
+        """
+        api = self._manager.api
+        return api.patch_namespaced_service(
+            self.name, self.namespace, body, **kwargs)
+
+    def _replace(self, body, **kwargs):
+        """Replace this service with ``body``.
+
+        The object is addressed by ``self.name`` and ``self.namespace``;
+        extra keyword arguments go to ``replace_namespaced_service``.
+        """
+        api = self._manager.api
+        return api.replace_namespaced_service(
+            self.name, self.namespace, body, **kwargs)
+
+    def _delete(self, **kwargs):
+        """Delete this service from its namespace.
+
+        A default ``V1DeleteOptions`` is sent as the request body unless the
+        caller supplies its own ``body`` keyword argument. Remaining keyword
+        arguments are forwarded to ``delete_namespaced_service``.
+        The API response is not returned.
+        """
+        # The options go in by keyword: kubernetes client releases differ on
+        # whether ``body`` is a positional parameter or an optional keyword,
+        # and only the keyword form is accepted by both.
+        kwargs.setdefault('body', client.V1DeleteOptions())
+        self._manager.api.delete_namespaced_service(
+            self.name, self.namespace, **kwargs)
+
+    def get_ip(self, external=False):
+        """Return an IP address of this service.
+
+        With ``external`` falsy, the ``spec.cluster_ip`` of the object
+        returned by ``self.read()`` is returned; otherwise the ``ip`` of the
+        first entry in its ``status.load_balancer.ingress``.
+        """
+        service = self.read()
+        if not external:
+            return service.spec.cluster_ip
+        # NOTE(review): indexing assumes the ingress list is already
+        # populated - confirm callers wait for the load balancer first.
+        return service.status.load_balancer.ingress[0].ip
class K8sServiceManager(K8sBaseManager):
- """docstring for K8sServiceManager"""
-
resource_class = K8sService
- def _get(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.read_namespaced_service(
- name=name, namespace=namespace, **kwargs)
+    @property
+    def api(self):
+        """Return the ``api_core`` attribute of ``self._cluster``."""
+        cluster = self._cluster
+        return cluster.api_core
- def _list(self, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.list_namespaced_service(namespace=namespace, **kwargs)
+    def _list(self, namespace, **kwargs):
+        """List services in ``namespace``.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_namespaced_service(namespace, **kwargs)
- def _create(self, body, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.create_namespaced_service(
- body=body, namespace=namespace, **kwargs)
-
- def _replace(self, body, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.replace_namespaced_service(
- body=body, name=name, namespace=namespace, **kwargs)
-
- def _delete(self, name, namespace=None, **kwargs):
- namespace = namespace or self.namespace
- return self.api.delete_namespaced_service(
- name=name, namespace=namespace, **kwargs)
-
- def full_list(self, *args, **kwargs):
- lst = self._full_list(*args, **kwargs)
- return [self.resource_class(self, item) for item in lst.items]
-
- def _full_list(self, **kwargs):
- return self.api.list_service(**kwargs)
+    def _list_all(self, **kwargs):
+        """List services across all namespaces.
+
+        Keyword arguments are forwarded to the API call.
+        """
+        api = self.api
+        return api.list_service_for_all_namespaces(**kwargs)