Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ dev: deps
IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \
IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \
DEMO_QUEUE_UPDATE_SECS=2 \
OPENTELEMETRY_ENABLED=true \
API_URL_ROOT='http://127.0.0.1:8000' fastapi dev
API_URL_ROOT='http://localhost:8000' fastapi dev

.PHONY: clean
clean:
Expand Down
128 changes: 91 additions & 37 deletions app/demo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Any, Tuple

from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel

from .routers.account import facility_adapter as account_adapter
Expand All @@ -29,7 +30,7 @@
from .types.models import Capability
from .types.scalars import AllocationUnit

DEMO_QUEUE_UPDATE_SECS = 5
DEMO_QUEUE_UPDATE_SECS = int(os.environ.get("DEMO_QUEUE_UPDATE_SECS", 5))


def paginate_list(items, offset: int | None, limit: int | None):
Expand Down Expand Up @@ -136,14 +137,14 @@ def _init_state(self):

day_ago = utc_now() - datetime.timedelta(days=1)
self.capabilities = {
"cpu": Capability(id=str(uuid.uuid4()), name="CPU Nodes", units=[AllocationUnit.node_hours]),
"gpu": Capability(id=str(uuid.uuid4()), name="GPU Nodes", units=[AllocationUnit.node_hours]),
"hpss": Capability(id=str(uuid.uuid4()), name="Tape Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]),
"gpfs": Capability(id=str(uuid.uuid4()), name="GPFS Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]),
"cpu": Capability(id=demo_uuid("capability", "cpu"), name="CPU Nodes", units=[AllocationUnit.node_hours]),
"gpu": Capability(id=demo_uuid("capability", "gpu"), name="GPU Nodes", units=[AllocationUnit.node_hours]),
"hpss": Capability(id=demo_uuid("capability", "hpss"), name="Tape Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]),
"gpfs": Capability(id=demo_uuid("capability", "gpfs"), name="GPFS Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]),
}

pm = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "perlmutter_compute_nodes"),
site_id=site1.id,
group="perlmutter",
name="compute nodes",
Expand All @@ -158,7 +159,7 @@ def _init_state(self):
)

hpss = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "hpss"),
site_id=site1.id,
group="hpss",
name="hpss",
Expand All @@ -170,7 +171,7 @@ def _init_state(self):
)

cfs = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "cfs"),
site_id=site1.id,
group="cfs",
name="cfs",
Expand All @@ -182,7 +183,7 @@ def _init_state(self):
)

login = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "login_nodes"),
site_id=site2.id,
group="perlmutter",
name="login nodes",
Expand All @@ -194,7 +195,7 @@ def _init_state(self):
)

