Skip to content
Open
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
# go/keep-sorted start
"PyYAML>=6.0.2, <7.0.0", # For APIHubToolset.
"aiosqlite>=0.21.0", # For SQLite database
"agentic_sandbox @ git+https://github.com/kubernetes-sigs/agent-sandbox.git@dbac66ecba5497ac493ca5e4ab5e0fcb1c945134#subdirectory=clients/python/agentic-sandbox-client", # For Agent Sandboxed Code Execution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a direct Git URL with a specific commit hash for a dependency can make dependency management and updates challenging. It ties the project to an immutable state of the external repository, which might not receive updates or security patches easily. Consider if there's a way to depend on a published package with version ranges, or if this is a temporary measure, document the long-term plan for this dependency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a plan to transition this to a versioned release instead of a direct Git dependency to a specific commit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are going to publish the python client to PyPI. This is an interim solution.

"anyio>=4.9.0, <5.0.0", # For MCP Session Manager
"authlib>=1.6.6, <2.0.0", # For RestAPI Tool
"click>=8.1.8, <9.0.0", # For CLI tools
Expand Down
76 changes: 67 additions & 9 deletions src/google/adk/code_executors/gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
import uuid

from agentic_sandbox import SandboxClient
import kubernetes as k8s
from kubernetes.watch import Watch

Expand All @@ -36,9 +37,19 @@
class GkeCodeExecutor(BaseCodeExecutor):
"""Executes Python code in a secure gVisor-sandboxed Pod on GKE.

This executor securely runs code by dynamically creating a Kubernetes Job for
each execution request. The user's code is mounted via a ConfigMap, and the
Pod is hardened with a strict security context and resource limits.
This executor supports two modes of execution: 'job' and 'sandbox'.

Job Mode (default):
Securely runs code by dynamically creating a Kubernetes Job for each execution
request. The user's code is mounted via a ConfigMap, and the Pod is hardened
with a strict security context and resource limits.

Sandbox Mode:
Executes code using the Agent Sandbox Client. This mode requires additional
infrastructure to be deployed in the cluster, specifically:
- Agent-sandbox controller
- Sandbox templates (e.g., python-sandbox-template)
- Sandbox router and gateway

Key Features:
- Sandboxed execution using the gVisor runtime.
Expand Down Expand Up @@ -70,6 +81,8 @@ class GkeCodeExecutor(BaseCodeExecutor):
namespace: str = "default"
image: str = "python:3.11-slim"
timeout_seconds: int = 300
executor_type: str = "job" # "job" or "sandbox"
sandbox_gateway_name: str | None = None
cpu_requested: str = "200m"
mem_requested: str = "256Mi"
# The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core.
Expand All @@ -79,11 +92,17 @@ class GkeCodeExecutor(BaseCodeExecutor):
kubeconfig_path: str | None = None
kubeconfig_context: str | None = None

# Sandbox constants
sandbox_template: str | None = None

_batch_v1: k8s.client.BatchV1Api
_core_v1: k8s.client.CoreV1Api

def __init__(
self,
executor_type: str = "job",
sandbox_gateway_name: str | None = None,
sandbox_template: str = "python-sandbox-template",
kubeconfig_path: str | None = None,
kubeconfig_context: str | None = None,
**data,
Expand All @@ -96,9 +115,17 @@ def __init__(
3. Automatically via the default local kubeconfig file (~/.kube/config).
"""
super().__init__(**data)
self.executor_type = executor_type
self.sandbox_gateway_name = sandbox_gateway_name
self.sandbox_template = sandbox_template
self.kubeconfig_path = kubeconfig_path
self.kubeconfig_context = kubeconfig_context

