diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index af10c90e..d7d588ee 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -4,6 +4,7 @@ auth, check, copy, + credential_helper, delete, dependencies, docs, diff --git a/cloudsmith_cli/cli/commands/credential_helper/__init__.py b/cloudsmith_cli/cli/commands/credential_helper/__init__.py new file mode 100644 index 00000000..10727bf4 --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/__init__.py @@ -0,0 +1,31 @@ +""" +Credential helper commands for Cloudsmith. + +This module provides credential helper commands for package managers +that follow their respective credential helper protocols. +""" + +import click + +from ..main import main +from .docker import docker as docker_cmd + + +@click.group() +def credential_helper(): + """ + Credential helpers for package managers. + + These commands provide credentials for package managers like Docker. + They are typically called by wrapper binaries + (e.g., docker-credential-cloudsmith) or used directly for debugging. + + Examples: + # Test Docker credential helper + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + """ + + +credential_helper.add_command(docker_cmd, name="docker") + +main.add_command(credential_helper, name="credential-helper") diff --git a/cloudsmith_cli/cli/commands/credential_helper/docker.py b/cloudsmith_cli/cli/commands/credential_helper/docker.py new file mode 100644 index 00000000..8d0f99fd --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/docker.py @@ -0,0 +1,80 @@ +""" +Docker credential helper command. + +Implements the Docker credential helper protocol for Cloudsmith registries. + +See: https://github.com/docker/docker-credential-helpers +""" + +import json +import sys + +import click + +from ....credential_helpers.docker import get_credentials +from ...decorators import common_api_auth_options, resolve_credentials + + +@click.command() +@common_api_auth_options +@resolve_credentials +def docker(opts): + """ + Docker credential helper for Cloudsmith registries. + + Reads a Docker registry server URL from stdin and returns credentials in JSON format. + This command implements the 'get' operation of the Docker credential helper protocol. + + Only provides credentials for Cloudsmith Docker registries (docker.cloudsmith.io). + + Input (stdin): + Server URL as plain text (e.g., "docker.cloudsmith.io") + + Output (stdout): + JSON: {"Username": "token", "Secret": ""} + + Exit codes: + 0: Success + 1: Error (no credentials available, not a Cloudsmith registry, etc.) + + Examples: + # Manual testing + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + # Called by Docker via wrapper + $ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for custom domain support) + """ + try: + server_url = sys.stdin.read().strip() + + if not server_url: + click.echo("Error: No server URL provided on stdin", err=True) + sys.exit(1) + + credentials = get_credentials( + server_url, + credential=opts.credential, + session=opts.session, + api_host=opts.api_host or "https://api.cloudsmith.io", + ) + + if not credentials: + click.echo( + "Error: Unable to retrieve credentials. " + "Make sure you have a valid cloudsmith-cli session, " + "this can be checked with `cloudsmith whoami`.", + err=True, + ) + sys.exit(1) + + click.echo(json.dumps(credentials)) + + except OSError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/cli/commands/whoami.py b/cloudsmith_cli/cli/commands/whoami.py index ffbf9842..59fa31c2 100644 --- a/cloudsmith_cli/cli/commands/whoami.py +++ b/cloudsmith_cli/cli/commands/whoami.py @@ -1,14 +1,11 @@ """CLI/Commands - Retrieve authentication status.""" -import os - import click from ...core import keyring from ...core.api.exceptions import ApiException from ...core.api.user import get_token_metadata, get_user_brief from .. import decorators, utils -from ..config import CredentialsReader from ..exceptions import handle_api_exceptions from .main import main @@ -26,26 +23,17 @@ def _get_active_method(api_config): def _get_api_key_source(opts): """Determine where the API key was loaded from. - Checks in priority order matching actual resolution: - CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini. + Uses the credential provider chain result attached by initialise_api. """ - if not opts.api_key: - return {"configured": False, "source": None, "source_key": None} - - env_key = os.environ.get("CLOUDSMITH_API_KEY") - - # If env var is set but differs from the resolved key, CLI flag won - if env_key and opts.api_key != env_key: - source, key = "CLI --api-key flag", "cli_flag" - elif env_key: - suffix = env_key[-4:] - source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var" - elif creds := CredentialsReader.find_existing_files(): - source, key = f"credentials.ini ({creds[0]})", "credentials_file" - else: - source, key = "CLI --api-key flag", "cli_flag" - - return {"configured": True, "source": source, "source_key": key} + credential = getattr(opts, "credential", None) + if credential: + return { + "configured": True, + "source": credential.source_detail or credential.source_name, + "source_key": credential.source_name, + } + + return {"configured": False, "source": None, "source_key": None} def _get_sso_status(api_host): diff --git a/cloudsmith_cli/cli/decorators.py b/cloudsmith_cli/cli/decorators.py index ba75e236..bda1fd69 100644 --- a/cloudsmith_cli/cli/decorators.py +++ b/cloudsmith_cli/cli/decorators.py @@ -7,7 +7,10 @@ from cloudsmith_cli.cli import validators from ..core.api.init import initialise_api as _initialise_api +from ..core.credentials.chain import CredentialProviderChain +from ..core.credentials.models import CredentialContext from ..core.mcp import server +from ..core.rest import create_requests_session as _create_session from . import config, utils @@ -20,6 +23,14 @@ def report_retry(seconds, context=None): ) +def _pop_boolean_flag(kwargs, name, invert=False): + """Pop a boolean flag from kwargs, optionally inverting it.""" + value = kwargs.pop(name) + if value is not None and invert: + value = not value + return value + + def common_package_action_options(f): """Add common options for package actions.""" @@ -214,15 +225,17 @@ def common_api_auth_options(f): def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring opts = config.get_or_create_options(ctx) - opts.api_key = kwargs.pop("api_key") + api_key = kwargs.pop("api_key") + if api_key: + opts.api_key = api_key kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) return wrapper -def initialise_api(f): - """Initialise the Cloudsmith API for use.""" +def initialise_session(f): + """Create a shared HTTP session with proxy/SSL/user-agent settings.""" @click.option( "--api-host", envvar="CLOUDSMITH_API_HOST", help="The API host to connect to." @@ -252,6 +265,78 @@ def initialise_api(f): envvar="CLOUDSMITH_API_HEADERS", help="A CSV list of extra headers (key=value) to send to the API.", ) + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + opts.api_host = kwargs.pop("api_host") + opts.api_proxy = kwargs.pop("api_proxy") + opts.api_ssl_verify = _pop_boolean_flag( + kwargs, "without_api_ssl_verify", invert=True + ) + opts.api_user_agent = kwargs.pop("api_user_agent") + opts.api_headers = kwargs.pop("api_headers") + + opts.session = _create_session( + proxy=opts.api_proxy, + ssl_verify=opts.api_ssl_verify, + user_agent=opts.api_user_agent, + headers=opts.api_headers, + ) + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return wrapper + + +def resolve_credentials(f): + """Resolve credentials via the provider chain. Depends on initialise_session.""" + + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + + context = CredentialContext( + session=opts.session, + api_key=opts.api_key, + api_host=opts.api_host or "https://api.cloudsmith.io", + creds_file_path=ctx.meta.get("creds_file"), + profile=ctx.meta.get("profile"), + debug=opts.debug, + ) + + chain = CredentialProviderChain() + credential = chain.resolve(context) + + if context.keyring_refresh_failed: + click.secho( + "An error occurred when attempting to refresh your SSO access token. " + "To refresh this session, run 'cloudsmith auth'", + fg="yellow", + err=True, + ) + if credential: + click.secho( + "Falling back to API key authentication.", + fg="yellow", + err=True, + ) + + opts.credential = credential + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return initialise_session(wrapper) + + +def initialise_api(f): + """Initialise the Cloudsmith API for use. Depends on resolve_credentials.""" + @click.option( "-R", "--without-rate-limit", @@ -294,20 +379,8 @@ def initialise_api(f): @functools.wraps(f) def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring - def _set_boolean(name, invert=False): - value = kwargs.pop(name) - value = value if value is not None else None - if value is not None and invert: - value = not value - return value - opts = config.get_or_create_options(ctx) - opts.api_host = kwargs.pop("api_host") - opts.api_proxy = kwargs.pop("api_proxy") - opts.api_ssl_verify = _set_boolean("without_api_ssl_verify", invert=True) - opts.api_user_agent = kwargs.pop("api_user_agent") - opts.api_headers = kwargs.pop("api_headers") - opts.rate_limit = _set_boolean("without_rate_limit", invert=True) + opts.rate_limit = _pop_boolean_flag(kwargs, "without_rate_limit", invert=True) opts.rate_limit_warning = kwargs.pop("rate_limit_warning") opts.error_retry_max = kwargs.pop("error_retry_max") opts.error_retry_backoff = kwargs.pop("error_retry_backoff") @@ -320,7 +393,7 @@ def call_print_rate_limit_info_with_opts(rate_info): opts.api_config = _initialise_api( debug=opts.debug, host=opts.api_host, - key=opts.api_key, + credential=opts.credential, proxy=opts.api_proxy, ssl_verify=opts.api_ssl_verify, user_agent=opts.api_user_agent, @@ -336,7 +409,7 @@ def call_print_rate_limit_info_with_opts(rate_info): kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) - return wrapper + return resolve_credentials(wrapper) def initialise_mcp(f): diff --git a/cloudsmith_cli/cli/tests/conftest.py b/cloudsmith_cli/cli/tests/conftest.py index c42f23f4..653e574c 100644 --- a/cloudsmith_cli/cli/tests/conftest.py +++ b/cloudsmith_cli/cli/tests/conftest.py @@ -5,6 +5,7 @@ from ...core.api.init import initialise_api from ...core.api.repos import create_repo, delete_repo +from ...core.credentials.models import CredentialResult from .utils import random_str @@ -51,7 +52,9 @@ def organization(): @pytest.fixture() def tmp_repository(organization, api_host, api_key): """Yield a temporary repository.""" - initialise_api(host=api_host, key=api_key) + initialise_api( + host=api_host, credential=CredentialResult(api_key=api_key, source_name="test") + ) repo_data = create_repo(organization, {"name": random_str()}) yield repo_data delete_repo(organization, repo_data["slug"]) diff --git a/cloudsmith_cli/cli/tests/test_webserver.py b/cloudsmith_cli/cli/tests/test_webserver.py index 538ad147..12ecdf77 100644 --- a/cloudsmith_cli/cli/tests/test_webserver.py +++ b/cloudsmith_cli/cli/tests/test_webserver.py @@ -40,7 +40,11 @@ def test_refresh_api_config_passes_sso_token(self): mock_init_api.assert_called_once() call_kwargs = mock_init_api.call_args.kwargs - assert call_kwargs.get("access_token") == "test_sso_token_123" + credential = call_kwargs.get("credential") + assert credential is not None + assert credential.api_key == "test_sso_token_123" + assert credential.auth_type == "bearer" + assert credential.source_name == "sso" class TestAuthenticationWebRequestHandlerKeyring: diff --git a/cloudsmith_cli/cli/webserver.py b/cloudsmith_cli/cli/webserver.py index eff5d523..060d2ff7 100644 --- a/cloudsmith_cli/cli/webserver.py +++ b/cloudsmith_cli/cli/webserver.py @@ -9,6 +9,7 @@ from ..core.api.exceptions import ApiException from ..core.api.init import initialise_api +from ..core.credentials.models import CredentialResult from ..core.keyring import store_sso_tokens from .saml import exchange_2fa_token @@ -79,7 +80,15 @@ def refresh_api_config_after_auth(self): user_agent=getattr(self.api_opts, "user_agent", None), headers=getattr(self.api_opts, "headers", None), rate_limit=getattr(self.api_opts, "rate_limit", True), - access_token=self.sso_access_token, + credential=( + CredentialResult( + api_key=self.sso_access_token, + source_name="sso", + auth_type="bearer", + ) + if self.sso_access_token + else None + ), ) def finish_request(self, request, client_address): diff --git a/cloudsmith_cli/core/api/init.py b/cloudsmith_cli/core/api/init.py index b6a856bc..ecbc8f6f 100644 --- a/cloudsmith_cli/core/api/init.py +++ b/cloudsmith_cli/core/api/init.py @@ -6,16 +6,13 @@ import click import cloudsmith_api -from ...cli import saml -from .. import keyring from ..rest import RestClient -from .exceptions import ApiException def initialise_api( debug=False, host=None, - key=None, + credential=None, proxy=None, ssl_verify=True, user_agent=None, @@ -26,7 +23,6 @@ def initialise_api( error_retry_backoff=None, error_retry_codes=None, error_retry_cb=None, - access_token=None, ): """Initialise the cloudsmith_api.Configuration.""" # FIXME: pylint: disable=too-many-arguments @@ -45,65 +41,15 @@ def initialise_api( config.verify_ssl = ssl_verify config.client_side_validation = False - # Use directly provided access token (e.g. from SSO callback), - # or fall back to keyring lookup if enabled. - if not access_token: - access_token = keyring.get_access_token(config.host) - - if access_token: - auth_header = config.headers.get("Authorization") - - # overwrite auth header if empty or is basic auth without username or password - if not auth_header or auth_header == config.get_basic_auth_token(): - refresh_token = keyring.get_refresh_token(config.host) - - try: - if keyring.should_refresh_access_token(config.host): - new_access_token, new_refresh_token = saml.refresh_access_token( - config.host, - access_token, - refresh_token, - session=saml.create_configured_session(config), - ) - keyring.store_sso_tokens( - config.host, new_access_token, new_refresh_token - ) - # Use the new tokens - access_token = new_access_token - except ApiException: - keyring.update_refresh_attempted_at(config.host) - - click.secho( - "An error occurred when attempting to refresh your SSO access token. To refresh this session, run 'cloudsmith auth'", - fg="yellow", - err=True, - ) - - # Clear access_token to prevent using expired token - access_token = None - - # Fall back to API key auth if available - if key: - click.secho( - "Falling back to API key authentication.", - fg="yellow", - err=True, - ) - config.api_key["X-Api-Key"] = key - - # Only use SSO token if refresh didn't fail - if access_token: - config.headers["Authorization"] = "Bearer {access_token}".format( - access_token=access_token - ) - - if config.debug: - click.echo("SSO access token config value set") - elif key: - config.api_key["X-Api-Key"] = key - - if config.debug: - click.echo("User API key config value set") + if credential: + if credential.auth_type == "bearer": + config.headers["Authorization"] = f"Bearer {credential.api_key}" + if config.debug: + click.echo("SSO access token config value set") + else: + config.api_key["X-Api-Key"] = credential.api_key + if config.debug: + click.echo("User API key config value set") auth_header = headers and config.headers.get("Authorization") if auth_header and " " in auth_header: diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cloudsmith_cli/core/credentials/chain.py b/cloudsmith_cli/core/credentials/chain.py new file mode 100644 index 00000000..259490fc --- /dev/null +++ b/cloudsmith_cli/core/credentials/chain.py @@ -0,0 +1,61 @@ +"""Credential provider chain for the Cloudsmith CLI. + +Implements an AWS SDK-style credential resolution chain that evaluates +credential sources sequentially and returns the first valid result. +""" + +from __future__ import annotations + +import logging + +from .models import CredentialContext, CredentialResult +from .provider import CredentialProvider + +logger = logging.getLogger(__name__) + + +class CredentialProviderChain: + """Evaluates credential providers in order, returning the first valid result. + + If no providers are given, uses the default chain: + Keyring → CLIFlag. + """ + + def __init__(self, providers: list[CredentialProvider] | None = None): + if providers is not None: + self.providers = providers + else: + from .providers import CLIFlagProvider, KeyringProvider + + self.providers = [ + KeyringProvider(), + CLIFlagProvider(), + ] + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Evaluate each provider in order. Return the first successful result.""" + for provider in self.providers: + try: + result = provider.resolve(context) + if result is not None: + if context.debug: + logger.debug( + "Credentials resolved by %s: %s", + provider.name, + result.source_detail or result.source_name, + ) + return result + if context.debug: + logger.debug( + "Provider %s did not resolve credentials, trying next", + provider.name, + ) + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one provider failing shouldn't stop others + logger.debug( + "Provider %s raised an exception, skipping", + provider.name, + exc_info=True, + ) + continue + return None diff --git a/cloudsmith_cli/core/credentials/models.py b/cloudsmith_cli/core/credentials/models.py new file mode 100644 index 00000000..73b3416e --- /dev/null +++ b/cloudsmith_cli/core/credentials/models.py @@ -0,0 +1,33 @@ +"""Credential data models for the Cloudsmith CLI.""" + +from __future__ import annotations + +from dataclasses import dataclass + +import requests + + +@dataclass +class CredentialContext: + """Context passed to credential providers during resolution. + + All values are populated directly from Click options / ``opts``. + """ + + session: requests.Session | None = None + api_key: str | None = None + api_host: str = "https://api.cloudsmith.io" + creds_file_path: str | None = None + profile: str | None = None + debug: bool = False + keyring_refresh_failed: bool = False + + +@dataclass +class CredentialResult: + """Result from a successful credential resolution.""" + + api_key: str + source_name: str + source_detail: str | None = None + auth_type: str = "api_key" diff --git a/cloudsmith_cli/core/credentials/provider.py b/cloudsmith_cli/core/credentials/provider.py new file mode 100644 index 00000000..78b18886 --- /dev/null +++ b/cloudsmith_cli/core/credentials/provider.py @@ -0,0 +1,17 @@ +"""Base credential provider interface.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod + +from .models import CredentialContext, CredentialResult + + +class CredentialProvider(ABC): + """Base class for credential providers.""" + + name: str = "base" + + @abstractmethod + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Attempt to resolve credentials. Return CredentialResult or None.""" diff --git a/cloudsmith_cli/core/credentials/providers/__init__.py b/cloudsmith_cli/core/credentials/providers/__init__.py new file mode 100644 index 00000000..5482e397 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/__init__.py @@ -0,0 +1,9 @@ +"""Credential providers for the Cloudsmith CLI.""" + +from .cli_flag import CLIFlagProvider +from .keyring_provider import KeyringProvider + +__all__ = [ + "CLIFlagProvider", + "KeyringProvider", +] diff --git a/cloudsmith_cli/core/credentials/providers/cli_flag.py b/cloudsmith_cli/core/credentials/providers/cli_flag.py new file mode 100644 index 00000000..0a05d027 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/cli_flag.py @@ -0,0 +1,23 @@ +"""CLI flag credential provider.""" + +from __future__ import annotations + +from ..models import CredentialContext, CredentialResult +from ..provider import CredentialProvider + + +class CLIFlagProvider(CredentialProvider): + """Resolves credentials from a CLI flag value passed via CredentialContext.""" + + name = "cli_flag" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + api_key = context.api_key + if api_key and api_key.strip(): + suffix = api_key.strip()[-4:] + return CredentialResult( + api_key=api_key.strip(), + source_name="cli_flag", + source_detail=f"--api-key flag, CLOUDSMITH_API_KEY, or credentials.ini (ends with ...{suffix})", + ) + return None diff --git a/cloudsmith_cli/core/credentials/providers/keyring_provider.py b/cloudsmith_cli/core/credentials/providers/keyring_provider.py new file mode 100644 index 00000000..c8844f0c --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/keyring_provider.py @@ -0,0 +1,54 @@ +"""Keyring credential provider.""" + +from __future__ import annotations + +import logging + +from ....cli.saml import refresh_access_token +from ....core import keyring +from ..models import CredentialContext, CredentialResult +from ..provider import CredentialProvider + +logger = logging.getLogger(__name__) + + +class KeyringProvider(CredentialProvider): + """Resolves credentials from SAML tokens stored in the system keyring.""" + + name = "keyring" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + if not keyring.should_use_keyring(): + return None + + api_host = context.api_host + access_token = keyring.get_access_token(api_host) + + if not access_token: + return None + + try: + if keyring.should_refresh_access_token(api_host): + if not context.session: + return None + refresh_token = keyring.get_refresh_token(api_host) + new_access_token, new_refresh_token = refresh_access_token( + api_host, + access_token, + refresh_token, + session=context.session, + ) + keyring.store_sso_tokens(api_host, new_access_token, new_refresh_token) + access_token = new_access_token + except Exception: # pylint: disable=broad-exception-caught + keyring.update_refresh_attempted_at(api_host) + context.keyring_refresh_failed = True + logger.debug("Failed to refresh SAML token", exc_info=True) + return None + + return CredentialResult( + api_key=access_token, + source_name="keyring", + source_detail="SAML token from system keyring", + auth_type="bearer", + ) diff --git a/cloudsmith_cli/core/rest.py b/cloudsmith_cli/core/rest.py index 3820b50b..e9652931 100644 --- a/cloudsmith_cli/core/rest.py +++ b/cloudsmith_cli/core/rest.py @@ -61,28 +61,26 @@ def create_requests_session( session=None, error_retry_cb=None, respect_retry_after_header=True, + user_agent=None, + headers=None, ): """Create a requests session that retries some errors.""" # pylint: disable=too-many-branches config = Configuration() if retries is None: - if config.error_retry_max is None: # pylint: disable=no-member - retries = 5 - else: - retries = config.error_retry_max # pylint: disable=no-member + retry_max = getattr(config, "error_retry_max", None) + retries = retry_max if retry_max is not None else 5 if backoff_factor is None: - if config.error_retry_backoff is None: # pylint: disable=no-member - backoff_factor = 0.23 - else: - backoff_factor = config.error_retry_backoff # pylint: disable=no-member + retry_backoff = getattr(config, "error_retry_backoff", None) + backoff_factor = retry_backoff if retry_backoff is not None else 0.23 if status_forcelist is None: - if config.error_retry_codes is None: # pylint: disable=no-member - status_forcelist = [500, 502, 503, 504] - else: - status_forcelist = config.error_retry_codes # pylint: disable=no-member + retry_codes = getattr(config, "error_retry_codes", None) + status_forcelist = ( + retry_codes if retry_codes is not None else [500, 502, 503, 504] + ) if ssl_verify is None: ssl_verify = config.verify_ssl @@ -125,6 +123,12 @@ def create_requests_session( session.mount("http://", adapter) session.mount("https://", adapter) + if user_agent: + session.headers["User-Agent"] = user_agent + + if headers: + session.headers.update(headers) + return session diff --git a/cloudsmith_cli/core/tests/test_cli_flag_provider.py b/cloudsmith_cli/core/tests/test_cli_flag_provider.py new file mode 100644 index 00000000..dc4c0ff7 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_cli_flag_provider.py @@ -0,0 +1,34 @@ +"""Tests for the CLI flag credential provider.""" + +from cloudsmith_cli.core.credentials.models import CredentialContext +from cloudsmith_cli.core.credentials.providers import CLIFlagProvider + + +class TestCLIFlagProvider: + def test_resolves_from_context(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key="my-api-key-1234") + result = provider.resolve(context) + assert result is not None + assert result.api_key == "my-api-key-1234" + assert result.source_name == "cli_flag" + assert result.auth_type == "api_key" + assert "1234" in result.source_detail + + def test_returns_none_when_not_set(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=None) + result = provider.resolve(context) + assert result is None + + def test_returns_none_for_empty_value(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" ") + result = provider.resolve(context) + assert result is None + + def test_strips_whitespace(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" my-key ") + result = provider.resolve(context) + assert result.api_key == "my-key" diff --git a/cloudsmith_cli/core/tests/test_credential_context.py b/cloudsmith_cli/core/tests/test_credential_context.py new file mode 100644 index 00000000..49dc57d9 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_context.py @@ -0,0 +1,14 @@ +"""Tests for the CredentialContext class.""" + +from cloudsmith_cli.core.credentials.models import CredentialContext + + +class TestCredentialContext: + def test_keyring_refresh_failed_defaults_false(self): + context = CredentialContext() + assert context.keyring_refresh_failed is False + + def test_keyring_refresh_failed_can_be_set(self): + context = CredentialContext() + context.keyring_refresh_failed = True + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_credential_provider_chain.py b/cloudsmith_cli/core/tests/test_credential_provider_chain.py new file mode 100644 index 00000000..58cddb00 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_provider_chain.py @@ -0,0 +1,77 @@ +"""Tests for the credential provider chain.""" + +from cloudsmith_cli.core.credentials.chain import CredentialProviderChain +from cloudsmith_cli.core.credentials.models import CredentialContext, CredentialResult +from cloudsmith_cli.core.credentials.provider import CredentialProvider + + +class DummyProvider(CredentialProvider): + """Test provider that returns a configurable result.""" + + def __init__(self, name, result=None, should_raise=False): + self.name = name + self._result = result + self._should_raise = should_raise + + def resolve(self, context): + if self._should_raise: + raise RuntimeError("Provider error") + return self._result + + +class TestCredentialProviderChain: + def test_first_provider_wins(self): + result1 = CredentialResult(api_key="key1", source_name="first") + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=result1), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key1" + assert result.source_name == "first" + + def test_falls_through_to_second(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_returns_none_when_all_fail(self): + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=None), + ] + ) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_skips_erroring_provider(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", should_raise=True), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_empty_chain(self): + chain = CredentialProviderChain([]) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_default_chain_order(self): + chain = CredentialProviderChain() + assert len(chain.providers) == 2 + assert chain.providers[0].name == "keyring" + assert chain.providers[1].name == "cli_flag" diff --git a/cloudsmith_cli/core/tests/test_init.py b/cloudsmith_cli/core/tests/test_init.py index 5906796c..f0b691e6 100644 --- a/cloudsmith_cli/core/tests/test_init.py +++ b/cloudsmith_cli/core/tests/test_init.py @@ -1,62 +1,8 @@ -import os -from unittest.mock import patch - -import pytest from cloudsmith_api import Configuration -from ...cli import saml -from .. import keyring from ..api.init import initialise_api -@pytest.fixture -def mocked_get_access_token(): - with patch.object( - keyring, "get_access_token", return_value="dummy_access_token" - ) as get_access_token_mock: - yield get_access_token_mock - - -@pytest.fixture -def mocked_get_refresh_token(): - with patch.object( - keyring, "get_refresh_token", return_value="dummy_refresh_token" - ) as get_refresh_token_mock: - yield get_refresh_token_mock - - -@pytest.fixture -def mocked_should_refresh_access_token(): - with patch.object( - keyring, "should_refresh_access_token", return_value=False - ) as should_refresh_access_token_mock: - yield should_refresh_access_token_mock - - -@pytest.fixture -def mocked_refresh_access_token(): - with patch.object( - saml, - "refresh_access_token", - return_value=("new_access_token", "new_refresh_token"), - ) as refresh_access_token_mock: - yield refresh_access_token_mock - - -@pytest.fixture -def mocked_store_sso_tokens(): - with patch.object(keyring, "store_sso_tokens") as store_sso_tokens_mock: - yield store_sso_tokens_mock - - -@pytest.fixture -def mocked_update_refresh_attempted_at(): - with patch.object( - keyring, "update_refresh_attempted_at" - ) as update_refresh_attempted_at_mock: - yield update_refresh_attempted_at_mock - - class TestInitialiseApi: def setup_class(cls): # pylint: disable=no-self-argument # For the purposes of these tests, we need to explicitly call set_default(None) at the @@ -65,14 +11,10 @@ def setup_class(cls): # pylint: disable=no-self-argument # Configuration class to its vanilla, unmodified behaviour/state. Configuration.set_default(None) - def test_initialise_api_sets_cloudsmith_api_config_default( - self, mocked_get_access_token - ): + def test_initialise_api_sets_cloudsmith_api_config_default(self): """Assert that the extra attributes we add to the cloudsmith_cli.Configuration class are present on newly-created instances of that class. """ - mocked_get_access_token.return_value = None - # Read and understand the Configuration class's initialiser. # Notice how the _default class attribute is used if not None. # https://github.com/cloudsmith-io/cloudsmith-api/blob/57963fff5b7818783b3d87246495275545d505df/bindings/python/src/cloudsmith_api/configuration.py#L32-L40 @@ -122,64 +64,49 @@ def test_initialise_api_sets_cloudsmith_api_config_default( is not new_config_after_initialise ) - def test_initialise_api_with_refreshable_access_token_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - mocked_should_refresh_access_token.return_value = True - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") + def test_initialise_api_sets_bearer_auth_with_access_token(self): + """Verify access_token is set as Bearer auth header.""" + from cloudsmith_cli.core.credentials.models import CredentialResult - assert config.headers == {"Authorization": "Bearer new_access_token"} - mocked_refresh_access_token.assert_called_once() - mocked_store_sso_tokens.assert_called_once_with( - "https://example.com", "new_access_token", "new_refresh_token" + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} - def test_initialise_api_with_recently_refreshed_access_token_and_empty_basic_auth_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - auth_header = Configuration().get_basic_auth_token() + def test_initialise_api_sets_api_key(self): + """Verify key is set as X-Api-Key header.""" + from cloudsmith_cli.core.credentials.models import CredentialResult - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", headers={"Authorization": auth_header} - ) + credential = CredentialResult( + api_key="test_api_key", source_name="test", auth_type="api_key" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.api_key["X-Api-Key"] == "test_api_key" - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - assert config.username == "" - assert config.password == "" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() + def test_initialise_api_bearer_credential(self): + """Verify bearer credential sets Authorization header, not X-Api-Key.""" + from cloudsmith_cli.core.credentials.models import CredentialResult - def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_auth( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): + Configuration.set_default(None) + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} + assert "X-Api-Key" not in config.api_key + + def test_initialise_api_with_basic_auth_header(self): + """Verify basic auth header is parsed into username and password.""" temp_config = Configuration() temp_config.username = "username" temp_config.password = "password" @@ -191,181 +118,3 @@ def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_a assert config.headers == {"Authorization": auth_header} assert config.username == "username" assert config.password == "password" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - - def test_initialise_api_skips_keyring_when_env_var_set( - self, - mocked_get_access_token, - ): - """Verify keyring returns None when CLOUDSMITH_NO_KEYRING=1.""" - mocked_get_access_token.return_value = None - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api(host="https://example.com", key="test_api_key") - - # get_access_token is called but returns None due to internal guard - mocked_get_access_token.assert_called_once() - # API key should be used instead - assert config.api_key["X-Api-Key"] == "test_api_key" - - def test_initialise_api_uses_keyring_when_env_var_not_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - ): - """Verify keyring is accessed when CLOUDSMITH_NO_KEYRING is not set.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") - - # Keyring should be accessed - mocked_get_access_token.assert_called_once() - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - - def test_initialise_api_falls_back_to_api_key_when_sso_refresh_fails( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify API key is used as fallback when SSO token refresh fails.""" - from ..api.exceptions import ApiException - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key="fallback_api_key") - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should fall back to API key - assert config.api_key["X-Api-Key"] == "fallback_api_key" - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_no_auth_when_sso_refresh_fails_without_api_key( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify expired SSO token is not used when refresh fails and no API key available.""" - from ..api.exceptions import ApiException - - # Reset Configuration to clear any state from previous tests - Configuration.set_default(None) - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key=None) - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should not have API key either - assert "X-Api-Key" not in config.api_key - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_uses_direct_access_token_when_keyring_disabled( - self, - mocked_get_access_token, - ): - """Verify a directly provided access_token is used even when keyring is disabled. - - This is the critical path for --request-api-key with CLOUDSMITH_NO_KEYRING=1. - The SSO callback provides the access token directly, bypassing keyring storage. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="sso_direct_token_abc123", - ) - - # Keyring should NOT be accessed - mocked_get_access_token.assert_not_called() - # The directly provided access token should be used as Bearer auth - assert config.headers == {"Authorization": "Bearer sso_direct_token_abc123"} - - def test_initialise_api_direct_access_token_takes_precedence_over_keyring( - self, - mocked_get_access_token, - mocked_should_refresh_access_token, - ): - """Verify a directly provided access_token takes precedence over keyring.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", - access_token="direct_token_xyz", - ) - - # Keyring should NOT be accessed because we have a direct token - mocked_get_access_token.assert_not_called() - # The direct access token should be used - assert config.headers == {"Authorization": "Bearer direct_token_xyz"} - - def test_initialise_api_direct_access_token_skips_refresh( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify a directly provided access_token skips the refresh cycle entirely. - - When the SSO callback provides a fresh token - directly (e.g. for --request-api-key with CLOUDSMITH_NO_KEYRING=1), - we must NOT attempt to refresh it. The refresh path would fail because - there is no refresh_token in keyring, clearing the access_token and - leaving zero authentication. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="fresh_sso_token", - ) - - # Keyring lookup should be skipped (direct token provided) - mocked_get_access_token.assert_not_called() - # should_refresh_access_token is called but returns False - # due to internal should_use_keyring() guard - mocked_should_refresh_access_token.assert_called_once() - # Refresh logic should NOT be triggered - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - # The fresh SSO token should be used as-is - assert config.headers == {"Authorization": "Bearer fresh_sso_token"} diff --git a/cloudsmith_cli/core/tests/test_keyring_provider.py b/cloudsmith_cli/core/tests/test_keyring_provider.py new file mode 100644 index 00000000..dcd1ba52 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_keyring_provider.py @@ -0,0 +1,81 @@ +"""Tests for the keyring credential provider.""" + +import os +from unittest.mock import MagicMock, patch + +from cloudsmith_cli.core.credentials.models import CredentialContext +from cloudsmith_cli.core.credentials.providers import KeyringProvider + + +class TestKeyringProvider: + def test_returns_none_when_keyring_disabled(self): + provider = KeyringProvider() + with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_none_when_no_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object(keyring, "get_access_token", return_value=None): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_bearer_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="sso_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=False + ): + result = provider.resolve(CredentialContext()) + assert result is not None + assert result.api_key == "sso_token" + assert result.auth_type == "bearer" + assert result.source_name == "keyring" + + def test_returns_none_on_refresh_failure(self): + from cloudsmith_cli.cli import saml + from cloudsmith_cli.core import keyring + from cloudsmith_cli.core.api.exceptions import ApiException + + provider = KeyringProvider() + context = CredentialContext(session=MagicMock()) + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="old_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=True + ): + with patch.object( + keyring, "get_refresh_token", return_value="refresh_tok" + ): + with patch.object( + saml, + "refresh_access_token", + side_effect=ApiException( + status=401, detail="Unauthorized" + ), + ): + with patch.object( + keyring, "update_refresh_attempted_at" + ): + result = provider.resolve(context) + assert result is None + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_rest.py b/cloudsmith_cli/core/tests/test_rest.py index 364af4e7..d2a6e6d8 100644 --- a/cloudsmith_cli/core/tests/test_rest.py +++ b/cloudsmith_cli/core/tests/test_rest.py @@ -2,12 +2,24 @@ import pytest from ..api.init import initialise_api -from ..rest import RestClient +from ..rest import RestClient, create_requests_session + + +@pytest.fixture(autouse=True) +def patch_httpretty_socket(monkeypatch): + """Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls.""" + import httpretty.core + + monkeypatch.setattr( + httpretty.core.fakesock.socket, + "shutdown", + lambda self, how: None, + raising=False, + ) class TestRestClient: @httpretty.activate(allow_net_connect=False, verbose=True) - @pytest.mark.usefixtures("mock_keyring") def test_implicit_retry_for_status_codes(self): """Assert that the rest client retries certain status codes automatically.""" # initialise_api() needs to be called before RestClient can be instantiated, @@ -41,30 +53,15 @@ def test_implicit_retry_for_status_codes(self): assert r.status == 200 -@pytest.fixture -def mock_keyring(monkeypatch): - """Mock keyring functions to prevent reading real SSO tokens from the system keyring. - - This is necessary because initialise_api() checks the keyring for SSO tokens, - and if found, it attempts to refresh them via a network request. When running - this test in isolation with httpretty mocking enabled, that network request - will fail because it's not mocked. - """ - # Import here to avoid circular imports - import httpretty.core - - from .. import keyring +class TestCreateRequestsSession: + @pytest.fixture(autouse=True) + def setup(self): + initialise_api() - # Mock all keyring getter functions to return None/False - monkeypatch.setattr(keyring, "get_access_token", lambda api_host: None) - monkeypatch.setattr(keyring, "get_refresh_token", lambda api_host: None) - monkeypatch.setattr(keyring, "should_refresh_access_token", lambda api_host: False) + def test_sets_user_agent_header(self): + session = create_requests_session(user_agent="test-agent/1.0") + assert session.headers["User-Agent"] == "test-agent/1.0" - # Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls - # This fixes: "Failed to socket.shutdown because a real socket does not exist" - monkeypatch.setattr( - httpretty.core.fakesock.socket, - "shutdown", - lambda self, how: None, - raising=False, - ) + def test_sets_extra_headers(self): + session = create_requests_session(headers={"X-Custom": "value"}) + assert session.headers["X-Custom"] == "value" diff --git a/cloudsmith_cli/credential_helpers/__init__.py b/cloudsmith_cli/credential_helpers/__init__.py new file mode 100644 index 00000000..6e6715ce --- /dev/null +++ b/cloudsmith_cli/credential_helpers/__init__.py @@ -0,0 +1,6 @@ +""" +Credential helpers for various package managers. + +This package provides credential helper implementations for Docker, pip, npm, etc. +Each helper follows its respective package manager's credential helper protocol. +""" diff --git a/cloudsmith_cli/credential_helpers/common.py b/cloudsmith_cli/credential_helpers/common.py new file mode 100644 index 00000000..300d468b --- /dev/null +++ b/cloudsmith_cli/credential_helpers/common.py @@ -0,0 +1,86 @@ +""" +Shared utilities for credential helpers. + +Provides domain checking used by all credential helpers. +""" + +import logging +import os + +logger = logging.getLogger(__name__) + + +def extract_hostname(url): + """ + Extract bare hostname from any URL format. + + Handles protocols, sparse+ prefix, ports, paths, and trailing slashes. + + Args: + url: URL in any format (e.g., "sparse+https://cargo.cloudsmith.io/org/repo/") + + Returns: + str: Lowercase hostname (e.g., "cargo.cloudsmith.io") + """ + if not url: + return "" + + normalized = url.lower().strip() + + # Remove sparse+ prefix (Cargo) + if normalized.startswith("sparse+"): + normalized = normalized[7:] + + # Remove protocol + if "://" in normalized: + normalized = normalized.split("://", 1)[1] + + # Remove userinfo (user@host) + if "@" in normalized.split("/")[0]: + normalized = normalized.split("@", 1)[1] + + # Extract hostname (before first / or :) + hostname = normalized.split("/")[0].split(":")[0] + + return hostname + + +def is_cloudsmith_domain(url, session=None, api_key=None, api_host=None): + """ + Check if a URL points to a Cloudsmith service. + + Checks standard *.cloudsmith.io domains first (no auth needed). + If not a standard domain, queries the Cloudsmith API for custom domains. + + Args: + url: URL or hostname to check + session: Pre-configured requests.Session with proxy/SSL settings + api_key: API key for authenticating custom domain lookups + api_host: Cloudsmith API host URL + + Returns: + bool: True if this is a Cloudsmith domain + """ + hostname = extract_hostname(url) + if not hostname: + return False + + # Standard Cloudsmith domains — no auth needed + if hostname.endswith("cloudsmith.io") or hostname == "cloudsmith.io": + return True + + # Custom domains require org + auth + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + if not org: + return False + + if not api_key: + return False + + from .custom_domains import get_custom_domains_for_org + + custom_domains = get_custom_domains_for_org( + org, session=session, api_key=api_key, api_host=api_host + ) + + return hostname in [d.lower() for d in custom_domains] diff --git a/cloudsmith_cli/credential_helpers/custom_domains.py b/cloudsmith_cli/credential_helpers/custom_domains.py new file mode 100644 index 00000000..790ec44d --- /dev/null +++ b/cloudsmith_cli/credential_helpers/custom_domains.py @@ -0,0 +1,199 @@ +""" +Helper for discovering Cloudsmith custom domains. + +This module provides functions to fetch custom domains from the Cloudsmith API +for use in credential helpers. Results are cached on the filesystem. +""" + +import json +import logging +import time +from pathlib import Path +from typing import List, Optional + +import requests + +logger = logging.getLogger(__name__) + +# Cache custom domains for 1 hour +CACHE_TTL_SECONDS = 3600 + + +def get_cache_dir() -> Path: + """ + Get the cache directory for custom domains. + + Returns: + Path to cache directory (e.g., ~/.cloudsmith/cache/custom_domains/) + """ + home = Path.home() + cache_dir = home / ".cloudsmith" / "cache" / "custom_domains" + cache_dir.mkdir(parents=True, exist_ok=True) + return cache_dir + + +def get_cache_path(org: str) -> Path: + """ + Get the cache file path for an organization's custom domains. + + Args: + org: Organization slug + + Returns: + Path to cache file + """ + cache_dir = get_cache_dir() + safe_org = "".join(c if c.isalnum() or c in "-_" else "_" for c in org) + return cache_dir / f"{safe_org}.json" + + +def is_cache_valid(cache_path: Path) -> bool: + """ + Check if a cache file exists and is still valid. + + Args: + cache_path: Path to cache file + + Returns: + bool: True if cache exists and hasn't expired + """ + if not cache_path.exists(): + return False + + try: + mtime = cache_path.stat().st_mtime + age = time.time() - mtime + return age < CACHE_TTL_SECONDS + except OSError: + return False + + +def read_cache(cache_path: Path) -> Optional[List[str]]: + """ + Read custom domains from cache file. + + Args: + cache_path: Path to cache file + + Returns: + List of domain strings or None if cache invalid/missing + """ + if not is_cache_valid(cache_path): + return None + + try: + with open(cache_path, encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and "domains" in data: + domains = data["domains"] + if isinstance(domains, list): + logger.debug( + "Read %d domains from cache: %s", len(domains), cache_path + ) + return domains + except (OSError, json.JSONDecodeError) as exc: + logger.debug("Failed to read cache %s: %s", cache_path, exc) + + return None + + +def write_cache(cache_path: Path, domains: List[str]) -> None: + """ + Write custom domains to cache file. + + Args: + cache_path: Path to cache file + domains: List of domain strings to cache + """ + try: + data = { + "domains": domains, + "cached_at": time.time(), + } + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f) + logger.debug("Wrote %d domains to cache: %s", len(domains), cache_path) + except OSError as exc: + logger.debug("Failed to write cache %s: %s", cache_path, exc) + + +def get_custom_domains_for_org( # pylint: disable=too-many-return-statements + org: str, + session=None, + api_key: str = None, + api_host: str = None, +) -> List[str]: + """ + Fetch custom domains for a Cloudsmith organization. + + Results are cached on the filesystem for 1 hour to avoid excessive API calls. + + Args: + org: Organization slug + session: Pre-configured requests.Session with proxy/SSL settings. + If None, a plain requests session is used. + api_key: Optional API key for authentication + api_host: Cloudsmith API host URL. Defaults to https://api.cloudsmith.io. + + Returns: + List of custom domain strings (e.g., ['docker.customer.com', 'dl.customer.com']) + Empty list if API call fails or org has no custom domains + """ + cache_path = get_cache_path(org) + cached_domains = read_cache(cache_path) + if cached_domains is not None: + logger.debug("Using cached custom domains for %s", org) + return cached_domains + + logger.debug("Fetching custom domains from API for %s", org) + + try: + if session is None: + session = requests.Session() + + if api_key: + session.headers["Authorization"] = f"Bearer {api_key}" + + host = api_host or "https://api.cloudsmith.io" + url = f"{host}/orgs/{org}/custom-domains/" + + response = session.get(url, timeout=10) + + if response.status_code in (401, 403): + logger.debug( + "Custom domains API requires auth - assuming no custom domains for %s", + org, + ) + return [] # Don't cache 401/403 - might work later with auth + + if response.status_code == 404: + logger.debug("Organization %s not found or has no custom domains", org) + write_cache(cache_path, []) # Cache empty result to avoid repeated 404s + return [] + + if response.status_code != 200: + logger.debug( + "Failed to fetch custom domains for %s: HTTP %d", + org, + response.status_code, + ) + return [] + + data = response.json() + + # Expected format: [{"host": "docker.customer.com", ...}, ...] + domains = [] + if isinstance(data, list): + for item in data: + if isinstance(item, dict) and "host" in item: + domains.append(item["host"]) + + logger.debug("Fetched %d custom domains for %s", len(domains), org) + + write_cache(cache_path, domains) + + return domains + + except (requests.RequestException, ValueError) as exc: + logger.debug("Error fetching custom domains: %s", exc) + return [] diff --git a/cloudsmith_cli/credential_helpers/docker/__init__.py b/cloudsmith_cli/credential_helpers/docker/__init__.py new file mode 100644 index 00000000..a96d42d1 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/__init__.py @@ -0,0 +1,3 @@ +from .credentials import get_credentials + +__all__ = ["get_credentials"] diff --git a/cloudsmith_cli/credential_helpers/docker/credentials.py b/cloudsmith_cli/credential_helpers/docker/credentials.py new file mode 100644 index 00000000..a5f4e9bd --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/credentials.py @@ -0,0 +1,38 @@ +""" +Docker credential helper logic for Cloudsmith. + +This module provides functions for retrieving credentials for Docker registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain + + +def get_credentials(server_url, credential=None, session=None, api_host=None): + """ + Get credentials for a Cloudsmith Docker registry. + + Verifies the URL is a Cloudsmith registry (including custom domains) + and returns credentials if available. + + Args: + server_url: The Docker registry server URL + credential: Pre-resolved CredentialResult from the provider chain + session: Pre-configured requests.Session with proxy/SSL settings + api_host: Cloudsmith API host URL + + Returns: + dict: Credentials with 'Username' and 'Secret' keys, or None + """ + if not credential or not credential.api_key: + return None + + if not is_cloudsmith_domain( + server_url, + session=session, + api_key=credential.api_key, + api_host=api_host, + ): + return None + + return {"Username": "token", "Secret": credential.api_key} diff --git a/cloudsmith_cli/credential_helpers/docker/wrapper.py b/cloudsmith_cli/credential_helpers/docker/wrapper.py new file mode 100644 index 00000000..85f050ee --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/wrapper.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +""" +Wrapper for docker-credential-cloudsmith. + +This is the entry point binary that Docker calls. It delegates to the main +cloudsmith credential-helper docker command. + +See: https://github.com/docker/docker-credential-helpers + +Configure in ~/.docker/config.json: + { + "credHelpers": { + "docker.cloudsmith.io": "cloudsmith" + } + } +""" +import subprocess +import sys + + +def main(): + """ + Docker credential helper wrapper. + + Docker calls this with the operation as argv[1]: + - get: Retrieve credentials + - store: Store credentials (not supported) + - erase: Erase credentials (not supported) + - list: List credentials (not supported) + + We only support 'get' and delegate to: cloudsmith credential-helper docker + """ + if len(sys.argv) < 2: + print( + "Error: Missing operation argument. " + "Usage: docker-credential-cloudsmith ", + file=sys.stderr, + ) + sys.exit(1) + + operation = sys.argv[1] + + if operation == "get": + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "docker"], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + elif operation in ("store", "erase", "list"): + print( + f"Error: Operation '{operation}' is not supported. " + "Only 'get' is available for Cloudsmith credential helper.", + file=sys.stderr, + ) + sys.exit(1) + else: + print( + f"Error: Unknown operation '{operation}'. " + "Valid operations: get, store, erase, list", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index cf013b1f..d05aa94d 100644 --- a/setup.py +++ b/setup.py @@ -65,7 +65,10 @@ def get_long_description(): "urllib3>=2.5", ], entry_points={ - "console_scripts": ["cloudsmith=cloudsmith_cli.cli.commands.main:main"] + "console_scripts": [ + "cloudsmith=cloudsmith_cli.cli.commands.main:main", + "docker-credential-cloudsmith=cloudsmith_cli.credential_helpers.docker.wrapper:main", + ] }, keywords=["cloudsmith", "cli", "devops"], classifiers=[