From 40a9961edff9f1167eeab75e0fa0a534c991e5d0 Mon Sep 17 00:00:00 2001 From: Ian Duffy Date: Thu, 12 Mar 2026 13:04:35 +0000 Subject: [PATCH 1/2] feat: add credential provider chain concept --- cloudsmith_cli/cli/commands/whoami.py | 32 +- cloudsmith_cli/cli/decorators.py | 108 +++++- cloudsmith_cli/cli/tests/conftest.py | 5 +- cloudsmith_cli/cli/tests/test_webserver.py | 6 +- cloudsmith_cli/cli/webserver.py | 11 +- cloudsmith_cli/core/api/init.py | 74 +--- cloudsmith_cli/core/credentials/__init__.py | 100 ++++++ .../core/credentials/providers/__init__.py | 9 + .../core/credentials/providers/cli_flag.py | 22 ++ .../credentials/providers/keyring_provider.py | 54 +++ cloudsmith_cli/core/credentials/session.py | 63 ++++ .../core/tests/test_cli_flag_provider.py | 34 ++ .../core/tests/test_credential_context.py | 14 + .../tests/test_credential_provider_chain.py | 80 +++++ cloudsmith_cli/core/tests/test_init.py | 327 ++---------------- .../core/tests/test_keyring_provider.py | 81 +++++ cloudsmith_cli/core/tests/test_rest.py | 43 +-- 17 files changed, 637 insertions(+), 426 deletions(-) create mode 100644 cloudsmith_cli/core/credentials/__init__.py create mode 100644 cloudsmith_cli/core/credentials/providers/__init__.py create mode 100644 cloudsmith_cli/core/credentials/providers/cli_flag.py create mode 100644 cloudsmith_cli/core/credentials/providers/keyring_provider.py create mode 100644 cloudsmith_cli/core/credentials/session.py create mode 100644 cloudsmith_cli/core/tests/test_cli_flag_provider.py create mode 100644 cloudsmith_cli/core/tests/test_credential_context.py create mode 100644 cloudsmith_cli/core/tests/test_credential_provider_chain.py create mode 100644 cloudsmith_cli/core/tests/test_keyring_provider.py diff --git a/cloudsmith_cli/cli/commands/whoami.py b/cloudsmith_cli/cli/commands/whoami.py index ffbf9842..59fa31c2 100644 --- a/cloudsmith_cli/cli/commands/whoami.py +++ b/cloudsmith_cli/cli/commands/whoami.py @@ -1,14 +1,11 @@ """CLI/Commands - Retrieve authentication status.""" -import os - import click from ...core import keyring from ...core.api.exceptions import ApiException from ...core.api.user import get_token_metadata, get_user_brief from .. import decorators, utils -from ..config import CredentialsReader from ..exceptions import handle_api_exceptions from .main import main @@ -26,26 +23,17 @@ def _get_active_method(api_config): def _get_api_key_source(opts): """Determine where the API key was loaded from. - Checks in priority order matching actual resolution: - CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini. + Uses the credential provider chain result attached by initialise_api. """ - if not opts.api_key: - return {"configured": False, "source": None, "source_key": None} - - env_key = os.environ.get("CLOUDSMITH_API_KEY") - - # If env var is set but differs from the resolved key, CLI flag won - if env_key and opts.api_key != env_key: - source, key = "CLI --api-key flag", "cli_flag" - elif env_key: - suffix = env_key[-4:] - source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var" - elif creds := CredentialsReader.find_existing_files(): - source, key = f"credentials.ini ({creds[0]})", "credentials_file" - else: - source, key = "CLI --api-key flag", "cli_flag" - - return {"configured": True, "source": source, "source_key": key} + credential = getattr(opts, "credential", None) + if credential: + return { + "configured": True, + "source": credential.source_detail or credential.source_name, + "source_key": credential.source_name, + } + + return {"configured": False, "source": None, "source_key": None} def _get_sso_status(api_host): diff --git a/cloudsmith_cli/cli/decorators.py b/cloudsmith_cli/cli/decorators.py index ba75e236..06ce7836 100644 --- a/cloudsmith_cli/cli/decorators.py +++ b/cloudsmith_cli/cli/decorators.py @@ -7,6 +7,8 @@ from cloudsmith_cli.cli import validators from ..core.api.init import initialise_api as _initialise_api +from ..core.credentials import CredentialContext, CredentialProviderChain +from ..core.credentials.session import create_session as _create_session from ..core.mcp import server from . import config, utils @@ -20,6 +22,14 @@ def report_retry(seconds, context=None): ) +def _pop_boolean_flag(kwargs, name, invert=False): + """Pop a boolean flag from kwargs, optionally inverting it.""" + value = kwargs.pop(name) + if value is not None and invert: + value = not value + return value + + def common_package_action_options(f): """Add common options for package actions.""" @@ -214,15 +224,17 @@ def common_api_auth_options(f): def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring opts = config.get_or_create_options(ctx) - opts.api_key = kwargs.pop("api_key") + api_key = kwargs.pop("api_key") + if api_key: + opts.api_key = api_key kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) return wrapper -def initialise_api(f): - """Initialise the Cloudsmith API for use.""" +def initialise_session(f): + """Create a shared HTTP session with proxy/SSL/user-agent settings.""" @click.option( "--api-host", envvar="CLOUDSMITH_API_HOST", help="The API host to connect to." @@ -252,6 +264,78 @@ def initialise_api(f): envvar="CLOUDSMITH_API_HEADERS", help="A CSV list of extra headers (key=value) to send to the API.", ) + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + opts.api_host = kwargs.pop("api_host") + opts.api_proxy = kwargs.pop("api_proxy") + opts.api_ssl_verify = _pop_boolean_flag( + kwargs, "without_api_ssl_verify", invert=True + ) + opts.api_user_agent = kwargs.pop("api_user_agent") + opts.api_headers = kwargs.pop("api_headers") + + opts.session = _create_session( + proxy=opts.api_proxy, + ssl_verify=opts.api_ssl_verify, + user_agent=opts.api_user_agent, + headers=opts.api_headers, + ) + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return wrapper + + +def resolve_credentials(f): + """Resolve credentials via the provider chain. Depends on initialise_session.""" + + @click.pass_context + @functools.wraps(f) + def wrapper(ctx, *args, **kwargs): + # pylint: disable=missing-docstring + opts = config.get_or_create_options(ctx) + + context = CredentialContext( + session=opts.session, + api_key=opts.api_key, + api_host=opts.api_host or "https://api.cloudsmith.io", + creds_file_path=ctx.meta.get("creds_file"), + profile=ctx.meta.get("profile"), + debug=opts.debug, + ) + + chain = CredentialProviderChain() + credential = chain.resolve(context) + + if context.keyring_refresh_failed: + click.secho( + "An error occurred when attempting to refresh your SSO access token. " + "To refresh this session, run 'cloudsmith auth'", + fg="yellow", + err=True, + ) + if credential: + click.secho( + "Falling back to API key authentication.", + fg="yellow", + err=True, + ) + + opts.credential = credential + + kwargs["opts"] = opts + return ctx.invoke(f, *args, **kwargs) + + return initialise_session(wrapper) + + +def initialise_api(f): + """Initialise the Cloudsmith API for use. Depends on resolve_credentials.""" + @click.option( "-R", "--without-rate-limit", @@ -294,20 +378,8 @@ def initialise_api(f): @functools.wraps(f) def wrapper(ctx, *args, **kwargs): # pylint: disable=missing-docstring - def _set_boolean(name, invert=False): - value = kwargs.pop(name) - value = value if value is not None else None - if value is not None and invert: - value = not value - return value - opts = config.get_or_create_options(ctx) - opts.api_host = kwargs.pop("api_host") - opts.api_proxy = kwargs.pop("api_proxy") - opts.api_ssl_verify = _set_boolean("without_api_ssl_verify", invert=True) - opts.api_user_agent = kwargs.pop("api_user_agent") - opts.api_headers = kwargs.pop("api_headers") - opts.rate_limit = _set_boolean("without_rate_limit", invert=True) + opts.rate_limit = _pop_boolean_flag(kwargs, "without_rate_limit", invert=True) opts.rate_limit_warning = kwargs.pop("rate_limit_warning") opts.error_retry_max = kwargs.pop("error_retry_max") opts.error_retry_backoff = kwargs.pop("error_retry_backoff") @@ -320,7 +392,7 @@ def call_print_rate_limit_info_with_opts(rate_info): opts.api_config = _initialise_api( debug=opts.debug, host=opts.api_host, - key=opts.api_key, + credential=opts.credential, proxy=opts.api_proxy, ssl_verify=opts.api_ssl_verify, user_agent=opts.api_user_agent, @@ -336,7 +408,7 @@ def call_print_rate_limit_info_with_opts(rate_info): kwargs["opts"] = opts return ctx.invoke(f, *args, **kwargs) - return wrapper + return resolve_credentials(wrapper) def initialise_mcp(f): diff --git a/cloudsmith_cli/cli/tests/conftest.py b/cloudsmith_cli/cli/tests/conftest.py index c42f23f4..ad6262dd 100644 --- a/cloudsmith_cli/cli/tests/conftest.py +++ b/cloudsmith_cli/cli/tests/conftest.py @@ -5,6 +5,7 @@ from ...core.api.init import initialise_api from ...core.api.repos import create_repo, delete_repo +from ...core.credentials import CredentialResult from .utils import random_str @@ -51,7 +52,9 @@ def organization(): @pytest.fixture() def tmp_repository(organization, api_host, api_key): """Yield a temporary repository.""" - initialise_api(host=api_host, key=api_key) + initialise_api( + host=api_host, credential=CredentialResult(api_key=api_key, source_name="test") + ) repo_data = create_repo(organization, {"name": random_str()}) yield repo_data delete_repo(organization, repo_data["slug"]) diff --git a/cloudsmith_cli/cli/tests/test_webserver.py b/cloudsmith_cli/cli/tests/test_webserver.py index 538ad147..12ecdf77 100644 --- a/cloudsmith_cli/cli/tests/test_webserver.py +++ b/cloudsmith_cli/cli/tests/test_webserver.py @@ -40,7 +40,11 @@ def test_refresh_api_config_passes_sso_token(self): mock_init_api.assert_called_once() call_kwargs = mock_init_api.call_args.kwargs - assert call_kwargs.get("access_token") == "test_sso_token_123" + credential = call_kwargs.get("credential") + assert credential is not None + assert credential.api_key == "test_sso_token_123" + assert credential.auth_type == "bearer" + assert credential.source_name == "sso" class TestAuthenticationWebRequestHandlerKeyring: diff --git a/cloudsmith_cli/cli/webserver.py b/cloudsmith_cli/cli/webserver.py index eff5d523..3d2ffca9 100644 --- a/cloudsmith_cli/cli/webserver.py +++ b/cloudsmith_cli/cli/webserver.py @@ -9,6 +9,7 @@ from ..core.api.exceptions import ApiException from ..core.api.init import initialise_api +from ..core.credentials import CredentialResult from ..core.keyring import store_sso_tokens from .saml import exchange_2fa_token @@ -79,7 +80,15 @@ def refresh_api_config_after_auth(self): user_agent=getattr(self.api_opts, "user_agent", None), headers=getattr(self.api_opts, "headers", None), rate_limit=getattr(self.api_opts, "rate_limit", True), - access_token=self.sso_access_token, + credential=( + CredentialResult( + api_key=self.sso_access_token, + source_name="sso", + auth_type="bearer", + ) + if self.sso_access_token + else None + ), ) def finish_request(self, request, client_address): diff --git a/cloudsmith_cli/core/api/init.py b/cloudsmith_cli/core/api/init.py index b6a856bc..ecbc8f6f 100644 --- a/cloudsmith_cli/core/api/init.py +++ b/cloudsmith_cli/core/api/init.py @@ -6,16 +6,13 @@ import click import cloudsmith_api -from ...cli import saml -from .. import keyring from ..rest import RestClient -from .exceptions import ApiException def initialise_api( debug=False, host=None, - key=None, + credential=None, proxy=None, ssl_verify=True, user_agent=None, @@ -26,7 +23,6 @@ def initialise_api( error_retry_backoff=None, error_retry_codes=None, error_retry_cb=None, - access_token=None, ): """Initialise the cloudsmith_api.Configuration.""" # FIXME: pylint: disable=too-many-arguments @@ -45,65 +41,15 @@ def initialise_api( config.verify_ssl = ssl_verify config.client_side_validation = False - # Use directly provided access token (e.g. from SSO callback), - # or fall back to keyring lookup if enabled. - if not access_token: - access_token = keyring.get_access_token(config.host) - - if access_token: - auth_header = config.headers.get("Authorization") - - # overwrite auth header if empty or is basic auth without username or password - if not auth_header or auth_header == config.get_basic_auth_token(): - refresh_token = keyring.get_refresh_token(config.host) - - try: - if keyring.should_refresh_access_token(config.host): - new_access_token, new_refresh_token = saml.refresh_access_token( - config.host, - access_token, - refresh_token, - session=saml.create_configured_session(config), - ) - keyring.store_sso_tokens( - config.host, new_access_token, new_refresh_token - ) - # Use the new tokens - access_token = new_access_token - except ApiException: - keyring.update_refresh_attempted_at(config.host) - - click.secho( - "An error occurred when attempting to refresh your SSO access token. To refresh this session, run 'cloudsmith auth'", - fg="yellow", - err=True, - ) - - # Clear access_token to prevent using expired token - access_token = None - - # Fall back to API key auth if available - if key: - click.secho( - "Falling back to API key authentication.", - fg="yellow", - err=True, - ) - config.api_key["X-Api-Key"] = key - - # Only use SSO token if refresh didn't fail - if access_token: - config.headers["Authorization"] = "Bearer {access_token}".format( - access_token=access_token - ) - - if config.debug: - click.echo("SSO access token config value set") - elif key: - config.api_key["X-Api-Key"] = key - - if config.debug: - click.echo("User API key config value set") + if credential: + if credential.auth_type == "bearer": + config.headers["Authorization"] = f"Bearer {credential.api_key}" + if config.debug: + click.echo("SSO access token config value set") + else: + config.api_key["X-Api-Key"] = credential.api_key + if config.debug: + click.echo("User API key config value set") auth_header = headers and config.headers.get("Authorization") if auth_header and " " in auth_header: diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py new file mode 100644 index 00000000..d55413ce --- /dev/null +++ b/cloudsmith_cli/core/credentials/__init__.py @@ -0,0 +1,100 @@ +"""Credential Provider Chain for Cloudsmith CLI. + +Implements an AWS SDK-style credential resolution chain that evaluates +credential sources sequentially and returns the first valid result. +""" + +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import requests + +logger = logging.getLogger(__name__) + + +@dataclass +class CredentialContext: + """Context passed to credential providers during resolution. + + All values are populated directly from Click options / ``opts``. + """ + + session: requests.Session | None = None + api_key: str | None = None + api_host: str = "https://api.cloudsmith.io" + creds_file_path: str | None = None + profile: str | None = None + debug: bool = False + keyring_refresh_failed: bool = False + + +@dataclass +class CredentialResult: + """Result from a successful credential resolution.""" + + api_key: str + source_name: str + source_detail: str | None = None + auth_type: str = "api_key" + + +class CredentialProvider(ABC): + """Base class for credential providers.""" + + name: str = "base" + + @abstractmethod + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Attempt to resolve credentials. Return CredentialResult or None.""" + + +class CredentialProviderChain: + """Evaluates credential providers in order, returning the first valid result. + + If no providers are given, uses the default chain: + Keyring → CLIFlag. + """ + + def __init__(self, providers: list[CredentialProvider] | None = None): + if providers is not None: + self.providers = providers + else: + from .providers import CLIFlagProvider, KeyringProvider + + self.providers = [ + KeyringProvider(), + CLIFlagProvider(), + ] + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Evaluate each provider in order. Return the first successful result.""" + for provider in self.providers: + try: + result = provider.resolve(context) + if result is not None: + if context.debug: + logger.debug( + "Credentials resolved by %s: %s", + provider.name, + result.source_detail or result.source_name, + ) + return result + if context.debug: + logger.debug( + "Provider %s did not resolve credentials, trying next", + provider.name, + ) + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one provider failing shouldn't stop others + logger.debug( + "Provider %s raised an exception, skipping", + provider.name, + exc_info=True, + ) + continue + return None diff --git a/cloudsmith_cli/core/credentials/providers/__init__.py b/cloudsmith_cli/core/credentials/providers/__init__.py new file mode 100644 index 00000000..5482e397 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/__init__.py @@ -0,0 +1,9 @@ +"""Credential providers for the Cloudsmith CLI.""" + +from .cli_flag import CLIFlagProvider +from .keyring_provider import KeyringProvider + +__all__ = [ + "CLIFlagProvider", + "KeyringProvider", +] diff --git a/cloudsmith_cli/core/credentials/providers/cli_flag.py b/cloudsmith_cli/core/credentials/providers/cli_flag.py new file mode 100644 index 00000000..99b55dcf --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/cli_flag.py @@ -0,0 +1,22 @@ +"""CLI flag credential provider.""" + +from __future__ import annotations + +from .. import CredentialContext, CredentialProvider, CredentialResult + + +class CLIFlagProvider(CredentialProvider): + """Resolves credentials from a CLI flag value passed via CredentialContext.""" + + name = "cli_flag" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + api_key = context.api_key + if api_key and api_key.strip(): + suffix = api_key.strip()[-4:] + return CredentialResult( + api_key=api_key.strip(), + source_name="cli_flag", + source_detail=f"--api-key flag, CLOUDSMITH_API_KEY, or credentials.ini (ends with ...{suffix})", + ) + return None diff --git a/cloudsmith_cli/core/credentials/providers/keyring_provider.py b/cloudsmith_cli/core/credentials/providers/keyring_provider.py new file mode 100644 index 00000000..c4f44e95 --- /dev/null +++ b/cloudsmith_cli/core/credentials/providers/keyring_provider.py @@ -0,0 +1,54 @@ +"""Keyring credential provider.""" + +from __future__ import annotations + +import logging + +from .. import CredentialContext, CredentialProvider, CredentialResult + +logger = logging.getLogger(__name__) + + +class KeyringProvider(CredentialProvider): + """Resolves credentials from SAML tokens stored in the system keyring.""" + + name = "keyring" + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + from ....cli.saml import refresh_access_token + from ....core import keyring + + if not keyring.should_use_keyring(): + return None + + api_host = context.api_host + access_token = keyring.get_access_token(api_host) + + if not access_token: + return None + + try: + if keyring.should_refresh_access_token(api_host): + if not context.session: + return None + refresh_token = keyring.get_refresh_token(api_host) + new_access_token, new_refresh_token = refresh_access_token( + api_host, + access_token, + refresh_token, + session=context.session, + ) + keyring.store_sso_tokens(api_host, new_access_token, new_refresh_token) + access_token = new_access_token + except Exception: # pylint: disable=broad-exception-caught + keyring.update_refresh_attempted_at(api_host) + context.keyring_refresh_failed = True + logger.debug("Failed to refresh SAML token", exc_info=True) + return None + + return CredentialResult( + api_key=access_token, + source_name="keyring", + source_detail="SAML token from system keyring", + auth_type="bearer", + ) diff --git a/cloudsmith_cli/core/credentials/session.py b/cloudsmith_cli/core/credentials/session.py new file mode 100644 index 00000000..9e314efb --- /dev/null +++ b/cloudsmith_cli/core/credentials/session.py @@ -0,0 +1,63 @@ +"""HTTP session factory with networking configuration and retry support.""" + +from __future__ import annotations + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +#: Default retry policy: retry on connection errors, 429, and 5xx responses +#: with exponential back-off (1s, 2s, 4s). +DEFAULT_RETRY = Retry( + total=3, + backoff_factor=1, + status_forcelist=(429, 500, 502, 503, 504), + allowed_methods=("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"), + raise_on_status=False, +) + + +def create_session( + proxy: str | None = None, + ssl_verify: bool = True, + user_agent: str | None = None, + headers: dict | None = None, + api_key: str | None = None, + retry: Retry | None = DEFAULT_RETRY, +) -> requests.Session: + """Create a requests session with networking, auth, and retry configuration. + + Args: + proxy: HTTP/HTTPS proxy URL. + ssl_verify: Whether to verify SSL certificates. + user_agent: Custom User-Agent header value. + headers: Additional headers to include in every request. + api_key: If provided, set as a Bearer token in the Authorization header. + retry: urllib3 Retry configuration. Defaults to :data:`DEFAULT_RETRY`. + Pass ``None`` to disable automatic retries. + + Returns: + A configured :class:`requests.Session`. + """ + session = requests.Session() + + if retry is not None: + adapter = HTTPAdapter(max_retries=retry) + session.mount("https://", adapter) + session.mount("http://", adapter) + + if proxy: + session.proxies = {"http": proxy, "https": proxy} + + session.verify = ssl_verify + + if user_agent: + session.headers["User-Agent"] = user_agent + + if headers: + session.headers.update(headers) + + if api_key: + session.headers["Authorization"] = f"Bearer {api_key}" + + return session diff --git a/cloudsmith_cli/core/tests/test_cli_flag_provider.py b/cloudsmith_cli/core/tests/test_cli_flag_provider.py new file mode 100644 index 00000000..1f10f25e --- /dev/null +++ b/cloudsmith_cli/core/tests/test_cli_flag_provider.py @@ -0,0 +1,34 @@ +"""Tests for the CLI flag credential provider.""" + +from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.providers import CLIFlagProvider + + +class TestCLIFlagProvider: + def test_resolves_from_context(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key="my-api-key-1234") + result = provider.resolve(context) + assert result is not None + assert result.api_key == "my-api-key-1234" + assert result.source_name == "cli_flag" + assert result.auth_type == "api_key" + assert "1234" in result.source_detail + + def test_returns_none_when_not_set(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=None) + result = provider.resolve(context) + assert result is None + + def test_returns_none_for_empty_value(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" ") + result = provider.resolve(context) + assert result is None + + def test_strips_whitespace(self): + provider = CLIFlagProvider() + context = CredentialContext(api_key=" my-key ") + result = provider.resolve(context) + assert result.api_key == "my-key" diff --git a/cloudsmith_cli/core/tests/test_credential_context.py b/cloudsmith_cli/core/tests/test_credential_context.py new file mode 100644 index 00000000..15c4b846 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_context.py @@ -0,0 +1,14 @@ +"""Tests for the CredentialContext class.""" + +from cloudsmith_cli.core.credentials import CredentialContext + + +class TestCredentialContext: + def test_keyring_refresh_failed_defaults_false(self): + context = CredentialContext() + assert context.keyring_refresh_failed is False + + def test_keyring_refresh_failed_can_be_set(self): + context = CredentialContext() + context.keyring_refresh_failed = True + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_credential_provider_chain.py b/cloudsmith_cli/core/tests/test_credential_provider_chain.py new file mode 100644 index 00000000..94eb1e46 --- /dev/null +++ b/cloudsmith_cli/core/tests/test_credential_provider_chain.py @@ -0,0 +1,80 @@ +"""Tests for the credential provider chain.""" + +from cloudsmith_cli.core.credentials import ( + CredentialContext, + CredentialProvider, + CredentialProviderChain, + CredentialResult, +) + + +class DummyProvider(CredentialProvider): + """Test provider that returns a configurable result.""" + + def __init__(self, name, result=None, should_raise=False): + self.name = name + self._result = result + self._should_raise = should_raise + + def resolve(self, context): + if self._should_raise: + raise RuntimeError("Provider error") + return self._result + + +class TestCredentialProviderChain: + def test_first_provider_wins(self): + result1 = CredentialResult(api_key="key1", source_name="first") + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=result1), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key1" + assert result.source_name == "first" + + def test_falls_through_to_second(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_returns_none_when_all_fail(self): + chain = CredentialProviderChain( + [ + DummyProvider("p1", result=None), + DummyProvider("p2", result=None), + ] + ) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_skips_erroring_provider(self): + result2 = CredentialResult(api_key="key2", source_name="second") + chain = CredentialProviderChain( + [ + DummyProvider("p1", should_raise=True), + DummyProvider("p2", result=result2), + ] + ) + result = chain.resolve(CredentialContext()) + assert result.api_key == "key2" + + def test_empty_chain(self): + chain = CredentialProviderChain([]) + result = chain.resolve(CredentialContext()) + assert result is None + + def test_default_chain_order(self): + chain = CredentialProviderChain() + assert len(chain.providers) == 2 + assert chain.providers[0].name == "keyring" + assert chain.providers[1].name == "cli_flag" diff --git a/cloudsmith_cli/core/tests/test_init.py b/cloudsmith_cli/core/tests/test_init.py index 5906796c..33881b16 100644 --- a/cloudsmith_cli/core/tests/test_init.py +++ b/cloudsmith_cli/core/tests/test_init.py @@ -1,62 +1,8 @@ -import os -from unittest.mock import patch - -import pytest from cloudsmith_api import Configuration -from ...cli import saml -from .. import keyring from ..api.init import initialise_api -@pytest.fixture -def mocked_get_access_token(): - with patch.object( - keyring, "get_access_token", return_value="dummy_access_token" - ) as get_access_token_mock: - yield get_access_token_mock - - -@pytest.fixture -def mocked_get_refresh_token(): - with patch.object( - keyring, "get_refresh_token", return_value="dummy_refresh_token" - ) as get_refresh_token_mock: - yield get_refresh_token_mock - - -@pytest.fixture -def mocked_should_refresh_access_token(): - with patch.object( - keyring, "should_refresh_access_token", return_value=False - ) as should_refresh_access_token_mock: - yield should_refresh_access_token_mock - - -@pytest.fixture -def mocked_refresh_access_token(): - with patch.object( - saml, - "refresh_access_token", - return_value=("new_access_token", "new_refresh_token"), - ) as refresh_access_token_mock: - yield refresh_access_token_mock - - -@pytest.fixture -def mocked_store_sso_tokens(): - with patch.object(keyring, "store_sso_tokens") as store_sso_tokens_mock: - yield store_sso_tokens_mock - - -@pytest.fixture -def mocked_update_refresh_attempted_at(): - with patch.object( - keyring, "update_refresh_attempted_at" - ) as update_refresh_attempted_at_mock: - yield update_refresh_attempted_at_mock - - class TestInitialiseApi: def setup_class(cls): # pylint: disable=no-self-argument # For the purposes of these tests, we need to explicitly call set_default(None) at the @@ -65,14 +11,10 @@ def setup_class(cls): # pylint: disable=no-self-argument # Configuration class to its vanilla, unmodified behaviour/state. Configuration.set_default(None) - def test_initialise_api_sets_cloudsmith_api_config_default( - self, mocked_get_access_token - ): + def test_initialise_api_sets_cloudsmith_api_config_default(self): """Assert that the extra attributes we add to the cloudsmith_cli.Configuration class are present on newly-created instances of that class. """ - mocked_get_access_token.return_value = None - # Read and understand the Configuration class's initialiser. # Notice how the _default class attribute is used if not None. # https://github.com/cloudsmith-io/cloudsmith-api/blob/57963fff5b7818783b3d87246495275545d505df/bindings/python/src/cloudsmith_api/configuration.py#L32-L40 @@ -122,64 +64,49 @@ def test_initialise_api_sets_cloudsmith_api_config_default( is not new_config_after_initialise ) - def test_initialise_api_with_refreshable_access_token_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - mocked_should_refresh_access_token.return_value = True - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") + def test_initialise_api_sets_bearer_auth_with_access_token(self): + """Verify access_token is set as Bearer auth header.""" + from cloudsmith_cli.core.credentials import CredentialResult - assert config.headers == {"Authorization": "Bearer new_access_token"} - mocked_refresh_access_token.assert_called_once() - mocked_store_sso_tokens.assert_called_once_with( - "https://example.com", "new_access_token", "new_refresh_token" + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} - def test_initialise_api_with_recently_refreshed_access_token_and_empty_basic_auth_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - auth_header = Configuration().get_basic_auth_token() + def test_initialise_api_sets_api_key(self): + """Verify key is set as X-Api-Key header.""" + from cloudsmith_cli.core.credentials import CredentialResult - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", headers={"Authorization": auth_header} - ) + credential = CredentialResult( + api_key="test_api_key", source_name="test", auth_type="api_key" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.api_key["X-Api-Key"] == "test_api_key" - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - assert config.username == "" - assert config.password == "" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() + def test_initialise_api_bearer_credential(self): + """Verify bearer credential sets Authorization header, not X-Api-Key.""" + from cloudsmith_cli.core.credentials import CredentialResult - def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_auth( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): + Configuration.set_default(None) + credential = CredentialResult( + api_key="test_access_token", source_name="test", auth_type="bearer" + ) + config = initialise_api( + host="https://example.com", + credential=credential, + ) + assert config.headers == {"Authorization": "Bearer test_access_token"} + assert "X-Api-Key" not in config.api_key + + def test_initialise_api_with_basic_auth_header(self): + """Verify basic auth header is parsed into username and password.""" temp_config = Configuration() temp_config.username = "username" temp_config.password = "password" @@ -191,181 +118,3 @@ def test_initialise_api_with_recently_refreshed_access_token_and_present_basic_a assert config.headers == {"Authorization": auth_header} assert config.username == "username" assert config.password == "password" - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - - def test_initialise_api_skips_keyring_when_env_var_set( - self, - mocked_get_access_token, - ): - """Verify keyring returns None when CLOUDSMITH_NO_KEYRING=1.""" - mocked_get_access_token.return_value = None - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api(host="https://example.com", key="test_api_key") - - # get_access_token is called but returns None due to internal guard - mocked_get_access_token.assert_called_once() - # API key should be used instead - assert config.api_key["X-Api-Key"] == "test_api_key" - - def test_initialise_api_uses_keyring_when_env_var_not_set( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - ): - """Verify keyring is accessed when CLOUDSMITH_NO_KEYRING is not set.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com") - - # Keyring should be accessed - mocked_get_access_token.assert_called_once() - assert config.headers == {"Authorization": "Bearer dummy_access_token"} - - def test_initialise_api_falls_back_to_api_key_when_sso_refresh_fails( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify API key is used as fallback when SSO token refresh fails.""" - from ..api.exceptions import ApiException - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key="fallback_api_key") - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should fall back to API key - assert config.api_key["X-Api-Key"] == "fallback_api_key" - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_no_auth_when_sso_refresh_fails_without_api_key( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify expired SSO token is not used when refresh fails and no API key available.""" - from ..api.exceptions import ApiException - - # Reset Configuration to clear any state from previous tests - Configuration.set_default(None) - - # Simulate SSO token refresh failure - mocked_should_refresh_access_token.return_value = True - mocked_refresh_access_token.side_effect = ApiException( - status=401, detail="Unauthorized" - ) - - # Ensure keyring is enabled for this test - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api(host="https://example.com", key=None) - - # Should not use expired SSO token - assert ( - "Authorization" not in config.headers - or config.headers.get("Authorization") != "Bearer dummy_access_token" - ) - # Should not have API key either - assert "X-Api-Key" not in config.api_key - mocked_update_refresh_attempted_at.assert_called_once() - mocked_store_sso_tokens.assert_not_called() - - def test_initialise_api_uses_direct_access_token_when_keyring_disabled( - self, - mocked_get_access_token, - ): - """Verify a directly provided access_token is used even when keyring is disabled. - - This is the critical path for --request-api-key with CLOUDSMITH_NO_KEYRING=1. - The SSO callback provides the access token directly, bypassing keyring storage. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="sso_direct_token_abc123", - ) - - # Keyring should NOT be accessed - mocked_get_access_token.assert_not_called() - # The directly provided access token should be used as Bearer auth - assert config.headers == {"Authorization": "Bearer sso_direct_token_abc123"} - - def test_initialise_api_direct_access_token_takes_precedence_over_keyring( - self, - mocked_get_access_token, - mocked_should_refresh_access_token, - ): - """Verify a directly provided access_token takes precedence over keyring.""" - env = os.environ.copy() - env.pop("CLOUDSMITH_NO_KEYRING", None) - with patch.dict(os.environ, env, clear=True): - config = initialise_api( - host="https://example.com", - access_token="direct_token_xyz", - ) - - # Keyring should NOT be accessed because we have a direct token - mocked_get_access_token.assert_not_called() - # The direct access token should be used - assert config.headers == {"Authorization": "Bearer direct_token_xyz"} - - def test_initialise_api_direct_access_token_skips_refresh( - self, - mocked_get_access_token, - mocked_get_refresh_token, - mocked_should_refresh_access_token, - mocked_refresh_access_token, - mocked_store_sso_tokens, - mocked_update_refresh_attempted_at, - ): - """Verify a directly provided access_token skips the refresh cycle entirely. - - When the SSO callback provides a fresh token - directly (e.g. for --request-api-key with CLOUDSMITH_NO_KEYRING=1), - we must NOT attempt to refresh it. The refresh path would fail because - there is no refresh_token in keyring, clearing the access_token and - leaving zero authentication. - """ - with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): - config = initialise_api( - host="https://example.com", - access_token="fresh_sso_token", - ) - - # Keyring lookup should be skipped (direct token provided) - mocked_get_access_token.assert_not_called() - # should_refresh_access_token is called but returns False - # due to internal should_use_keyring() guard - mocked_should_refresh_access_token.assert_called_once() - # Refresh logic should NOT be triggered - mocked_refresh_access_token.assert_not_called() - mocked_store_sso_tokens.assert_not_called() - mocked_update_refresh_attempted_at.assert_not_called() - # The fresh SSO token should be used as-is - assert config.headers == {"Authorization": "Bearer fresh_sso_token"} diff --git a/cloudsmith_cli/core/tests/test_keyring_provider.py b/cloudsmith_cli/core/tests/test_keyring_provider.py new file mode 100644 index 00000000..3a6732af --- /dev/null +++ b/cloudsmith_cli/core/tests/test_keyring_provider.py @@ -0,0 +1,81 @@ +"""Tests for the keyring credential provider.""" + +import os +from unittest.mock import MagicMock, patch + +from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.providers import KeyringProvider + + +class TestKeyringProvider: + def test_returns_none_when_keyring_disabled(self): + provider = KeyringProvider() + with patch.dict(os.environ, {"CLOUDSMITH_NO_KEYRING": "1"}): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_none_when_no_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object(keyring, "get_access_token", return_value=None): + result = provider.resolve(CredentialContext()) + assert result is None + + def test_returns_bearer_token(self): + from cloudsmith_cli.core import keyring + + provider = KeyringProvider() + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="sso_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=False + ): + result = provider.resolve(CredentialContext()) + assert result is not None + assert result.api_key == "sso_token" + assert result.auth_type == "bearer" + assert result.source_name == "keyring" + + def test_returns_none_on_refresh_failure(self): + from cloudsmith_cli.cli import saml + from cloudsmith_cli.core import keyring + from cloudsmith_cli.core.api.exceptions import ApiException + + provider = KeyringProvider() + context = CredentialContext(session=MagicMock()) + env = os.environ.copy() + env.pop("CLOUDSMITH_NO_KEYRING", None) + with patch.dict(os.environ, env, clear=True): + with patch.object(keyring, "should_use_keyring", return_value=True): + with patch.object( + keyring, "get_access_token", return_value="old_token" + ): + with patch.object( + keyring, "should_refresh_access_token", return_value=True + ): + with patch.object( + keyring, "get_refresh_token", return_value="refresh_tok" + ): + with patch.object( + saml, + "refresh_access_token", + side_effect=ApiException( + status=401, detail="Unauthorized" + ), + ): + with patch.object( + keyring, "update_refresh_attempted_at" + ): + result = provider.resolve(context) + assert result is None + assert context.keyring_refresh_failed is True diff --git a/cloudsmith_cli/core/tests/test_rest.py b/cloudsmith_cli/core/tests/test_rest.py index 364af4e7..08335720 100644 --- a/cloudsmith_cli/core/tests/test_rest.py +++ b/cloudsmith_cli/core/tests/test_rest.py @@ -5,9 +5,21 @@ from ..rest import RestClient +@pytest.fixture(autouse=True) +def patch_httpretty_socket(monkeypatch): + """Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls.""" + import httpretty.core + + monkeypatch.setattr( + httpretty.core.fakesock.socket, + "shutdown", + lambda self, how: None, + raising=False, + ) + + class TestRestClient: @httpretty.activate(allow_net_connect=False, verbose=True) - @pytest.mark.usefixtures("mock_keyring") def test_implicit_retry_for_status_codes(self): """Assert that the rest client retries certain status codes automatically.""" # initialise_api() needs to be called before RestClient can be instantiated, @@ -39,32 +51,3 @@ def test_implicit_retry_for_status_codes(self): assert len(httpretty.latest_requests()) == 6 assert r.status == 200 - - -@pytest.fixture -def mock_keyring(monkeypatch): - """Mock keyring functions to prevent reading real SSO tokens from the system keyring. - - This is necessary because initialise_api() checks the keyring for SSO tokens, - and if found, it attempts to refresh them via a network request. When running - this test in isolation with httpretty mocking enabled, that network request - will fail because it's not mocked. - """ - # Import here to avoid circular imports - import httpretty.core - - from .. import keyring - - # Mock all keyring getter functions to return None/False - monkeypatch.setattr(keyring, "get_access_token", lambda api_host: None) - monkeypatch.setattr(keyring, "get_refresh_token", lambda api_host: None) - monkeypatch.setattr(keyring, "should_refresh_access_token", lambda api_host: False) - - # Patch httpretty's fake socket to handle shutdown() which urllib3 2.0+ calls - # This fixes: "Failed to socket.shutdown because a real socket does not exist" - monkeypatch.setattr( - httpretty.core.fakesock.socket, - "shutdown", - lambda self, how: None, - raising=False, - ) From f30e428164b46d77ca79fee33f66b1c449079985 Mon Sep 17 00:00:00 2001 From: Ian Duffy Date: Tue, 31 Mar 2026 17:01:10 +0100 Subject: [PATCH 2/2] fix: review feedback --- cloudsmith_cli/cli/decorators.py | 5 +- cloudsmith_cli/cli/tests/conftest.py | 2 +- cloudsmith_cli/cli/webserver.py | 2 +- cloudsmith_cli/core/credentials/__init__.py | 100 ------------------ cloudsmith_cli/core/credentials/chain.py | 61 +++++++++++ cloudsmith_cli/core/credentials/models.py | 33 ++++++ cloudsmith_cli/core/credentials/provider.py | 17 +++ .../core/credentials/providers/cli_flag.py | 3 +- .../credentials/providers/keyring_provider.py | 8 +- cloudsmith_cli/core/credentials/session.py | 63 ----------- cloudsmith_cli/core/rest.py | 28 ++--- .../core/tests/test_cli_flag_provider.py | 2 +- .../core/tests/test_credential_context.py | 2 +- .../tests/test_credential_provider_chain.py | 9 +- cloudsmith_cli/core/tests/test_init.py | 6 +- .../core/tests/test_keyring_provider.py | 2 +- cloudsmith_cli/core/tests/test_rest.py | 16 ++- 17 files changed, 162 insertions(+), 197 deletions(-) create mode 100644 cloudsmith_cli/core/credentials/chain.py create mode 100644 cloudsmith_cli/core/credentials/models.py create mode 100644 cloudsmith_cli/core/credentials/provider.py delete mode 100644 cloudsmith_cli/core/credentials/session.py diff --git a/cloudsmith_cli/cli/decorators.py b/cloudsmith_cli/cli/decorators.py index 06ce7836..bda1fd69 100644 --- a/cloudsmith_cli/cli/decorators.py +++ b/cloudsmith_cli/cli/decorators.py @@ -7,9 +7,10 @@ from cloudsmith_cli.cli import validators from ..core.api.init import initialise_api as _initialise_api -from ..core.credentials import CredentialContext, CredentialProviderChain -from ..core.credentials.session import create_session as _create_session +from ..core.credentials.chain import CredentialProviderChain +from ..core.credentials.models import CredentialContext from ..core.mcp import server +from ..core.rest import create_requests_session as _create_session from . import config, utils diff --git a/cloudsmith_cli/cli/tests/conftest.py b/cloudsmith_cli/cli/tests/conftest.py index ad6262dd..653e574c 100644 --- a/cloudsmith_cli/cli/tests/conftest.py +++ b/cloudsmith_cli/cli/tests/conftest.py @@ -5,7 +5,7 @@ from ...core.api.init import initialise_api from ...core.api.repos import create_repo, delete_repo -from ...core.credentials import CredentialResult +from ...core.credentials.models import CredentialResult from .utils import random_str diff --git a/cloudsmith_cli/cli/webserver.py b/cloudsmith_cli/cli/webserver.py index 3d2ffca9..060d2ff7 100644 --- a/cloudsmith_cli/cli/webserver.py +++ b/cloudsmith_cli/cli/webserver.py @@ -9,7 +9,7 @@ from ..core.api.exceptions import ApiException from ..core.api.init import initialise_api -from ..core.credentials import CredentialResult +from ..core.credentials.models import CredentialResult from ..core.keyring import store_sso_tokens from .saml import exchange_2fa_token diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py index d55413ce..e69de29b 100644 --- a/cloudsmith_cli/core/credentials/__init__.py +++ b/cloudsmith_cli/core/credentials/__init__.py @@ -1,100 +0,0 @@ -"""Credential Provider Chain for Cloudsmith CLI. - -Implements an AWS SDK-style credential resolution chain that evaluates -credential sources sequentially and returns the first valid result. -""" - -from __future__ import annotations - -import logging -from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - import requests - -logger = logging.getLogger(__name__) - - -@dataclass -class CredentialContext: - """Context passed to credential providers during resolution. - - All values are populated directly from Click options / ``opts``. - """ - - session: requests.Session | None = None - api_key: str | None = None - api_host: str = "https://api.cloudsmith.io" - creds_file_path: str | None = None - profile: str | None = None - debug: bool = False - keyring_refresh_failed: bool = False - - -@dataclass -class CredentialResult: - """Result from a successful credential resolution.""" - - api_key: str - source_name: str - source_detail: str | None = None - auth_type: str = "api_key" - - -class CredentialProvider(ABC): - """Base class for credential providers.""" - - name: str = "base" - - @abstractmethod - def resolve(self, context: CredentialContext) -> CredentialResult | None: - """Attempt to resolve credentials. Return CredentialResult or None.""" - - -class CredentialProviderChain: - """Evaluates credential providers in order, returning the first valid result. - - If no providers are given, uses the default chain: - Keyring → CLIFlag. - """ - - def __init__(self, providers: list[CredentialProvider] | None = None): - if providers is not None: - self.providers = providers - else: - from .providers import CLIFlagProvider, KeyringProvider - - self.providers = [ - KeyringProvider(), - CLIFlagProvider(), - ] - - def resolve(self, context: CredentialContext) -> CredentialResult | None: - """Evaluate each provider in order. Return the first successful result.""" - for provider in self.providers: - try: - result = provider.resolve(context) - if result is not None: - if context.debug: - logger.debug( - "Credentials resolved by %s: %s", - provider.name, - result.source_detail or result.source_name, - ) - return result - if context.debug: - logger.debug( - "Provider %s did not resolve credentials, trying next", - provider.name, - ) - except Exception: # pylint: disable=broad-exception-caught - # Intentionally broad - one provider failing shouldn't stop others - logger.debug( - "Provider %s raised an exception, skipping", - provider.name, - exc_info=True, - ) - continue - return None diff --git a/cloudsmith_cli/core/credentials/chain.py b/cloudsmith_cli/core/credentials/chain.py new file mode 100644 index 00000000..259490fc --- /dev/null +++ b/cloudsmith_cli/core/credentials/chain.py @@ -0,0 +1,61 @@ +"""Credential provider chain for the Cloudsmith CLI. + +Implements an AWS SDK-style credential resolution chain that evaluates +credential sources sequentially and returns the first valid result. +""" + +from __future__ import annotations + +import logging + +from .models import CredentialContext, CredentialResult +from .provider import CredentialProvider + +logger = logging.getLogger(__name__) + + +class CredentialProviderChain: + """Evaluates credential providers in order, returning the first valid result. + + If no providers are given, uses the default chain: + Keyring → CLIFlag. + """ + + def __init__(self, providers: list[CredentialProvider] | None = None): + if providers is not None: + self.providers = providers + else: + from .providers import CLIFlagProvider, KeyringProvider + + self.providers = [ + KeyringProvider(), + CLIFlagProvider(), + ] + + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Evaluate each provider in order. Return the first successful result.""" + for provider in self.providers: + try: + result = provider.resolve(context) + if result is not None: + if context.debug: + logger.debug( + "Credentials resolved by %s: %s", + provider.name, + result.source_detail or result.source_name, + ) + return result + if context.debug: + logger.debug( + "Provider %s did not resolve credentials, trying next", + provider.name, + ) + except Exception: # pylint: disable=broad-exception-caught + # Intentionally broad - one provider failing shouldn't stop others + logger.debug( + "Provider %s raised an exception, skipping", + provider.name, + exc_info=True, + ) + continue + return None diff --git a/cloudsmith_cli/core/credentials/models.py b/cloudsmith_cli/core/credentials/models.py new file mode 100644 index 00000000..73b3416e --- /dev/null +++ b/cloudsmith_cli/core/credentials/models.py @@ -0,0 +1,33 @@ +"""Credential data models for the Cloudsmith CLI.""" + +from __future__ import annotations + +from dataclasses import dataclass + +import requests + + +@dataclass +class CredentialContext: + """Context passed to credential providers during resolution. + + All values are populated directly from Click options / ``opts``. + """ + + session: requests.Session | None = None + api_key: str | None = None + api_host: str = "https://api.cloudsmith.io" + creds_file_path: str | None = None + profile: str | None = None + debug: bool = False + keyring_refresh_failed: bool = False + + +@dataclass +class CredentialResult: + """Result from a successful credential resolution.""" + + api_key: str + source_name: str + source_detail: str | None = None + auth_type: str = "api_key" diff --git a/cloudsmith_cli/core/credentials/provider.py b/cloudsmith_cli/core/credentials/provider.py new file mode 100644 index 00000000..78b18886 --- /dev/null +++ b/cloudsmith_cli/core/credentials/provider.py @@ -0,0 +1,17 @@ +"""Base credential provider interface.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod + +from .models import CredentialContext, CredentialResult + + +class CredentialProvider(ABC): + """Base class for credential providers.""" + + name: str = "base" + + @abstractmethod + def resolve(self, context: CredentialContext) -> CredentialResult | None: + """Attempt to resolve credentials. Return CredentialResult or None.""" diff --git a/cloudsmith_cli/core/credentials/providers/cli_flag.py b/cloudsmith_cli/core/credentials/providers/cli_flag.py index 99b55dcf..0a05d027 100644 --- a/cloudsmith_cli/core/credentials/providers/cli_flag.py +++ b/cloudsmith_cli/core/credentials/providers/cli_flag.py @@ -2,7 +2,8 @@ from __future__ import annotations -from .. import CredentialContext, CredentialProvider, CredentialResult +from ..models import CredentialContext, CredentialResult +from ..provider import CredentialProvider class CLIFlagProvider(CredentialProvider): diff --git a/cloudsmith_cli/core/credentials/providers/keyring_provider.py b/cloudsmith_cli/core/credentials/providers/keyring_provider.py index c4f44e95..c8844f0c 100644 --- a/cloudsmith_cli/core/credentials/providers/keyring_provider.py +++ b/cloudsmith_cli/core/credentials/providers/keyring_provider.py @@ -4,7 +4,10 @@ import logging -from .. import CredentialContext, CredentialProvider, CredentialResult +from ....cli.saml import refresh_access_token +from ....core import keyring +from ..models import CredentialContext, CredentialResult +from ..provider import CredentialProvider logger = logging.getLogger(__name__) @@ -15,9 +18,6 @@ class KeyringProvider(CredentialProvider): name = "keyring" def resolve(self, context: CredentialContext) -> CredentialResult | None: - from ....cli.saml import refresh_access_token - from ....core import keyring - if not keyring.should_use_keyring(): return None diff --git a/cloudsmith_cli/core/credentials/session.py b/cloudsmith_cli/core/credentials/session.py deleted file mode 100644 index 9e314efb..00000000 --- a/cloudsmith_cli/core/credentials/session.py +++ /dev/null @@ -1,63 +0,0 @@ -"""HTTP session factory with networking configuration and retry support.""" - -from __future__ import annotations - -import requests -from requests.adapters import HTTPAdapter -from urllib3.util.retry import Retry - -#: Default retry policy: retry on connection errors, 429, and 5xx responses -#: with exponential back-off (1s, 2s, 4s). -DEFAULT_RETRY = Retry( - total=3, - backoff_factor=1, - status_forcelist=(429, 500, 502, 503, 504), - allowed_methods=("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"), - raise_on_status=False, -) - - -def create_session( - proxy: str | None = None, - ssl_verify: bool = True, - user_agent: str | None = None, - headers: dict | None = None, - api_key: str | None = None, - retry: Retry | None = DEFAULT_RETRY, -) -> requests.Session: - """Create a requests session with networking, auth, and retry configuration. - - Args: - proxy: HTTP/HTTPS proxy URL. - ssl_verify: Whether to verify SSL certificates. - user_agent: Custom User-Agent header value. - headers: Additional headers to include in every request. - api_key: If provided, set as a Bearer token in the Authorization header. - retry: urllib3 Retry configuration. Defaults to :data:`DEFAULT_RETRY`. - Pass ``None`` to disable automatic retries. - - Returns: - A configured :class:`requests.Session`. - """ - session = requests.Session() - - if retry is not None: - adapter = HTTPAdapter(max_retries=retry) - session.mount("https://", adapter) - session.mount("http://", adapter) - - if proxy: - session.proxies = {"http": proxy, "https": proxy} - - session.verify = ssl_verify - - if user_agent: - session.headers["User-Agent"] = user_agent - - if headers: - session.headers.update(headers) - - if api_key: - session.headers["Authorization"] = f"Bearer {api_key}" - - return session diff --git a/cloudsmith_cli/core/rest.py b/cloudsmith_cli/core/rest.py index 3820b50b..e9652931 100644 --- a/cloudsmith_cli/core/rest.py +++ b/cloudsmith_cli/core/rest.py @@ -61,28 +61,26 @@ def create_requests_session( session=None, error_retry_cb=None, respect_retry_after_header=True, + user_agent=None, + headers=None, ): """Create a requests session that retries some errors.""" # pylint: disable=too-many-branches config = Configuration() if retries is None: - if config.error_retry_max is None: # pylint: disable=no-member - retries = 5 - else: - retries = config.error_retry_max # pylint: disable=no-member + retry_max = getattr(config, "error_retry_max", None) + retries = retry_max if retry_max is not None else 5 if backoff_factor is None: - if config.error_retry_backoff is None: # pylint: disable=no-member - backoff_factor = 0.23 - else: - backoff_factor = config.error_retry_backoff # pylint: disable=no-member + retry_backoff = getattr(config, "error_retry_backoff", None) + backoff_factor = retry_backoff if retry_backoff is not None else 0.23 if status_forcelist is None: - if config.error_retry_codes is None: # pylint: disable=no-member - status_forcelist = [500, 502, 503, 504] - else: - status_forcelist = config.error_retry_codes # pylint: disable=no-member + retry_codes = getattr(config, "error_retry_codes", None) + status_forcelist = ( + retry_codes if retry_codes is not None else [500, 502, 503, 504] + ) if ssl_verify is None: ssl_verify = config.verify_ssl @@ -125,6 +123,12 @@ def create_requests_session( session.mount("http://", adapter) session.mount("https://", adapter) + if user_agent: + session.headers["User-Agent"] = user_agent + + if headers: + session.headers.update(headers) + return session diff --git a/cloudsmith_cli/core/tests/test_cli_flag_provider.py b/cloudsmith_cli/core/tests/test_cli_flag_provider.py index 1f10f25e..dc4c0ff7 100644 --- a/cloudsmith_cli/core/tests/test_cli_flag_provider.py +++ b/cloudsmith_cli/core/tests/test_cli_flag_provider.py @@ -1,6 +1,6 @@ """Tests for the CLI flag credential provider.""" -from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.models import CredentialContext from cloudsmith_cli.core.credentials.providers import CLIFlagProvider diff --git a/cloudsmith_cli/core/tests/test_credential_context.py b/cloudsmith_cli/core/tests/test_credential_context.py index 15c4b846..49dc57d9 100644 --- a/cloudsmith_cli/core/tests/test_credential_context.py +++ b/cloudsmith_cli/core/tests/test_credential_context.py @@ -1,6 +1,6 @@ """Tests for the CredentialContext class.""" -from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.models import CredentialContext class TestCredentialContext: diff --git a/cloudsmith_cli/core/tests/test_credential_provider_chain.py b/cloudsmith_cli/core/tests/test_credential_provider_chain.py index 94eb1e46..58cddb00 100644 --- a/cloudsmith_cli/core/tests/test_credential_provider_chain.py +++ b/cloudsmith_cli/core/tests/test_credential_provider_chain.py @@ -1,11 +1,8 @@ """Tests for the credential provider chain.""" -from cloudsmith_cli.core.credentials import ( - CredentialContext, - CredentialProvider, - CredentialProviderChain, - CredentialResult, -) +from cloudsmith_cli.core.credentials.chain import CredentialProviderChain +from cloudsmith_cli.core.credentials.models import CredentialContext, CredentialResult +from cloudsmith_cli.core.credentials.provider import CredentialProvider class DummyProvider(CredentialProvider): diff --git a/cloudsmith_cli/core/tests/test_init.py b/cloudsmith_cli/core/tests/test_init.py index 33881b16..f0b691e6 100644 --- a/cloudsmith_cli/core/tests/test_init.py +++ b/cloudsmith_cli/core/tests/test_init.py @@ -66,7 +66,7 @@ def test_initialise_api_sets_cloudsmith_api_config_default(self): def test_initialise_api_sets_bearer_auth_with_access_token(self): """Verify access_token is set as Bearer auth header.""" - from cloudsmith_cli.core.credentials import CredentialResult + from cloudsmith_cli.core.credentials.models import CredentialResult credential = CredentialResult( api_key="test_access_token", source_name="test", auth_type="bearer" @@ -79,7 +79,7 @@ def test_initialise_api_sets_bearer_auth_with_access_token(self): def test_initialise_api_sets_api_key(self): """Verify key is set as X-Api-Key header.""" - from cloudsmith_cli.core.credentials import CredentialResult + from cloudsmith_cli.core.credentials.models import CredentialResult credential = CredentialResult( api_key="test_api_key", source_name="test", auth_type="api_key" @@ -92,7 +92,7 @@ def test_initialise_api_sets_api_key(self): def test_initialise_api_bearer_credential(self): """Verify bearer credential sets Authorization header, not X-Api-Key.""" - from cloudsmith_cli.core.credentials import CredentialResult + from cloudsmith_cli.core.credentials.models import CredentialResult Configuration.set_default(None) credential = CredentialResult( diff --git a/cloudsmith_cli/core/tests/test_keyring_provider.py b/cloudsmith_cli/core/tests/test_keyring_provider.py index 3a6732af..dcd1ba52 100644 --- a/cloudsmith_cli/core/tests/test_keyring_provider.py +++ b/cloudsmith_cli/core/tests/test_keyring_provider.py @@ -3,7 +3,7 @@ import os from unittest.mock import MagicMock, patch -from cloudsmith_cli.core.credentials import CredentialContext +from cloudsmith_cli.core.credentials.models import CredentialContext from cloudsmith_cli.core.credentials.providers import KeyringProvider diff --git a/cloudsmith_cli/core/tests/test_rest.py b/cloudsmith_cli/core/tests/test_rest.py index 08335720..d2a6e6d8 100644 --- a/cloudsmith_cli/core/tests/test_rest.py +++ b/cloudsmith_cli/core/tests/test_rest.py @@ -2,7 +2,7 @@ import pytest from ..api.init import initialise_api -from ..rest import RestClient +from ..rest import RestClient, create_requests_session @pytest.fixture(autouse=True) @@ -51,3 +51,17 @@ def test_implicit_retry_for_status_codes(self): assert len(httpretty.latest_requests()) == 6 assert r.status == 200 + + +class TestCreateRequestsSession: + @pytest.fixture(autouse=True) + def setup(self): + initialise_api() + + def test_sets_user_agent_header(self): + session = create_requests_session(user_agent="test-agent/1.0") + assert session.headers["User-Agent"] == "test-agent/1.0" + + def test_sets_extra_headers(self): + session = create_requests_session(headers={"X-Custom": "value"}) + assert session.headers["X-Custom"] == "value"