Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ This library uses semantic versioning and follows Okta's [Library Version Policy

The latest release can always be found on the [releases page][github-releases].

**NOTE:** We have implemented DPoP and the support is now available in the latest version of the SDK.

## Need help?

If you run into problems using the SDK, you can:
Expand Down
24 changes: 18 additions & 6 deletions okta/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,24 @@ def __init__(
# use default configuration if none is provided
if configuration is None:
configuration = Configuration.get_default()
self.configuration = Configuration(
host=configuration["client"]["orgUrl"],
access_token=configuration["client"]["token"],
api_key=configuration["client"].get("privateKey", None),
authorization_mode=configuration["client"].get("authorizationMode", "SSWS"),
)

# Build Configuration with DPoP support if present
config_params = {
"host": configuration["client"]["orgUrl"],
"access_token": configuration["client"].get("token", None), # Use .get() to handle PrivateKey mode
"api_key": configuration["client"].get("privateKey", None),
"authorization_mode": configuration["client"].get("authorizationMode", "SSWS"),
}

# Add DPoP parameters if enabled
if configuration["client"].get("dpopEnabled", False):
config_params.update({
"dpop_enabled": True,
"dpop_private_key": configuration["client"].get("privateKey"),
"dpop_key_rotation_interval": configuration["client"].get("dpopKeyRotationInterval", 86400),
})

self.configuration = Configuration(**config_params)

if self.configuration.event_listeners is not None:
if len(self.configuration.event_listeners["call_api_started"]) > 0:
Expand Down
6 changes: 6 additions & 0 deletions okta/cache/no_op_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ class NoOpCache(Cache):
This is a disabled Cache Class where no operations occur
in the cache.
Implementing the okta.cache.cache.Cache abstract class.

.. warning::
**DPoP Performance Impact**: When using DPoP (Demonstrating Proof-of-Possession)
authentication with NoOpCache, OAuth tokens will be regenerated on every request
instead of being cached. This may significantly impact performance and could
trigger rate limits. Consider using OktaCache instead for production DPoP usage.
"""

def __init__(self):
Expand Down
16 changes: 16 additions & 0 deletions okta/cache/okta_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ class OktaCache(Cache):
"""
This is a base class implementing a Cache using TTL and TTI.
Implementing the okta.cache.cache.Cache abstract class.

THREAD SAFETY WARNING:
---------------------
This cache implementation is NOT thread-safe and should only be used in
single-threaded or single-coroutine contexts. In concurrent environments
(e.g., asyncio with multiple coroutines accessing the same cache instance),
race conditions may occur during cache operations.

For multi-threaded applications, consider:
1. Using threading.local() to create per-thread cache instances
2. Implementing a thread-safe cache wrapper with locks
3. Using an external cache (Redis, Memcached) for distributed scenarios

The default SDK usage pattern (one client instance per thread/coroutine)
is safe. Issues only arise when sharing a single client across multiple
concurrent execution contexts.
"""

def __init__(self, ttl, tti):
Expand Down
6 changes: 6 additions & 0 deletions okta/config/config_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def get_config(self):
Returns a deep copy to prevent external modification of internal state
and to avoid holding references to sensitive values.

NOTE: Deep copying creates duplicate copies of all config data in memory,
including private keys. While this prevents external mutation, it means
sensitive data may be duplicated. Callers should not store this config
unnecessarily and should allow Python's garbage collector to clean up
the copy when no longer needed.

Returns:
dict -- Deep copy of the client configuration dictionary
"""
Expand Down
59 changes: 58 additions & 1 deletion okta/config/config_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
# See the License for the specific language governing permissions and limitations under the License.
# coding: utf-8

from okta.constants import FINDING_OKTA_DOMAIN, REPO_URL
import logging

from okta.constants import FINDING_OKTA_DOMAIN, REPO_URL, MIN_DPOP_KEY_ROTATION_SECONDS, MAX_DPOP_KEY_ROTATION_SECONDS
from okta.error_messages import (
ERROR_MESSAGE_ORG_URL_MISSING,
ERROR_MESSAGE_API_TOKEN_DEFAULT,
Expand All @@ -26,6 +28,8 @@
ERROR_MESSAGE_PROXY_INVALID_PORT,
)

logger = logging.getLogger("okta-sdk-python")


class ConfigValidator:
"""
Expand Down Expand Up @@ -70,6 +74,8 @@ def validate_config(self):
]
client_fields_values = [client.get(field, "") for field in client_fields]
errors += self._validate_client_fields(*client_fields_values)
# Validate DPoP configuration if enabled
errors += self._validate_dpop_config(client)
else: # Not a valid authorization mode
errors += [
(
Expand Down Expand Up @@ -226,3 +232,54 @@ def _validate_proxy_settings(self, proxy):
proxy_errors.append(ERROR_MESSAGE_PROXY_INVALID_PORT)

return proxy_errors

def _validate_dpop_config(self, client):
"""
Validate DPoP-specific configuration.

Note: This method is only called when authorizationMode is 'PrivateKey',
so no need to re-check the auth mode here.

Args:
client: Client configuration dict

Returns:
list: List of error messages (empty if valid)
"""

errors = []

if not client.get('dpopEnabled'):
return errors # DPoP not enabled, nothing to validate

# Validate key rotation interval
rotation_interval = client.get('dpopKeyRotationInterval', 86400)

if not isinstance(rotation_interval, int):
errors.append(
f"dpopKeyRotationInterval must be an integer (seconds), "
f"but got {type(rotation_interval).__name__}"
)
elif rotation_interval < MIN_DPOP_KEY_ROTATION_SECONDS: # Minimum 1 hour
errors.append(
f"dpopKeyRotationInterval must be at least {MIN_DPOP_KEY_ROTATION_SECONDS} seconds (1 hour), "
f"but got {rotation_interval} seconds. "
"Shorter intervals may cause performance issues."
)
elif rotation_interval > MAX_DPOP_KEY_ROTATION_SECONDS: # Maximum 90 days
errors.append(
f"dpopKeyRotationInterval must be at most {MAX_DPOP_KEY_ROTATION_SECONDS} seconds "
f"({MAX_DPOP_KEY_ROTATION_SECONDS // 86400} days), "
f"but got {rotation_interval} seconds ({rotation_interval // 86400} days). "
"Excessive rotation intervals defeat the security purpose of DPoP. "
"Recommended: 24-48 hours for production use."
)
elif rotation_interval > 7 * 24 * 3600: # Warning for > 7 days
# This is a warning, not an error
logger.warning(
f"dpopKeyRotationInterval is very long ({rotation_interval} seconds, "
f"{rotation_interval // 86400} days). "
"Consider shorter intervals (24-48 hours) for better security."
)

return errors
19 changes: 19 additions & 0 deletions okta/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ class Configuration:
:param ssl_ca_cert: str - the path to a file of concatenated CA certificates
in PEM format.

warning::
**Thread Safety**: Configuration objects should be treated as immutable
after initialization. Modifying configuration attributes after passing
to Client/ApiClient may result in undefined behavior in multi-threaded
or async environments.

:Example:

API Key Authentication Example.
Expand Down Expand Up @@ -109,6 +115,9 @@ def __init__(
server_operation_variables=None,
ssl_ca_cert=None,
authorization_mode=None,
dpop_enabled=False,
dpop_private_key=None,
dpop_key_rotation_interval=86400,
) -> None:
"""Constructor"""
self._base_path = "https://subdomain.okta.com" if host is None else host
Expand Down Expand Up @@ -148,6 +157,16 @@ def __init__(
self.access_token = access_token
"""Access token
"""
# DPoP Settings
self.dpop_enabled = dpop_enabled
"""Enable DPoP (Demonstrating Proof-of-Possession) per RFC 9449
"""
self.dpop_private_key = dpop_private_key
"""Private key for DPoP proof generation
"""
self.dpop_key_rotation_interval = dpop_key_rotation_interval
"""Key rotation interval in seconds (default: 86400 = 24 hours)
"""
self.logger = {}
"""Logging Settings
"""
Expand Down
7 changes: 7 additions & 0 deletions okta/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@

SWA_APP_NAME = "template_swa"
SWA3_APP_NAME = "template_swa3field"

# DPoP (Demonstrating Proof-of-Possession) constants
MIN_DPOP_KEY_ROTATION_SECONDS = 3600 # 1 hour minimum
MAX_DPOP_KEY_ROTATION_SECONDS = 90 * 24 * 3600 # 90 days maximum
MAX_DPOP_NONCE_RETRIES = 2
MAX_DPOP_BACKOFF_DELAY = 1.0 # Maximum backoff delay in seconds for nonce retries
DPOP_USER_AGENT_EXTENSION = "isDPoP:true"
Loading
Loading