if executor_type not in ["job", "sandbox"]:
raise ValueError(
f"Invalid executor_type: '{executor_type}'. Must be 'job' or"
" 'sandbox'."
)
if self.kubeconfig_path:
try:
logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.")
Expand Down Expand Up @@ -136,10 +163,28 @@ def __init__(
self._batch_v1 = client.BatchV1Api()
self._core_v1 = client.CoreV1Api()

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
def _execute_in_sandbox(self, code: str) -> CodeExecutionResult:
"""Executes code using Agent Sandbox Client."""
try:
with SandboxClient(
template_name=self.sandbox_template,
gateway_name=self.sandbox_gateway_name,
namespace=self.namespace,
) as sandbox:
# Execute the code as a python script
logger.debug("Executing code in sandbox:\n```\n%s\n```", code)
sandbox.write("script.py", code)
result = sandbox.run("python3 script.py")

return CodeExecutionResult(stdout=result.stdout, stderr=result.stderr)
except Exception as e:
logger.error("Sandbox execution failed", exc_info=True)
return CodeExecutionResult(
stderr=f"Sandbox execution failed: {str(e)}",
)

def _execute_as_job(
self, code: str, invocation_context: InvocationContext
) -> CodeExecutionResult:
"""Orchestrates the secure execution of a code snippet on GKE."""
job_name = f"adk-exec-{uuid.uuid4().hex[:10]}"
Expand All @@ -150,7 +195,7 @@ def execute_code(
# 1. Create a ConfigMap to mount LLM-generated code into the Pod.
# 2. Create a Job that runs the code from the ConfigMap.
# 3. Set the Job as the ConfigMap's owner for automatic cleanup.
self._create_code_configmap(configmap_name, code_execution_input.code)
self._create_code_configmap(configmap_name, code)
job_manifest = self._create_job_manifest(
job_name, configmap_name, invocation_context
)
Expand All @@ -162,7 +207,7 @@ def execute_code(
logger.info(
f"Submitted Job '{job_name}' to namespace '{self.namespace}'."
)
logger.debug("Executing code:\n```\n%s\n```", code_execution_input.code)
logger.debug("Executing code:\n```\n%s\n```", code)
return self._watch_job_completion(job_name)

except ApiException as e:
Expand All @@ -186,6 +231,19 @@ def execute_code(
stderr=f"An unexpected executor error occurred: {e}"
)

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:
"""Overrides the base method to route execution based on executor_type."""
code = code_execution_input.code
if self.executor_type == "sandbox":
return self._execute_in_sandbox(code)
else:
# Fallback to existing GKE Job logic
return self._execute_as_job(code, invocation_context)

def _create_job_manifest(
self,
job_name: str,
Expand Down
137 changes: 137 additions & 0 deletions tests/unittests/code_executors/test_gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def test_init_defaults(self):
assert executor.timeout_seconds == 300
assert executor.cpu_requested == "200m"
assert executor.mem_limit == "512Mi"
assert executor.executor_type == "job"

def test_init_with_overrides(self):
"""Tests that class attributes can be overridden at instantiation."""
Expand All @@ -79,11 +80,19 @@ def test_init_with_overrides(self):
image="custom-python:latest",
timeout_seconds=60,
cpu_limit="1000m",
executor_type="sandbox",
)
assert executor.namespace == "test-ns"
assert executor.image == "custom-python:latest"
assert executor.timeout_seconds == 60
assert executor.cpu_limit == "1000m"
assert executor.executor_type == "sandbox"
assert executor.sandbox_template == "python-sandbox-template"

def test_init_invalid_executor_type(self):
"""Tests that init raises ValueError for invalid executor_type."""
with pytest.raises(ValueError, match="Invalid executor_type"):
GkeCodeExecutor(executor_type="invalid_type")

@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_success(
Expand Down Expand Up @@ -225,3 +234,131 @@ def test_create_job_manifest_structure(self, mock_invocation_context):
assert sec_context.allow_privilege_escalation is False
assert sec_context.read_only_root_filesystem is True
assert sec_context.capabilities.drop == ["ALL"]

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
def test_execute_code_forks_to_sandbox(
self,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses SandboxClient when executor_type='sandbox'."""
# Setup Sandbox mock
mock_sandbox_instance = (
mock_sandbox_client.return_value.__enter__.return_value
)
mock_run_result = MagicMock()
mock_run_result.stdout = "sandbox stdout"
mock_run_result.stderr = None
mock_sandbox_instance.run.return_value = mock_run_result

# Instantiate with sandbox type
executor = GkeCodeExecutor(executor_type="sandbox")
code_input = CodeExecutionInput(code='print("sandbox")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "sandbox stdout"

# Verify SandboxClient was used
mock_sandbox_client.assert_called_once()
mock_sandbox_instance.run.assert_called_once()

# Verify Job path was NOT taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_not_called()

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
def test_execute_code_sandbox_exception(
self,
mock_sandbox_client,
mock_invocation_context,
):
"""Tests handling of exceptions from SandboxClient."""
# Setup Sandbox mock to raise exception
mock_sandbox_client.return_value.__enter__.side_effect = Exception(
"Connection failed"
)

# Instantiate with sandbox type
executor = GkeCodeExecutor(executor_type="sandbox")
code_input = CodeExecutionInput(code='print("sandbox")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == ""
assert "Sandbox execution failed: Connection failed" in result.stderr

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_forks_to_job(
self,
mock_watch,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses K8s Job when executor_type='job'."""
# Setup K8s Job mocks (success path)
mock_job = MagicMock()
mock_job.status.succeeded = True
mock_watch.return_value.stream.return_value = [{"object": mock_job}]

mock_pod = MagicMock()
mock_pod.metadata.name = "pod-1"
mock_k8s_clients["core_v1"].list_namespaced_pod.return_value.items = [
mock_pod
]
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"job stdout"
)

# Instantiate with job type
executor = GkeCodeExecutor(executor_type="job")
code_input = CodeExecutionInput(code='print("job")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "job stdout"

# Verify Job path WAS taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once()

# Verify SandboxClient was NOT used
mock_sandbox_client.assert_not_called()

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
def test_execute_in_sandbox_returns_stderr(
self,
mock_sandbox_client,
mock_invocation_context,
):
"""Tests that stderr from the sandbox run is propagated to the result."""
# Setup Sandbox mock
mock_sandbox_instance = (
mock_sandbox_client.return_value.__enter__.return_value
)
mock_run_result = MagicMock()
mock_run_result.stdout = ""
mock_run_result.stderr = "oops\n"
mock_sandbox_instance.run.return_value = mock_run_result

# Instantiate with sandbox type
executor = GkeCodeExecutor(executor_type="sandbox")
code_input = CodeExecutionInput(
code="import sys; print('oops', file=sys.stderr)"
)

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == ""
assert result.stderr == "oops\n"
mock_sandbox_instance.write.assert_called_with("script.py", code_input.code)
mock_sandbox_instance.run.assert_called_with("python3 script.py")
Loading