Skip to content
65 changes: 35 additions & 30 deletions src/mas/devops/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from kubernetes import client
from kubernetes.stream import stream
from kubernetes.stream.ws_client import ERROR_CHANNEL
from kubernetes.dynamic.resource import ResourceInstance

import yaml

Expand Down Expand Up @@ -85,9 +86,6 @@ def getClusterVersion(dynClient: DynamicClient) -> str:

Returns:
str: The cluster version string (e.g., "4.12.0"), or None if not found

Raises:
NotFoundError: If the ClusterVersion resource cannot be retrieved
"""
clusterVersionAPI = dynClient.resources.get(api_version="config.openshift.io/v1", kind="ClusterVersion")

Expand Down Expand Up @@ -131,8 +129,6 @@ def getNamespace(dynClient: DynamicClient, namespace: str) -> dict:
Returns:
dict: The namespace resource as a dictionary, or an empty dict if not found

Raises:
NotFoundError: If the namespace does not exist
"""
namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")

Expand All @@ -142,7 +138,6 @@ def getNamespace(dynClient: DynamicClient, namespace: str) -> dict:
return ns
except NotFoundError:
logger.debug(f"Namespace {namespace} does not exist")

return {}


Expand All @@ -160,9 +155,6 @@ def createNamespace(dynClient: DynamicClient, namespace: str, kyvernoLabel: str

Returns:
bool: Always returns True

Raises:
NotFoundError: If the namespace resource cannot be accessed
"""
namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")
try:
Expand Down Expand Up @@ -204,9 +196,6 @@ def deleteNamespace(dynClient: DynamicClient, namespace: str) -> bool:

Returns:
bool: Always returns True

Raises:
NotFoundError: If the namespace does not exist (caught and logged)
"""
namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")
try:
Expand All @@ -229,9 +218,6 @@ def waitForCRD(dynClient: DynamicClient, crdName: str) -> bool:

Returns:
bool: True if the CRD becomes established, False if timeout is reached

Raises:
NotFoundError: If the CRD is not found (caught and retried)
"""
crdAPI = dynClient.resources.get(api_version="apiextensions.k8s.io/v1", kind="CustomResourceDefinition")
maxRetries = 100
Expand Down Expand Up @@ -274,9 +260,6 @@ def waitForDeployment(dynClient: DynamicClient, namespace: str, deploymentName:

Returns:
bool: True if the deployment becomes ready, False if timeout is reached

Raises:
NotFoundError: If the deployment is not found (caught and retried)
"""
deploymentAPI = dynClient.resources.get(api_version="apps/v1", kind="Deployment")
maxRetries = 100
Expand Down Expand Up @@ -309,9 +292,6 @@ def getConsoleURL(dynClient: DynamicClient) -> str:

Returns:
str: The HTTPS URL of the OpenShift console (e.g., "https://console-openshift-console.apps.cluster.example.com")

Raises:
NotFoundError: If the console route is not found
"""
routesAPI = dynClient.resources.get(api_version="route.openshift.io/v1", kind="Route")
consoleRoute = routesAPI.get(name="console", namespace="openshift-console")
Expand All @@ -327,9 +307,6 @@ def getNodes(dynClient: DynamicClient) -> dict:

Returns:
list: List of node resources as dictionaries

Raises:
NotFoundError: If nodes cannot be retrieved
"""
nodesAPI = dynClient.resources.get(api_version="v1", kind="Node")
nodes = nodesAPI.get().to_dict()['items']
Expand All @@ -346,9 +323,6 @@ def getStorageClass(dynClient: DynamicClient, name: str) -> dict | None:

Returns:
StorageClass: The StorageClass resource, or None if not found

Raises:
NotFoundError: If the StorageClass does not exist (caught and returns None)
"""
try:
storageClassAPI = dynClient.resources.get(api_version="storage.k8s.io/v1", kind="StorageClass")
Expand All @@ -367,15 +341,46 @@ def getStorageClasses(dynClient: DynamicClient) -> list:

Returns:
list: List of StorageClass resources

Raises:
NotFoundError: If StorageClasses cannot be retrieved
"""
storageClassAPI = dynClient.resources.get(api_version="storage.k8s.io/v1", kind="StorageClass")
storageClasses = storageClassAPI.get().items
return storageClasses


def getClusterIssuers(dynClient: DynamicClient) -> list:
"""
Get all ClusterIssuers in the cluster.

Parameters:
dynClient (DynamicClient): OpenShift Dynamic Client

Returns:
list: List of ClusterIssuers resources or an empty list if no cluster issuers
"""
clusterIssuerAPI = dynClient.resources.get(api_version="cert-manager.io/v1", kind="ClusterIssuer")
clusterIssuers = clusterIssuerAPI.get().items
return clusterIssuers


def getClusterIssuer(dynClient: DynamicClient, name: str) -> ResourceInstance | None:
"""
Get a specific ClusterIssuer by name.

Parameters:
dynClient (DynamicClient): OpenShift Dynamic Client
name (str): The name of the ClusterIssuer to retrieve

Returns:
ClusterIssuer: The ClusterIssuer resource, or None if not found
"""
try:
clusterIssuerAPI = dynClient.resources.get(api_version="cert-manager.io/v1", kind="ClusterIssuer")
clusterIssuer = clusterIssuerAPI.get(name=name)
return clusterIssuer
except NotFoundError:
return None


def getStorageClassVolumeBindingMode(dynClient: DynamicClient, storageClassName: str) -> str:
"""
Get the volumeBindingMode for a storage class.
Expand Down
Loading