iris = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "iris"),
site_id=site2.id,
group="services",
name="Iris",
Expand All @@ -205,7 +206,7 @@ def _init_state(self):
resource_type=status_models.ResourceType.website,
)
sfapi = status_models.Resource(
id=str(uuid.uuid4()),
id=demo_uuid("resource", "sfapi"),
site_id=site2.id,
group="services",
name="sfapi",
Expand All @@ -224,23 +225,25 @@ def _init_state(self):

self.projects = [
account_models.Project(
id=str(uuid.uuid4()),
id=demo_uuid("project", "staff_research"),
name="Staff research project",
description="Compute and storage allocation for staff research use",
user_ids=["gtorok"],
last_modified=day_ago,
),
account_models.Project(
id=str(uuid.uuid4()),
id=demo_uuid("project", "test_project"),
name="Test project",
description="Compute and storage allocation for testing use",
user_ids=["gtorok"],
last_modified=day_ago,
),
]

for p in self.projects:
for c in self.capabilities.values():
pa = account_models.ProjectAllocation(
id=str(uuid.uuid4()),
id=demo_uuid("project_allocation", f"{p.id}_{c.id}"),
project_id=p.id,
capability_id=c.id,
entries=[
Expand All @@ -255,7 +258,7 @@ def _init_state(self):
self.project_allocations.append(pa)
self.user_allocations.append(
account_models.UserAllocation(
id=str(uuid.uuid4()),
id=demo_uuid("user_allocation", f"{pa.id}_gtorok"),
project_id=pa.project_id,
project_allocation_id=pa.id,
user_id="gtorok",
Expand All @@ -274,7 +277,7 @@ def _init_state(self):
r = random.choice(self.resources)
status = statuses[r.name]
event = status_models.Event(
id=str(uuid.uuid4()),
id=demo_uuid("event", f"{r.name}_{d.isoformat()}"),
name=f"{r.name} is {status.value}",
description=f"{r.name} is {status.value}",
occurred_at=d,
Expand All @@ -298,7 +301,7 @@ def _init_state(self):
statuses[r.name] = status_models.Status.down
dstr = d.strftime("%Y-%m-%d %H:%M:%S.%f%z")
incident = status_models.Incident(
id=str(uuid.uuid4()),
id=demo_uuid("incident", f"{r.name}_{dstr}"),
name=f"{r.name} incident at {dstr}",
description=f"{r.name} incident at {dstr}",
status=status_models.Status.down,
Expand Down Expand Up @@ -470,9 +473,11 @@ async def get_user(
client_ip: str | None,
) -> account_models.User:
if user_id != self.user.id:
raise HTTPException(status_code=401, detail="User not found")
raise HTTPException(status_code=403, detail="User not found")
if api_key.startswith("Bearer "):
api_key = api_key[len("Bearer ") :]
if api_key != self.user.api_key:
raise HTTPException(status_code=403, detail="Invalid API key")
raise HTTPException(status_code=401, detail="Invalid API key")
return self.user

async def get_projects(self: "DemoAdapter", user: account_models.User) -> list[account_models.Project]:
Expand Down Expand Up @@ -683,19 +688,27 @@ def _headtail(
path: str,
file_bytes: int | None,
lines: int | None,
skip_heading: bool = False,
) -> Tuple[Any, int]:
args = [cmd]
if file_bytes:
args.append("-c")
args.append(str(file_bytes))
elif lines:
args.append("-n")
args.append(str(lines))

if cmd == "tail" and skip_heading:
if file_bytes is not None:
args.extend(["-c", f"+{file_bytes + 1}"])
elif lines is not None:
args.extend(["-n", f"+{lines + 1}"])
else:
if file_bytes is not None:
args.extend(["-c", str(file_bytes)])
elif lines is not None:
args.extend(["-n", str(lines)])

rp = self.validate_path(path)
args.append(rp)

result = subprocess.run(args, capture_output=True, text=True)
content = result.stdout
return content, -len(content)
return content, len(content)

async def head(
self: "DemoAdapter",
Expand All @@ -705,11 +718,42 @@ async def head(
file_bytes: int | None,
lines: int | None,
skip_trailing: bool,
) -> Tuple[Any, int]:
return self._headtail("head", path, file_bytes, lines)
) -> filesystem_models.GetFileHeadResponse:
content, offset = self._headtail("head", path, file_bytes, lines)

fc = filesystem_models.FileContent(
content=content,
content_type=(filesystem_models.ContentUnit.bytes
if file_bytes is not None
else filesystem_models.ContentUnit.lines),
start_position=0,
end_position=len(content))

return filesystem_models.GetFileHeadResponse(output=fc, offset=offset)

async def tail(
self: "DemoAdapter",
resource: status_models.Resource,
user: account_models.User,
path: str,
file_bytes: int | None,
lines: int | None,
skip_heading: bool,
) -> filesystem_models.GetFileTailResponse:

content, offset = self._headtail("tail", path, file_bytes, lines, skip_heading=skip_heading)

fc = filesystem_models.FileContent(
content=content,
content_type=(filesystem_models.ContentUnit.bytes
if file_bytes is not None
else filesystem_models.ContentUnit.lines),
start_position=0,
end_position=len(content))

return filesystem_models.GetFileTailResponse(output=fc, offset=offset)


async def tail(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, file_bytes: int | None, lines: int | None, skip_trailing: bool) -> Tuple[Any, int]:
return self._headtail("tail", path, file_bytes, lines)

async def view(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, size: int, offset: int) -> filesystem_models.GetViewFileResponse:
rp = self.validate_path(path)
Expand Down Expand Up @@ -762,12 +806,12 @@ async def rm(
resource: status_models.Resource,
user: account_models.User,
path: str,
):
) -> filesystem_models.RemoveResponse:
rp = self.validate_path(path)
if rp == PathSandbox.get_base_temp_dir():
raise HTTPException(status_code=400, detail="Cannot delete sandbox")
subprocess.run(["rm", "-rf", rp], check=True)
return None
return filesystem_models.RemoveResponse(output=f"Removed {rp}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? If rm fails you already get an error. What additional benefit do we get from returning a static string? Same comment for put files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not required, but I find it useful to have a structured format returned for all Filesystem APIs (It keeps results structured). While it makes sense in synchronous calls to return 204, maybe results could be:

{ "path": "/foo", "deleted": true }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see - yes for async calls we do need something. Thanks!


async def mkdir(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, request_model: filesystem_models.PostMakeDirRequest) -> filesystem_models.PostMkdirResponse:
rp = self.validate_path(request_model.path)
Expand All @@ -786,23 +830,26 @@ async def symlink(
subprocess.run(["ln", "-s", rp_src, rp_dst], check=True)
return filesystem_models.PostFileSymlinkResponse(output=self._file(rp_dst))

async def download(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> Any:
async def download(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> filesystem_models.GetFileDownloadResponse:
rp = self.validate_path(path)
raw_content = pathlib.Path(rp).read_bytes()

if len(raw_content) > filesystem_adapter.OPS_SIZE_LIMIT:
raise Exception("File to download is too large.")

return base64.b64encode(raw_content).decode("utf-8")
return filesystem_models.GetFileDownloadResponse(
output=base64.b64encode(raw_content).decode("utf-8"),
)

async def upload(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> None:
async def upload(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> filesystem_models.PutFileUploadResponse:
rp = self.validate_path(path)
if isinstance(content, bytes):
pathlib.Path(rp).write_bytes(content)
elif isinstance(content, str):
pathlib.Path(rp).write_bytes(base64.b64decode(content))
else:
raise Exception(f"Don't know how to handle variable of type: {type(content)}")
return filesystem_models.PutFileUploadResponse(output=f"Uploaded to {rp}")

async def compress(
self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, request_model: filesystem_models.PostCompressRequest
Expand Down Expand Up @@ -835,6 +882,13 @@ async def extract(self: "DemoAdapter", resource: status_models.Resource, user: a
src_rp = self.validate_path(request_model.path)
dst_rp = self.validate_path(request_model.target_path)

if os.path.exists(dst_rp):
if os.path.isdir(dst_rp):
raise Exception(f"Target path already exists: {request_model.target_path}")
else:
raise Exception(f"Target path already exists and is not a directory: {request_model.target_path}")
os.makedirs(dst_rp)

args = ["tar"]
if request_model.compression == filesystem_models.CompressionType.gzip:
args.append("-xzf")
Expand Down Expand Up @@ -896,7 +950,7 @@ class DemoTask(BaseModel):
user: account_models.User
start: float
status: task_models.TaskStatus = task_models.TaskStatus.pending
result: str | None = None
result: Any | None = None


class DemoTaskQueue:
Expand All @@ -916,7 +970,7 @@ async def _process_tasks(da: DemoAdapter):
elif t.status == task_models.TaskStatus.active and now - t.start > DEMO_QUEUE_UPDATE_SECS:
cmd = task_models.TaskCommand.model_validate_json(t.task)
(result, status) = await DemoAdapter.on_task(t.resource, t.user, cmd)
t.result = result
t.result = jsonable_encoder(result)
t.status = status
_tasks.append(t)
DemoTaskQueue.tasks = _tasks
Expand Down
2 changes: 2 additions & 0 deletions app/routers/account/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ async def get_project_allocation(
raise HTTPException(status_code=404, detail="User not found")
projects = await router.adapter.get_projects(user=user)
project = next((p for p in projects if p.id == project_id), None)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
pas = await router.adapter.get_project_allocations(project=project, user=user)
pa = next((pa for pa in pas if pa.id == project_allocation_id), None)
if not pa:
Expand Down
15 changes: 14 additions & 1 deletion app/routers/account/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pydantic import Field, computed_field
import datetime

from pydantic import Field, computed_field, field_validator

from ... import config
from ...types.base import IRIBaseModel
Expand All @@ -23,6 +25,17 @@ class Project(IRIBaseModel):
description: str
user_ids: list[str]

@field_validator("last_modified", mode="before")
@classmethod
def _norm_dt_field(cls, v):
return cls.normalize_dt(v)

last_modified: datetime.datetime

@computed_field(description="URI to this project resource")
@property
def self_uri(self) -> str:
return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/account/projects/{self.id}"

class AllocationEntry(IRIBaseModel):
"""Base class for allocations."""
Expand Down
Loading