Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions cloudsmith_cli/cli/commands/whoami.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
"""CLI/Commands - Retrieve authentication status."""

import os

import click

from ...core import keyring
from ...core.api.exceptions import ApiException
from ...core.api.user import get_token_metadata, get_user_brief
from .. import decorators, utils
from ..config import CredentialsReader
from ..exceptions import handle_api_exceptions
from .main import main

Expand All @@ -26,26 +23,17 @@ def _get_active_method(api_config):
def _get_api_key_source(opts):
"""Determine where the API key was loaded from.

Checks in priority order matching actual resolution:
CLI --api-key flag > CLOUDSMITH_API_KEY env var > credentials.ini.
Uses the credential provider chain result attached by initialise_api.
"""
if not opts.api_key:
return {"configured": False, "source": None, "source_key": None}

env_key = os.environ.get("CLOUDSMITH_API_KEY")

# If env var is set but differs from the resolved key, CLI flag won
if env_key and opts.api_key != env_key:
source, key = "CLI --api-key flag", "cli_flag"
elif env_key:
suffix = env_key[-4:]
source, key = f"CLOUDSMITH_API_KEY env var (ends with ...{suffix})", "env_var"
elif creds := CredentialsReader.find_existing_files():
source, key = f"credentials.ini ({creds[0]})", "credentials_file"
else:
source, key = "CLI --api-key flag", "cli_flag"

return {"configured": True, "source": source, "source_key": key}
credential = getattr(opts, "credential", None)
if credential:
return {
"configured": True,
"source": credential.source_detail or credential.source_name,
"source_key": credential.source_name,
}

return {"configured": False, "source": None, "source_key": None}


def _get_sso_status(api_host):
Expand Down
108 changes: 90 additions & 18 deletions cloudsmith_cli/cli/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from cloudsmith_cli.cli import validators

from ..core.api.init import initialise_api as _initialise_api
from ..core.credentials import CredentialContext, CredentialProviderChain
from ..core.credentials.session import create_session as _create_session
from ..core.mcp import server
from . import config, utils

Expand All @@ -20,6 +22,14 @@ def report_retry(seconds, context=None):
)


def _pop_boolean_flag(kwargs, name, invert=False):
"""Pop a boolean flag from kwargs, optionally inverting it."""
value = kwargs.pop(name)
if value is not None and invert:
value = not value
return value


def common_package_action_options(f):
"""Add common options for package actions."""

Expand Down Expand Up @@ -214,15 +224,17 @@ def common_api_auth_options(f):
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)
opts.api_key = kwargs.pop("api_key")
api_key = kwargs.pop("api_key")
if api_key:
opts.api_key = api_key
kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper


def initialise_api(f):
"""Initialise the Cloudsmith API for use."""
def initialise_session(f):
"""Create a shared HTTP session with proxy/SSL/user-agent settings."""

@click.option(
"--api-host", envvar="CLOUDSMITH_API_HOST", help="The API host to connect to."
Expand Down Expand Up @@ -252,6 +264,78 @@ def initialise_api(f):
envvar="CLOUDSMITH_API_HEADERS",
help="A CSV list of extra headers (key=value) to send to the API.",
)
@click.pass_context
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)
opts.api_host = kwargs.pop("api_host")
opts.api_proxy = kwargs.pop("api_proxy")
opts.api_ssl_verify = _pop_boolean_flag(
kwargs, "without_api_ssl_verify", invert=True
)
opts.api_user_agent = kwargs.pop("api_user_agent")
opts.api_headers = kwargs.pop("api_headers")

opts.session = _create_session(
proxy=opts.api_proxy,
ssl_verify=opts.api_ssl_verify,
user_agent=opts.api_user_agent,
headers=opts.api_headers,
)

kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper


def resolve_credentials(f):
"""Resolve credentials via the provider chain. Depends on initialise_session."""

@click.pass_context
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
opts = config.get_or_create_options(ctx)

context = CredentialContext(
session=opts.session,
api_key=opts.api_key,
api_host=opts.api_host or "https://api.cloudsmith.io",
creds_file_path=ctx.meta.get("creds_file"),
profile=ctx.meta.get("profile"),
debug=opts.debug,
)

chain = CredentialProviderChain()
credential = chain.resolve(context)

if context.keyring_refresh_failed:
click.secho(
"An error occurred when attempting to refresh your SSO access token. "
"To refresh this session, run 'cloudsmith auth'",
fg="yellow",
err=True,
)
if credential:
click.secho(
"Falling back to API key authentication.",
fg="yellow",
err=True,
)

opts.credential = credential

kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return initialise_session(wrapper)


def initialise_api(f):
"""Initialise the Cloudsmith API for use. Depends on resolve_credentials."""

