diff --git a/src/mas/devops/users.py b/src/mas/devops/users.py index ede57ea8..c3d1d1e6 100644 --- a/src/mas/devops/users.py +++ b/src/mas/devops/users.py @@ -35,14 +35,14 @@ class MASUserUtils(): - Manage API (API keys, security groups) Attributes: - MAXADMIN (str): Constant for the MAXADMIN user identifier. + MXINTADM (str): Constant for the MXINTADM user identifier. mas_instance_id (str): The MAS instance identifier. mas_workspace_id (str): The workspace identifier within the MAS instance. mas_core_namespace (str): Kubernetes namespace for MAS core components. manage_namespace (str): Kubernetes namespace for Manage application. """ - MAXADMIN = "MAXADMIN" + MXINTADM = "MXINTADM" def __init__(self, mas_instance_id: str, mas_workspace_id: str, k8s_client: client.api_client.ApiClient, mas_version: str = '9.0', coreapi_port: int = 443, admin_dashboard_port: int = 443, manage_api_port: int = 443): """ @@ -216,8 +216,8 @@ def get_user(self, user_id): # For MAS version >= 9.1, use the Manage API masperuser endpoint if Version(self.mas_version) >= Version('9.1'): - # Get MAXADMIN API key for authentication - maxadmin_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MAXADMIN, temporary=True) + # Get MXINTADM API key for authentication + mxintadm_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MXINTADM, temporary=True) # First request: Query to find user and get resource_id from href url = f"{self.manage_api_url_internal}/maximo/api/os/masperuser" @@ -227,7 +227,7 @@ def get_user(self, user_id): } headers = { "Accept": "application/json", - "apikey": maxadmin_manage_api_key["apikey"] + "apikey": mxintadm_manage_api_key["apikey"] } response = requests.get( url, @@ -245,7 +245,7 @@ def get_user(self, user_id): # Extract resource_id from href (e.g., "api/os/masperuser/") if href and "/" in href: resource_id = href.split("/")[-1] - self.logger.info(f"Extracted resource_id: {resource_id} from user_info") + self.logger.debug(f"Extracted resource_id: {resource_id} from user_info") # Second request: Get full user details url = f"{self.manage_api_url_internal}/maximo/api/os/masperuser/" @@ -256,7 +256,7 @@ def get_user(self, user_id): } headers = { "Accept": "application/json", - "apikey": maxadmin_manage_api_key["apikey"] + "apikey": mxintadm_manage_api_key["apikey"] } response = requests.get( url, @@ -338,8 +338,8 @@ def get_or_create_user(self, payload): # For MAS version >= 9.1, use the Manage API masapiuser endpoint if Version(self.mas_version) >= Version('9.1'): - # Get MAXADMIN API key for authentication - maxadmin_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MAXADMIN, temporary=True) + # Get MXINTADM API key for authentication + mxintadm_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MXINTADM, temporary=True) url = f"{self.manage_api_url_internal}/maximo/api/os/masperuser" querystring = { @@ -347,7 +347,7 @@ def get_or_create_user(self, payload): } headers = { "Content-Type": "application/json", - "apikey": maxadmin_manage_api_key["apikey"] + "apikey": mxintadm_manage_api_key["apikey"] } self.logger.debug(f"Creating new user {user_id} with Manage API with payload {payload}") response = requests.post( @@ -957,7 +957,7 @@ def create_or_get_manage_api_key_for_user(self, user_id, temporary=False): if "Error" in error_json and "reasonCode" in error_json["Error"] and error_json["Error"]["reasonCode"] == "BMXAA10051E": # BMXAA10051E - Only one API key allowed per user. - self.logger.info(f"Reusing existing Manage API Key for user {user_id}") + self.logger.debug(f"Reusing existing Manage API Key for user {user_id}") pass else: # any other 400 error is unexpected @@ -1479,7 +1479,52 @@ def create_initial_users_for_saas(self, initial_users): def create_initial_user_for_saas(self, user, user_type, groupreassign=None): """ - Create and fully configure a single initial user for SaaS. + Create and fully configure a single initial user for MAS SaaS. + + Args: + user (dict): User definition containing: + - email (str, required): User's email address + - given_name (str, required): User's first name + - family_name (str, required): User's last name + - id (str, optional): User ID (defaults to email) + user_type (str): Either "PRIMARY" or "SECONDARY" to determine permissions level. + + Returns: + None + + Raises: + Exception: If required fields are missing or user creation fails. + + Note: + Appropriate user creation procedure is chosen based on MAS version. + + """ + + if "email" not in user: + raise Exception("'email' not found in at least one of the user defs") + if "given_name" not in user: + raise Exception("'given_name' not found in at least one of the user defs") + if "family_name" not in user: + raise Exception("'family_name' not found in at least one of the user defs") + + user_email = user["email"] + user_given_name = user["given_name"] + user_family_name = user["family_name"] + + if "id" in user: + user_id = user["id"] + else: + # default to email if no id provided + user_id = user_email + + if Version(self.mas_version) < Version('9.1'): + self.create_initial_user_for_saas_pre_9_1(user_email, user_given_name, user_family_name, user_id, user_type) + else: + self.create_initial_user_for_saas_post_9_1(user_email, user_given_name, user_family_name, user_id, user_type, groupreassign) + + def create_initial_user_for_saas_pre_9_1(self, user_email, user_given_name, user_family_name, user_id, user_type): + """ + Create and fully configure a single initial user for MAS SaaS pre 9.1 using the Core APIs This method performs the complete user setup workflow: 1. Creates the user in MAS Core with appropriate permissions and entitlements @@ -1490,11 +1535,10 @@ def create_initial_user_for_saas(self, user, user_type, groupreassign=None): 6. Adds user to Manage security groups (if applicable) Args: - user (dict): User definition containing: - - email (str, required): User's email address - - given_name (str, required): User's first name - - family_name (str, required): User's last name - - id (str, optional): User ID (defaults to email) + user_email (str, required): User's email address + user_given_name (str, required): User's first name + user_family_name (str, required): User's last name + user_id (str, required): User ID user_type (str): Either "PRIMARY" or "SECONDARY" to determine permissions level. Returns: @@ -1504,7 +1548,7 @@ def create_initial_user_for_saas(self, user, user_type, groupreassign=None): Exception: If required fields are missing or user creation fails. Note: - For version < 9.1, + For MAS < 9.1, PRIMARY users get: - userAdmin permission - PREMIUM application entitlement @@ -1512,7 +1556,122 @@ def create_initial_user_for_saas(self, user, user_type, groupreassign=None): - ADMIN role for most apps, MANAGEUSER for Manage - MAXADMIN security group membership - For version >= 9.1, + """ + + username = user_id + # display_name = re.search('^([^@]+)@', user_email).group(1) # local part of the email + display_name = f"{user_given_name} {user_family_name}" + + # Set user permissions and entitlements based on requested user_type + if user_type == "PRIMARY": + permissions = { + "systemAdmin": False, + "userAdmin": True, + "apikeyAdmin": False + } + entitlement = { + "application": "PREMIUM", + "admin": "ADMIN_BASE", + "alwaysReserveLicense": True + } + is_workspace_admin = True + application_role = "ADMIN" + facilities_role = "PREMIUM" + manage_role = "MANAGEUSER" + manage_security_groups = ["MAXADMIN"] + elif user_type == "SECONDARY": + permissions = { + "systemAdmin": False, + "userAdmin": False, + "apikeyAdmin": False + } + entitlement = { + "application": "BASE", + "admin": "NONE", + "alwaysReserveLicense": True + } + is_workspace_admin = False + application_role = "USER" + facilities_role = "BASE" + manage_role = "MANAGEUSER" + # TODO: check which security groups secondary users should be members of + manage_security_groups = [] + else: + raise Exception(f"Unsupported user_type: {user_type}") + + user_def = { + "id": user_id, + "status": {"active": True}, + "username": username, + "owner": "local", + "emails": [ + { + "value": user_email, + "type": "Work", + "primary": True + } + ], + "phoneNumbers": [], + "addresses": [], + "displayName": display_name, + "issuer": "local", + "permissions": permissions, + "entitlement": entitlement, + "givenName": user_given_name, + "familyName": user_family_name, + + } + + self.get_or_create_user(user_def) + + # For version < 9.1, link user to local IDP first, then create API key only if needed for manage_security_groups + + # For version < 9.1, link user to local IDP without manage_api_key and resource_id + self.link_user_to_local_idp(user_id, email_password=True) + + # For version < 9.1, add user to workspace and grant necessary permissions + self.add_user_to_workspace(user_id, is_workspace_admin=is_workspace_admin) + + for mas_application_id in self.mas_workspace_application_ids: + self.await_mas_application_availability(mas_application_id) + if mas_application_id == "manage": + role = manage_role + elif mas_application_id == "facilities": + role = facilities_role + else: + # otherwise grant the user the appropriate role for their user_type + role = application_role + self.set_user_application_permission(user_id, mas_application_id, role) + + for mas_application_id in self.mas_workspace_application_ids: + self.check_user_sync(user_id, mas_application_id) + + if len(manage_security_groups) > 0 and "manage" in self.mas_workspace_application_ids: + mxintadm_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MXINTADM, temporary=True) + for manage_security_group in manage_security_groups: + self.add_user_to_manage_group(user_id, manage_security_group, mxintadm_manage_api_key) + + def create_initial_user_for_saas_post_9_1(self, user_email, user_given_name, user_family_name, user_id, user_type, groupreassign=None): + """ + Create and fully configure a single initial user for MAS SaaS post 9.1 using the Manage APIs + + Args: + user_email (str, required): User's email address + user_given_name (str, required): User's first name + user_family_name (str, required): User's last name + user_id (str, required): User ID + user_type (str): Either "PRIMARY" or "SECONDARY" to determine permissions level. + + Returns: + None + + Raises: + Exception: If required fields are missing or user creation fails. + + + Notes: + For MAS >= 9.1, + PRIMARY users get: - apikeyAdmin permission (API Key Management) - idpAdmin permission (IDP Management) @@ -1527,176 +1686,63 @@ def create_initial_user_for_saas(self, user, user_type, groupreassign=None): - USER role for most apps, MANAGEUSER for Manage - No security group memberships """ - if "email" not in user: - raise Exception("'email' not found in at least one of the user defs") - if "given_name" not in user: - raise Exception("'given_name' not found in at least one of the user defs") - if "family_name" not in user: - raise Exception("'family_name' not found in at least one of the user defs") - - user_email = user["email"] - user_given_name = user["given_name"] - user_family_name = user["family_name"] - if "id" in user: - user_id = user["id"] - else: - # default to email if no id provided - user_id = user_email - - username = user_id # display_name = re.search('^([^@]+)@', user_email).group(1) # local part of the email display_name = f"{user_given_name} {user_family_name}" - # Set user permissions and entitlements based on requested user_type - if Version(self.mas_version) < Version('9.1'): - if user_type == "PRIMARY": - permissions = { - "systemAdmin": False, - "userAdmin": True, - "apikeyAdmin": False - } - entitlement = { - "application": "PREMIUM", - "admin": "ADMIN_BASE", - "alwaysReserveLicense": True - } - is_workspace_admin = True - application_role = "ADMIN" - facilities_role = "PREMIUM" - manage_role = "MANAGEUSER" - manage_security_groups = ["MAXADMIN"] - elif user_type == "SECONDARY": - permissions = { - "systemAdmin": False, - "userAdmin": False, - "apikeyAdmin": False - } - entitlement = { - "application": "BASE", - "admin": "NONE", - "alwaysReserveLicense": True - } - is_workspace_admin = False - application_role = "USER" - facilities_role = "BASE" - manage_role = "MANAGEUSER" - # TODO: check which security groups secondary users should be members of - manage_security_groups = [] - else: - raise Exception(f"Unsupported user_type: {user_type}") - - user_def = { - "id": user_id, - "status": {"active": True}, - "username": username, + if user_type == "PRIMARY": + maxuser_def = { + "userid": user_id, + "personid": user_id, + "loginid": user_id, "owner": "local", - "emails": [ + "systemadmin": False, + "apikeyadmin": True, + "isauthorized": 1, + "idpadmin": True, + "status": "ACTIVE", + "groupuser": [ { - "value": user_email, - "type": "Work", - "primary": True + "groupname": "USERMANAGEMENT" } - ], - "phoneNumbers": [], - "addresses": [], - "displayName": display_name, - "issuer": "local", - "permissions": permissions, - "entitlement": entitlement, - "givenName": user_given_name, - "familyName": user_family_name, - + ] } - else: - if user_type == "PRIMARY": - maxuser_def = { - "userid": user_id, - "personid": user_id, - "loginid": user_id, - "owner": "local", - "systemadmin": False, - "apikeyadmin": True, - "isauthorized": 1, - "idpadmin": True, - "status": "ACTIVE", - "groupuser": [ - { - "groupname": "USERMANAGEMENT" - } - ] - } - is_workspace_admin = True - application_role = "ADMIN" - facilities_role = "PREMIUM" - manage_role = "MANAGEUSER" - manage_security_groups = ["USERMANAGEMENT"] - elif user_type == "SECONDARY": - maxuser_def = { - "userid": user_id, - "personid": user_id, - "loginid": user_id, - "owner": "local", - "systemadmin": False, - "apikeyadmin": False, - "isauthorized": 0, - "idpadmin": False, - "status": "ACTIVE" - } - is_workspace_admin = False - application_role = "USER" - facilities_role = "BASE" - manage_role = "MANAGEUSER" - manage_security_groups = [] - else: - raise Exception(f"Unsupported user_type: {user_type}") - - user_def = { + manage_security_groups = ["USERMANAGEMENT"] + elif user_type == "SECONDARY": + maxuser_def = { + "userid": user_id, "personid": user_id, - "primaryemailtype": "Work", - "primaryemail": user_email, - "primaryphone": "", - "addressline1": "", - "displayName": display_name, - "maxuser": maxuser_def, + "loginid": user_id, + "owner": "local", + "systemadmin": False, + "apikeyadmin": False, + "isauthorized": 0, + "idpadmin": False, + "status": "ACTIVE" } + manage_security_groups = [] + else: + raise Exception(f"Unsupported user_type: {user_type}") + + user_def = { + "personid": user_id, + "primaryemailtype": "Work", + "primaryemail": user_email, + "primaryphone": "", + "addressline1": "", + "displayName": display_name, + "maxuser": maxuser_def, + } resource_id, _ = self.get_or_create_user(user_def) # For version >= 9.1, we always need a Manage API key and resource_id to link user to local IDP - # For version < 9.1, link user to local IDP first, then create API key only if needed for manage_security_groups - maxadmin_manage_api_key = None - if Version(self.mas_version) >= Version('9.1'): - maxadmin_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MAXADMIN, temporary=True) - self.link_user_to_local_idp(user_id, email_password=True, manage_api_key=maxadmin_manage_api_key, resource_id=resource_id) - else: - # For version < 9.1, link user to local IDP without manage_api_key and resource_id - self.link_user_to_local_idp(user_id, email_password=True) - - self.add_user_to_workspace(user_id, is_workspace_admin=is_workspace_admin) - - if Version(self.mas_version) < Version('9.1'): - for mas_application_id in self.mas_workspace_application_ids: - self.await_mas_application_availability(mas_application_id) - if mas_application_id == "manage": - role = manage_role - elif mas_application_id == "facilities": - role = facilities_role - else: - # otherwise grant the user the appropriate role for their user_type - role = application_role - self.set_user_application_permission(user_id, mas_application_id, role) - - for mas_application_id in self.mas_workspace_application_ids: - self.check_user_sync(user_id, mas_application_id) + mxintadm_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MXINTADM, temporary=True) + self.link_user_to_local_idp(user_id, email_password=True, manage_api_key=mxintadm_manage_api_key, resource_id=resource_id) if len(manage_security_groups) > 0 and "manage" in self.mas_workspace_application_ids: - if Version(self.mas_version) < Version('9.1'): - maxadmin_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MAXADMIN, temporary=True) - for manage_security_group in manage_security_groups: - self.add_user_to_manage_group(user_id, manage_security_group, maxadmin_manage_api_key) - elif Version(self.mas_version) >= Version('9.1') and user_type == "PRIMARY" and groupreassign is not None: - if resource_id and maxadmin_manage_api_key: - self.set_user_group_reassignment_auth(user_id, resource_id, groupreassign, maxadmin_manage_api_key) + if user_type == "PRIMARY" and groupreassign is not None: + if resource_id and mxintadm_manage_api_key: + self.set_user_group_reassignment_auth(user_id, resource_id, groupreassign, mxintadm_manage_api_key) else: self.logger.warning(f"Cannot set group reassignment auth: resource_id not found for user {user_id}") diff --git a/test/src/test_users.py b/test/src/test_users.py index f190e098..0307adf3 100644 --- a/test/src/test_users.py +++ b/test/src/test_users.py @@ -140,22 +140,22 @@ def mock_manage_api_key(requests_mock): user_id = "user1" apikey = {"userid": user_id, "apikey": "test-api-key-12345", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret - # Also setup for MAXADMIN user - maxadmin_apikey = {"userid": "MAXADMIN", "apikey": "maxadmin-api-key-67890", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/maxadminapikeyid"} # pragma: allowlist secret + # Also setup for MXINTADM user + mxintadm_apikey = {"userid": "MXINTADM", "apikey": "mxintadm-api-key-67890", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/mxintadmapikeyid"} # pragma: allowlist secret - def maxadmin_matcher(req): - return req.json().get("userid") == "MAXADMIN" and req.verify == PEM_PATH and req.cert == PEM_PATH + def mxintadm_matcher(req): + return req.json().get("userid") == "MXINTADM" and req.verify == PEM_PATH and req.cert == PEM_PATH def user1_matcher(req): return req.json().get("userid") == user_id and req.verify == PEM_PATH and req.cert == PEM_PATH - # Mock for MAXADMIN API key creation (returns 400 - key already exists) + # Mock for MXINTADM API key creation (returns 400 - key already exists) requests_mock.post( f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1", request_headers={"content-type": "application/json"}, json={"Error": {"reasonCode": "BMXAA10051E", "message": "Only one API key allowed per user"}}, status_code=400, - additional_matcher=maxadmin_matcher + additional_matcher=mxintadm_matcher ) # Mock for user1 API key creation @@ -176,16 +176,16 @@ def user1_matcher(req): additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) ) - # Mock for MAXADMIN API key retrieval (returns existing key) + # Mock for MXINTADM API key retrieval (returns existing key) requests_mock.get( - f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"MAXADMIN\"", + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"MXINTADM\"", request_headers={"accept": "application/json"}, - json={"member": [maxadmin_apikey]}, + json={"member": [mxintadm_apikey]}, status_code=200, additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) ) - yield maxadmin_apikey + yield mxintadm_apikey def test_admin_internal_ca_pem_file_path(user_utils, mock_named_temporary_file, mock_atexit): @@ -2284,8 +2284,7 @@ def test_create_initial_user_for_saas( user_utils.link_user_to_local_idp.assert_called_once_with(user_id, email_password=True, manage_api_key=manage_api_key, resource_id=resource_id) else: user_utils.link_user_to_local_idp.assert_called_once_with(user_id, email_password=True) - - user_utils.add_user_to_workspace.assert_called_once_with(user_id, is_workspace_admin=is_workspace_admin) + user_utils.add_user_to_workspace.assert_called_once_with(user_id, is_workspace_admin=is_workspace_admin) # For version < 9.1, await_mas_application_availability and set_user_application_permission are called # For version >= 9.1, they are NOT called @@ -2314,7 +2313,7 @@ def test_create_initial_user_for_saas( # For version >= 9.1, API key is always created (needed for link_user_to_local_idp) # For version < 9.1, API key is only created if there are manage_security_groups if Version(mas_version) >= Version('9.1') or len(manage_security_groups) > 0: - user_utils.create_or_get_manage_api_key_for_user.assert_called_once_with("MAXADMIN", temporary=True) + user_utils.create_or_get_manage_api_key_for_user.assert_called_once_with("MXINTADM", temporary=True) else: user_utils.create_or_get_manage_api_key_for_user.assert_not_called()