@click.option(
"-R",
"--without-rate-limit",
Expand Down Expand Up @@ -294,20 +378,8 @@ def initialise_api(f):
@functools.wraps(f)
def wrapper(ctx, *args, **kwargs):
# pylint: disable=missing-docstring
def _set_boolean(name, invert=False):
value = kwargs.pop(name)
value = value if value is not None else None
if value is not None and invert:
value = not value
return value

opts = config.get_or_create_options(ctx)
opts.api_host = kwargs.pop("api_host")
opts.api_proxy = kwargs.pop("api_proxy")
opts.api_ssl_verify = _set_boolean("without_api_ssl_verify", invert=True)
opts.api_user_agent = kwargs.pop("api_user_agent")
opts.api_headers = kwargs.pop("api_headers")
opts.rate_limit = _set_boolean("without_rate_limit", invert=True)
opts.rate_limit = _pop_boolean_flag(kwargs, "without_rate_limit", invert=True)
opts.rate_limit_warning = kwargs.pop("rate_limit_warning")
opts.error_retry_max = kwargs.pop("error_retry_max")
opts.error_retry_backoff = kwargs.pop("error_retry_backoff")
Expand All @@ -320,7 +392,7 @@ def call_print_rate_limit_info_with_opts(rate_info):
opts.api_config = _initialise_api(
debug=opts.debug,
host=opts.api_host,
key=opts.api_key,
credential=opts.credential,
proxy=opts.api_proxy,
ssl_verify=opts.api_ssl_verify,
user_agent=opts.api_user_agent,
Expand All @@ -336,7 +408,7 @@ def call_print_rate_limit_info_with_opts(rate_info):
kwargs["opts"] = opts
return ctx.invoke(f, *args, **kwargs)

return wrapper
return resolve_credentials(wrapper)


def initialise_mcp(f):
Expand Down
5 changes: 4 additions & 1 deletion cloudsmith_cli/cli/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ...core.api.init import initialise_api
from ...core.api.repos import create_repo, delete_repo
from ...core.credentials import CredentialResult
from .utils import random_str


Expand Down Expand Up @@ -51,7 +52,9 @@ def organization():
@pytest.fixture()
def tmp_repository(organization, api_host, api_key):
"""Yield a temporary repository."""
initialise_api(host=api_host, key=api_key)
initialise_api(
host=api_host, credential=CredentialResult(api_key=api_key, source_name="test")
)
repo_data = create_repo(organization, {"name": random_str()})
yield repo_data
delete_repo(organization, repo_data["slug"])
Expand Down
6 changes: 5 additions & 1 deletion cloudsmith_cli/cli/tests/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def test_refresh_api_config_passes_sso_token(self):

mock_init_api.assert_called_once()
call_kwargs = mock_init_api.call_args.kwargs
assert call_kwargs.get("access_token") == "test_sso_token_123"
credential = call_kwargs.get("credential")
assert credential is not None
assert credential.api_key == "test_sso_token_123"
assert credential.auth_type == "bearer"
assert credential.source_name == "sso"


class TestAuthenticationWebRequestHandlerKeyring:
Expand Down
11 changes: 10 additions & 1 deletion cloudsmith_cli/cli/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from ..core.api.exceptions import ApiException
from ..core.api.init import initialise_api
from ..core.credentials import CredentialResult
from ..core.keyring import store_sso_tokens
from .saml import exchange_2fa_token

Expand Down Expand Up @@ -79,7 +80,15 @@ def refresh_api_config_after_auth(self):
user_agent=getattr(self.api_opts, "user_agent", None),
headers=getattr(self.api_opts, "headers", None),
rate_limit=getattr(self.api_opts, "rate_limit", True),
access_token=self.sso_access_token,
credential=(
CredentialResult(
api_key=self.sso_access_token,
source_name="sso",
auth_type="bearer",
)
if self.sso_access_token
else None
),
)

def finish_request(self, request, client_address):
Expand Down
74 changes: 10 additions & 64 deletions cloudsmith_cli/core/api/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import click
import cloudsmith_api

from ...cli import saml
from .. import keyring
from ..rest import RestClient
from .exceptions import ApiException


def initialise_api(
debug=False,
host=None,
key=None,
credential=None,
proxy=None,
ssl_verify=True,
user_agent=None,
Expand All @@ -26,7 +23,6 @@ def initialise_api(
error_retry_backoff=None,
error_retry_codes=None,
error_retry_cb=None,
access_token=None,
):
"""Initialise the cloudsmith_api.Configuration."""
# FIXME: pylint: disable=too-many-arguments
Expand All @@ -45,65 +41,15 @@ def initialise_api(
config.verify_ssl = ssl_verify
config.client_side_validation = False

# Use directly provided access token (e.g. from SSO callback),
# or fall back to keyring lookup if enabled.
if not access_token:
access_token = keyring.get_access_token(config.host)

if access_token:
auth_header = config.headers.get("Authorization")

# overwrite auth header if empty or is basic auth without username or password
if not auth_header or auth_header == config.get_basic_auth_token():
refresh_token = keyring.get_refresh_token(config.host)

try:
if keyring.should_refresh_access_token(config.host):
new_access_token, new_refresh_token = saml.refresh_access_token(
config.host,
access_token,
refresh_token,
session=saml.create_configured_session(config),
)
keyring.store_sso_tokens(
config.host, new_access_token, new_refresh_token
)
# Use the new tokens
access_token = new_access_token
except ApiException:
keyring.update_refresh_attempted_at(config.host)

click.secho(
"An error occurred when attempting to refresh your SSO access token. To refresh this session, run 'cloudsmith auth'",
fg="yellow",
err=True,
)

# Clear access_token to prevent using expired token
access_token = None

# Fall back to API key auth if available
if key:
click.secho(
"Falling back to API key authentication.",
fg="yellow",
err=True,
)
config.api_key["X-Api-Key"] = key

# Only use SSO token if refresh didn't fail
if access_token:
config.headers["Authorization"] = "Bearer {access_token}".format(
access_token=access_token
)

if config.debug:
click.echo("SSO access token config value set")
elif key:
config.api_key["X-Api-Key"] = key

if config.debug:
click.echo("User API key config value set")
if credential:
if credential.auth_type == "bearer":
config.headers["Authorization"] = f"Bearer {credential.api_key}"
if config.debug:
click.echo("SSO access token config value set")
else:
config.api_key["X-Api-Key"] = credential.api_key
if config.debug:
click.echo("User API key config value set")

auth_header = headers and config.headers.get("Authorization")
if auth_header and " " in auth_header:
Expand Down
Loading
Loading