diff --git a/Makefile b/Makefile index e9bb6ff..60f4781 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,9 @@ dev: deps IRI_API_ADAPTER_compute=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_filesystem=app.demo_adapter.DemoAdapter \ IRI_API_ADAPTER_task=app.demo_adapter.DemoAdapter \ + DEMO_QUEUE_UPDATE_SECS=2 \ OPENTELEMETRY_ENABLED=true \ - API_URL_ROOT='http://127.0.0.1:8000' fastapi dev + API_URL_ROOT='http://localhost:8000' fastapi dev .PHONY: clean clean: diff --git a/app/demo_adapter.py b/app/demo_adapter.py index 9d217ba..fa2d9cf 100644 --- a/app/demo_adapter.py +++ b/app/demo_adapter.py @@ -12,6 +12,7 @@ from typing import Any, Tuple from fastapi import HTTPException +from fastapi.encoders import jsonable_encoder from pydantic import BaseModel from .routers.account import facility_adapter as account_adapter @@ -29,7 +30,7 @@ from .types.models import Capability from .types.scalars import AllocationUnit -DEMO_QUEUE_UPDATE_SECS = 5 +DEMO_QUEUE_UPDATE_SECS = int(os.environ.get("DEMO_QUEUE_UPDATE_SECS", 5)) def paginate_list(items, offset: int | None, limit: int | None): @@ -136,14 +137,14 @@ def _init_state(self): day_ago = utc_now() - datetime.timedelta(days=1) self.capabilities = { - "cpu": Capability(id=str(uuid.uuid4()), name="CPU Nodes", units=[AllocationUnit.node_hours]), - "gpu": Capability(id=str(uuid.uuid4()), name="GPU Nodes", units=[AllocationUnit.node_hours]), - "hpss": Capability(id=str(uuid.uuid4()), name="Tape Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), - "gpfs": Capability(id=str(uuid.uuid4()), name="GPFS Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), + "cpu": Capability(id=demo_uuid("capability", "cpu"), name="CPU Nodes", units=[AllocationUnit.node_hours]), + "gpu": Capability(id=demo_uuid("capability", "gpu"), name="GPU Nodes", units=[AllocationUnit.node_hours]), + "hpss": Capability(id=demo_uuid("capability", "hpss"), name="Tape Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), + "gpfs": 
Capability(id=demo_uuid("capability", "gpfs"), name="GPFS Storage", units=[AllocationUnit.bytes, AllocationUnit.inodes]), } pm = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "perlmutter_compute_nodes"), site_id=site1.id, group="perlmutter", name="compute nodes", @@ -158,7 +159,7 @@ def _init_state(self): ) hpss = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "hpss"), site_id=site1.id, group="hpss", name="hpss", @@ -170,7 +171,7 @@ def _init_state(self): ) cfs = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "cfs"), site_id=site1.id, group="cfs", name="cfs", @@ -182,7 +183,7 @@ def _init_state(self): ) login = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "login_nodes"), site_id=site2.id, group="perlmutter", name="login nodes", @@ -194,7 +195,7 @@ def _init_state(self): ) iris = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "iris"), site_id=site2.id, group="services", name="Iris", @@ -205,7 +206,7 @@ def _init_state(self): resource_type=status_models.ResourceType.website, ) sfapi = status_models.Resource( - id=str(uuid.uuid4()), + id=demo_uuid("resource", "sfapi"), site_id=site2.id, group="services", name="sfapi", @@ -224,23 +225,25 @@ def _init_state(self): self.projects = [ account_models.Project( - id=str(uuid.uuid4()), + id=demo_uuid("project", "staff_research"), name="Staff research project", description="Compute and storage allocation for staff research use", user_ids=["gtorok"], + last_modified=day_ago, ), account_models.Project( - id=str(uuid.uuid4()), + id=demo_uuid("project", "test_project"), name="Test project", description="Compute and storage allocation for testing use", user_ids=["gtorok"], + last_modified=day_ago, ), ] for p in self.projects: for c in self.capabilities.values(): pa = account_models.ProjectAllocation( - id=str(uuid.uuid4()), + id=demo_uuid("project_allocation", f"{p.id}_{c.id}"), 
project_id=p.id, capability_id=c.id, entries=[ @@ -255,7 +258,7 @@ def _init_state(self): self.project_allocations.append(pa) self.user_allocations.append( account_models.UserAllocation( - id=str(uuid.uuid4()), + id=demo_uuid("user_allocation", f"{pa.id}_gtorok"), project_id=pa.project_id, project_allocation_id=pa.id, user_id="gtorok", @@ -274,7 +277,7 @@ def _init_state(self): r = random.choice(self.resources) status = statuses[r.name] event = status_models.Event( - id=str(uuid.uuid4()), + id=demo_uuid("event", f"{r.name}_{d.isoformat()}"), name=f"{r.name} is {status.value}", description=f"{r.name} is {status.value}", occurred_at=d, @@ -298,7 +301,7 @@ def _init_state(self): statuses[r.name] = status_models.Status.down dstr = d.strftime("%Y-%m-%d %H:%M:%S.%f%z") incident = status_models.Incident( - id=str(uuid.uuid4()), + id=demo_uuid("incident", f"{r.name}_{dstr}"), name=f"{r.name} incident at {dstr}", description=f"{r.name} incident at {dstr}", status=status_models.Status.down, @@ -470,9 +473,11 @@ async def get_user( client_ip: str | None, ) -> account_models.User: if user_id != self.user.id: - raise HTTPException(status_code=401, detail="User not found") + raise HTTPException(status_code=403, detail="User not found") + if api_key.startswith("Bearer "): + api_key = api_key[len("Bearer ") :] if api_key != self.user.api_key: - raise HTTPException(status_code=403, detail="Invalid API key") + raise HTTPException(status_code=401, detail="Invalid API key") return self.user async def get_projects(self: "DemoAdapter", user: account_models.User) -> list[account_models.Project]: @@ -683,19 +688,27 @@ def _headtail( path: str, file_bytes: int | None, lines: int | None, + skip_heading: bool = False, ) -> Tuple[Any, int]: args = [cmd] - if file_bytes: - args.append("-c") - args.append(str(file_bytes)) - elif lines: - args.append("-n") - args.append(str(lines)) + + if cmd == "tail" and skip_heading: + if file_bytes is not None: + args.extend(["-c", f"+{file_bytes + 1}"]) + 
elif lines is not None: + args.extend(["-n", f"+{lines + 1}"]) + else: + if file_bytes is not None: + args.extend(["-c", str(file_bytes)]) + elif lines is not None: + args.extend(["-n", str(lines)]) + rp = self.validate_path(path) args.append(rp) + result = subprocess.run(args, capture_output=True, text=True) content = result.stdout - return content, -len(content) + return content, len(content) async def head( self: "DemoAdapter", @@ -705,11 +718,42 @@ async def head( file_bytes: int | None, lines: int | None, skip_trailing: bool, - ) -> Tuple[Any, int]: - return self._headtail("head", path, file_bytes, lines) + ) -> filesystem_models.GetFileHeadResponse: + content, offset = self._headtail("head", path, file_bytes, lines) + + fc = filesystem_models.FileContent( + content=content, + content_type=(filesystem_models.ContentUnit.bytes + if file_bytes is not None + else filesystem_models.ContentUnit.lines), + start_position=0, + end_position=len(content)) + + return filesystem_models.GetFileHeadResponse(output=fc, offset=offset) + + async def tail( + self: "DemoAdapter", + resource: status_models.Resource, + user: account_models.User, + path: str, + file_bytes: int | None, + lines: int | None, + skip_heading: bool, + ) -> filesystem_models.GetFileTailResponse: + + content, offset = self._headtail("tail", path, file_bytes, lines, skip_heading=skip_heading) + + fc = filesystem_models.FileContent( + content=content, + content_type=(filesystem_models.ContentUnit.bytes + if file_bytes is not None + else filesystem_models.ContentUnit.lines), + start_position=0, + end_position=len(content)) + + return filesystem_models.GetFileTailResponse(output=fc, offset=offset) + - async def tail(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, file_bytes: int | None, lines: int | None, skip_trailing: bool) -> Tuple[Any, int]: - return self._headtail("tail", path, file_bytes, lines) async def view(self: "DemoAdapter", resource: 
status_models.Resource, user: account_models.User, path: str, size: int, offset: int) -> filesystem_models.GetViewFileResponse: rp = self.validate_path(path) @@ -762,12 +806,12 @@ async def rm( resource: status_models.Resource, user: account_models.User, path: str, - ): + ) -> filesystem_models.RemoveResponse: rp = self.validate_path(path) if rp == PathSandbox.get_base_temp_dir(): raise HTTPException(status_code=400, detail="Cannot delete sandbox") subprocess.run(["rm", "-rf", rp], check=True) - return None + return filesystem_models.RemoveResponse(output=f"Removed {rp}") async def mkdir(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, request_model: filesystem_models.PostMakeDirRequest) -> filesystem_models.PostMkdirResponse: rp = self.validate_path(request_model.path) @@ -786,16 +830,18 @@ async def symlink( subprocess.run(["ln", "-s", rp_src, rp_dst], check=True) return filesystem_models.PostFileSymlinkResponse(output=self._file(rp_dst)) - async def download(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> Any: + async def download(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> filesystem_models.GetFileDownloadResponse: rp = self.validate_path(path) raw_content = pathlib.Path(rp).read_bytes() if len(raw_content) > filesystem_adapter.OPS_SIZE_LIMIT: raise Exception("File to download is too large.") - return base64.b64encode(raw_content).decode("utf-8") + return filesystem_models.GetFileDownloadResponse( + output=base64.b64encode(raw_content).decode("utf-8"), + ) - async def upload(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> None: + async def upload(self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> filesystem_models.PutFileUploadResponse: rp = self.validate_path(path) if isinstance(content, bytes): 
pathlib.Path(rp).write_bytes(content) @@ -803,6 +849,7 @@ async def upload(self: "DemoAdapter", resource: status_models.Resource, user: ac pathlib.Path(rp).write_bytes(base64.b64decode(content)) else: raise Exception(f"Don't know how to handle variable of type: {type(content)}") + return filesystem_models.PutFileUploadResponse(output=f"Uploaded to {rp}") async def compress( self: "DemoAdapter", resource: status_models.Resource, user: account_models.User, request_model: filesystem_models.PostCompressRequest @@ -835,6 +882,13 @@ async def extract(self: "DemoAdapter", resource: status_models.Resource, user: a src_rp = self.validate_path(request_model.path) dst_rp = self.validate_path(request_model.target_path) + if os.path.exists(dst_rp): + if os.path.isdir(dst_rp): + raise Exception(f"Target path already exists: {request_model.target_path}") + else: + raise Exception(f"Target path already exists and is not a directory: {request_model.target_path}") + os.makedirs(dst_rp) + args = ["tar"] if request_model.compression == filesystem_models.CompressionType.gzip: args.append("-xzf") @@ -896,7 +950,7 @@ class DemoTask(BaseModel): user: account_models.User start: float status: task_models.TaskStatus = task_models.TaskStatus.pending - result: str | None = None + result: Any | None = None class DemoTaskQueue: @@ -916,7 +970,7 @@ async def _process_tasks(da: DemoAdapter): elif t.status == task_models.TaskStatus.active and now - t.start > DEMO_QUEUE_UPDATE_SECS: cmd = task_models.TaskCommand.model_validate_json(t.task) (result, status) = await DemoAdapter.on_task(t.resource, t.user, cmd) - t.result = result + t.result = jsonable_encoder(result) t.status = status _tasks.append(t) DemoTaskQueue.tasks = _tasks diff --git a/app/routers/account/account.py b/app/routers/account/account.py index c030d63..f3b3696 100644 --- a/app/routers/account/account.py +++ b/app/routers/account/account.py @@ -136,6 +136,8 @@ async def get_project_allocation( raise HTTPException(status_code=404, 
detail="User not found") projects = await router.adapter.get_projects(user=user) project = next((p for p in projects if p.id == project_id), None) + if not project: + raise HTTPException(status_code=404, detail="Project not found") pas = await router.adapter.get_project_allocations(project=project, user=user) pa = next((pa for pa in pas if pa.id == project_allocation_id), None) if not pa: diff --git a/app/routers/account/models.py b/app/routers/account/models.py index 55406dc..0ba13b2 100644 --- a/app/routers/account/models.py +++ b/app/routers/account/models.py @@ -1,4 +1,6 @@ -from pydantic import Field, computed_field +import datetime + +from pydantic import Field, computed_field, field_validator from ... import config from ...types.base import IRIBaseModel @@ -23,6 +25,17 @@ class Project(IRIBaseModel): description: str user_ids: list[str] + @field_validator("last_modified", mode="before") + @classmethod + def _norm_dt_field(cls, v): + return cls.normalize_dt(v) + + last_modified: datetime.datetime + + @computed_field(description="URI to this project resource") + @property + def self_uri(self) -> str: + return f"{config.API_URL_ROOT}{config.API_PREFIX}{config.API_URL}/account/projects/{self.id}" class AllocationEntry(IRIBaseModel): """Base class for allocations.""" diff --git a/app/routers/compute/models.py b/app/routers/compute/models.py index 38eca02..968e0af 100644 --- a/app/routers/compute/models.py +++ b/app/routers/compute/models.py @@ -1,4 +1,4 @@ -from enum import IntEnum +from enum import Enum from typing import Annotated from pydantic import ConfigDict, Field, StrictBool, field_serializer @@ -84,7 +84,7 @@ class CommandResult(IRIBaseModel): result: str | None = None -class JobState(IntEnum): +class JobState(str, Enum): """ from: https://exaworks.org/psij-python/docs/v/0.9.11/_modules/psij/job_state.html#JobState @@ -93,29 +93,29 @@ class JobState(IntEnum): The possible states are: `NEW`, `QUEUED`, `ACTIVE`, `COMPLETED`, `FAILED`, and `CANCELED`. 
""" - NEW = 0 + NEW = "new" """ This is the state of a job immediately after the :class:`~psij.Job` object is created and before being submitted to a :class:`~psij.JobExecutor`. """ - QUEUED = 1 + QUEUED = "queued" """ This is the state of the job after being accepted by a backend for execution, but before the execution of the job begins. """ - ACTIVE = 2 + ACTIVE = "active" """This state represents an actively running job.""" - COMPLETED = 3 + COMPLETED = "completed" """ This state represents a job that has completed *successfully* (i.e., with a zero exit code). In other words, a job with the executable set to `/bin/false` cannot enter this state. """ - FAILED = 4 + FAILED = "failed" """ Represents a job that has either completed unsuccessfully (with a non-zero exit code) or a job whose handling and/or execution by the backend has failed in some way. """ - CANCELED = 5 + CANCELED = "canceled" """Represents a job that was canceled by a call to :func:`~psij.Job.cancel()`.""" @@ -126,10 +126,6 @@ class JobStatus(IRIBaseModel): exit_code: int | None = None meta_data: dict[str, object] | None = None - @field_serializer("state") - def serialize_state(self, state: JobState): - return state.name - class Job(IRIBaseModel): id: str diff --git a/app/routers/filesystem/facility_adapter.py b/app/routers/filesystem/facility_adapter.py index afa0290..bc1d74d 100644 --- a/app/routers/filesystem/facility_adapter.py +++ b/app/routers/filesystem/facility_adapter.py @@ -43,11 +43,11 @@ async def ls( pass @abstractmethod - async def head(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str, file_bytes: int, lines: int, skip_trailing: bool) -> Tuple[Any, int]: + async def head(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str, file_bytes: int, lines: int, skip_trailing: bool) -> filesystem_models.GetFileHeadResponse: pass @abstractmethod - async def tail(self: "FacilityAdapter", resource: 
status_models.Resource, user: account_models.User, path: str, file_bytes: int | None, lines: int | None, skip_trailing: bool) -> Tuple[Any, int]: + async def tail(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str, file_bytes: int | None, lines: int | None, skip_trailing: bool) -> filesystem_models.GetFileTailResponse: pass @abstractmethod @@ -67,7 +67,7 @@ async def stat(self: "FacilityAdapter", resource: status_models.Resource, user: pass @abstractmethod - async def rm(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str): + async def rm(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> filesystem_models.RemoveResponse: pass @abstractmethod @@ -84,11 +84,11 @@ async def symlink( pass @abstractmethod - async def download(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> Any: + async def download(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str) -> filesystem_models.GetFileDownloadResponse: pass @abstractmethod - async def upload(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> None: + async def upload(self: "FacilityAdapter", resource: status_models.Resource, user: account_models.User, path: str, content: str) -> filesystem_models.PutFileUploadResponse: pass @abstractmethod diff --git a/app/routers/filesystem/models.py b/app/routers/filesystem/models.py index 5c2cc0a..57b0634 100644 --- a/app/routers/filesystem/models.py +++ b/app/routers/filesystem/models.py @@ -7,8 +7,7 @@ from enum import Enum from typing import Optional -from humps.camel import case -from pydantic import Field, AliasChoices, ConfigDict, BaseModel +from pydantic import Field, AliasChoices, BaseModel class CompressionType(str, Enum): @@ -23,16 +22,7 @@ class ContentUnit(str, Enum): bytes = "bytes" 
-class CamelModel(BaseModel): - model_config = ConfigDict( - alias_generator=case, - arbitrary_types_allowed=True, - populate_by_name=True, - validate_assignment=True, - ) - - -class File(CamelModel): +class File(BaseModel): name: str type: str link_target: Optional[str] @@ -43,19 +33,19 @@ class File(CamelModel): size: str -class FileContent(CamelModel): +class FileContent(BaseModel): content: str content_type: ContentUnit start_position: int end_position: int -class FileChecksum(CamelModel): +class FileChecksum(BaseModel): algorithm: str = "SHA-256" checksum: str -class FileStat(CamelModel): +class FileStat(BaseModel): # message: str mode: int ino: int @@ -70,49 +60,55 @@ class FileStat(CamelModel): # birthtime: int -class PatchFile(CamelModel): +class PatchFile(BaseModel): message: str new_filepath: str new_permissions: str new_owner: str -class PatchFileMetadataRequest(CamelModel): +class PatchFileMetadataRequest(BaseModel): new_filename: Optional[str] = None new_permissions: Optional[str] = None new_owner: Optional[str] = None -class GetDirectoryLsResponse(CamelModel): +class GetDirectoryLsResponse(BaseModel): output: Optional[list[File]] -class GetFileHeadResponse(CamelModel): +class GetFileHeadResponse(BaseModel): output: Optional[FileContent] + offset: Optional[int] = Field(default=0, description="Offset in bytes from the beginning of the file where to start reading the content") -class GetFileTailResponse(CamelModel): +class GetFileTailResponse(BaseModel): output: Optional[FileContent] + offset: Optional[int] = Field(default=0, description="Offset in bytes from the beginning of the file where to start reading the content") -class GetFileChecksumResponse(CamelModel): +class GetFileChecksumResponse(BaseModel): output: Optional[FileChecksum] -class GetFileTypeResponse(CamelModel): +class GetFileTypeResponse(BaseModel): output: Optional[str] = Field(example="directory") -class GetFileStatResponse(CamelModel): +class GetFileStatResponse(BaseModel): output: 
Optional[FileStat] -class PatchFileMetadataResponse(CamelModel): +class GetFileDownloadResponse(BaseModel): + output: Optional[str] + +class PatchFileMetadataResponse(BaseModel): output: Optional[PatchFile] -class FilesystemRequestBase(CamelModel): - path: Optional[str] = Field(validation_alias=AliasChoices("sourcePath", "source_path"), example="/home/user/dir") +class FilesystemRequestBase(BaseModel): + # Should we allow both: path and source_path? Or just one of them? + path: Optional[str] = Field(validation_alias=AliasChoices("path", "source_path"), example="/home/user/dir") class PutFileChmodRequest(FilesystemRequestBase): @@ -120,7 +116,7 @@ class PutFileChmodRequest(FilesystemRequestBase): model_config = {"json_schema_extra": {"examples": [{"path": "/home/user/dir/file.out", "mode": "777"}]}} -class PutFileChmodResponse(CamelModel): +class PutFileChmodResponse(BaseModel): output: Optional[File] @@ -140,9 +136,11 @@ class PutFileChownRequest(FilesystemRequestBase): } -class PutFileChownResponse(CamelModel): +class PutFileChownResponse(BaseModel): output: Optional[File] +class PutFileUploadResponse(BaseModel): + output: Optional[str] class PostMakeDirRequest(FilesystemRequestBase): parent: Optional[bool] = Field( @@ -153,28 +151,28 @@ class PostMakeDirRequest(FilesystemRequestBase): class PostFileSymlinkRequest(FilesystemRequestBase): - link_path: str = Field(..., description="Path to the new symlink") + link_path: str = Field(description="Path to the new symlink") model_config = {"json_schema_extra": {"examples": [{"path": "/home/user/dir", "link_path": "/home/user/newlink"}]}} -class PostFileSymlinkResponse(CamelModel): +class PostFileSymlinkResponse(BaseModel): output: Optional[File] -class GetViewFileResponse(CamelModel): +class GetViewFileResponse(BaseModel): output: Optional[str] -class PostMkdirResponse(CamelModel): +class PostMkdirResponse(BaseModel): output: Optional[File] -class PostCompressResponse(CamelModel): +class PostCompressResponse(BaseModel): 
output: Optional[File] class PostCompressRequest(FilesystemRequestBase): - target_path: str = Field(..., description="Path to the compressed file") + target_path: str = Field(description="Path to the compressed file") match_pattern: Optional[str] = Field(default=None, description="Regex pattern to filter files to compress") dereference: Optional[bool] = Field( default=False, @@ -188,9 +186,9 @@ class PostCompressRequest(FilesystemRequestBase): "json_schema_extra": { "examples": [ { - "sourcePath": "/home/user/dir", - "targetPath": "/home/user/file.tar.gz", - "matchPattern": "*./[ab].*\\.txt", + "source_path": "/home/user/dir", + "target_path": "/home/user/file.tar.gz", + "match_pattern": "*./[ab].*\\.txt", "dereference": "true", "compression": "none", } @@ -199,12 +197,12 @@ class PostCompressRequest(FilesystemRequestBase): } -class PostExtractResponse(CamelModel): +class PostExtractResponse(BaseModel): output: Optional[File] class PostExtractRequest(FilesystemRequestBase): - target_path: str = Field(..., description="Path to the directory where to extract the compressed file") + target_path: str = Field(description="Path to the directory where to extract the compressed file") compression: Optional[CompressionType] = Field( default="gzip", description="Defines the type of compression to be used. 
By default gzip is used.", @@ -213,8 +211,8 @@ class PostExtractRequest(FilesystemRequestBase): "json_schema_extra": { "examples": [ { - "sourcePath": "/home/user/dir/file.tar.gz", - "targetPath": "/home/user/dir", + "source_path": "/home/user/dir/file.tar.gz", + "target_path": "/home/user/dir", "compression": "none", } ] @@ -223,7 +221,7 @@ class PostExtractRequest(FilesystemRequestBase): class PostCopyRequest(FilesystemRequestBase): - target_path: str = Field(..., description="Target path of the copy operation") + target_path: str = Field(description="Target path of the copy operation") dereference: Optional[bool] = Field( default=False, description=("If set to `true`, it follows symbolic links and copies the files they point to instead of the links themselves."), @@ -232,8 +230,8 @@ class PostCopyRequest(FilesystemRequestBase): "json_schema_extra": { "examples": [ { - "sourcePath": "/home/user/dir/file.orig", - "targetPath": "/home/user/dir/file.new", + "source_path": "/home/user/dir/file.orig", + "target_path": "/home/user/dir/file.new", "dereference": "true", } ] @@ -241,23 +239,26 @@ class PostCopyRequest(FilesystemRequestBase): } -class PostCopyResponse(CamelModel): +class PostCopyResponse(BaseModel): output: Optional[File] class PostMoveRequest(FilesystemRequestBase): - target_path: str = Field(..., description="Target path of the move operation") + target_path: str = Field(description="Target path of the move operation") model_config = { "json_schema_extra": { "examples": [ { - "sourcePath": "/home/user/dir/file.orig", - "targetPath": "/home/user/dir/file.new", + "source_path": "/home/user/dir/file.orig", + "target_path": "/home/user/dir/file.new", } ] } } -class PostMoveResponse(CamelModel): +class PostMoveResponse(BaseModel): output: Optional[File] + +class RemoveResponse(BaseModel): + output: Optional[str] \ No newline at end of file diff --git a/app/routers/task/models.py b/app/routers/task/models.py index 9d64e3b..ba124bc 100644 --- 
a/app/routers/task/models.py +++ b/app/routers/task/models.py @@ -1,6 +1,8 @@ import enum +from typing import Any from pydantic import BaseModel, computed_field + from ... import config @@ -31,5 +33,5 @@ class TaskCommand(BaseModel): class Task(BaseModel): id: str status: TaskStatus = TaskStatus.pending - result: str | None = None + result: Any | None = None command: TaskCommand | None = None diff --git a/app/types/base.py b/app/types/base.py index 5052d99..c0a55dd 100644 --- a/app/types/base.py +++ b/app/types/base.py @@ -31,15 +31,6 @@ def get_extra(self, key, default=None): """Get an extra field value that is not defined in the model. Returns default if not found.""" return getattr(self, "__pydantic_extra__", {}).get(key, default) - -class NamedObject(IRIBaseModel): - """Base model for named objects.""" - - id: str = Field(..., description="The unique identifier for the object. Typically a UUID or URN.") - - def _self_path(self) -> str: - raise NotImplementedError - @classmethod def normalize_dt(cls, dt: datetime | None) -> datetime | None: """Normalize datetime to UTC-aware.""" @@ -52,6 +43,15 @@ def normalize_dt(cls, dt: datetime | None) -> datetime | None: return dt.replace(tzinfo=datetime.timezone.utc) return dt + +class NamedObject(IRIBaseModel): + """Base model for named objects.""" + + id: str = Field(..., description="The unique identifier for the object. 
Typically a UUID or URN.") + + def _self_path(self) -> str: + raise NotImplementedError + @field_validator("last_modified", mode="before") @classmethod def _norm_dt_field(cls, v): diff --git a/pyproject.toml b/pyproject.toml index ec0ccb3..356763e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ requires-python = ">=3.14,<3.15" dependencies = [ "fastapi[standard]>=0.128.0,<0.129.0", "uvicorn[standard]>=0.40.0,<0.41.0", - "humps>=0.2.2,<0.3.0", + "pyhumps>=3.8.0,<3.9.0", "opentelemetry-api>=1.39.1,<1.40.0", "opentelemetry-sdk>=1.39.1,<1.40.0", "opentelemetry-instrumentation-fastapi>=0.60b1,<0.61b0", diff --git a/test/test_filesystem.py b/test/test_filesystem.py new file mode 100644 index 0000000..62f04ad --- /dev/null +++ b/test/test_filesystem.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +IRI Filesystem API smoke test via async tasks. +""" +import os +import sys +import time +import random +import datetime as dt +import requests + + +# ========================= +# CONFIG — EDIT THESE AS NEEDED +# ========================= + +BASE_URL = "http://localhost:8000/api/v1" +TOKEN = os.environ.get("IRI_API_TOKEN", "12345") +# ========================= + +HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/json"} + +POLL_INTERVAL = 2 +TIMEOUT = 180 + + +def getAnyStorageResource(): + """Get the ID of any storage resource available in the facility by looking at the project allocations and resource capabilities.""" + projects = requests.get(f"{BASE_URL}/account/projects", headers=HEADERS, timeout=TIMEOUT).json() + caps = requests.get(f"{BASE_URL}/account/capabilities", headers=HEADERS, timeout=TIMEOUT).json() + storageCaps = {c["self_uri"] for c in caps if c["name"] == "GPFS Storage"} + if not storageCaps: + raise RuntimeError("No storage capabilities defined") + + projectStorageCaps = set() + for p in projects: + allocs = requests.get(f"{BASE_URL}/account/projects/{p['id']}/project_allocations", headers=HEADERS, timeout=TIMEOUT).json() 
+ for a in allocs: + if a["capability_uri"] in storageCaps: + projectStorageCaps.add(a["capability_uri"]) + + if not projectStorageCaps: + raise RuntimeError("No storage allocations found in any project") + + resources = requests.get(f"{BASE_URL}/status/resources?offset=0&limit=100", headers=HEADERS, timeout=TIMEOUT).json() + matchingResources = [r["id"] for r in resources if any(cap in r["capability_uris"] for cap in projectStorageCaps)] + if not matchingResources: + raise RuntimeError("No storage resources found") + + return random.choice(matchingResources) + + +RESOURCE_ID = getAnyStorageResource() +print("Chosen storage resource ID:", RESOURCE_ID) + + + +def die(msg): + """Print error message and exit.""" + print(f"\nERROR: {msg}") + sys.exit(1) + + +def submit(method, path, **kwargs): + """Submit a task and return its ID.""" + print(f"Submitting {method} {path} with {kwargs}") + url = f"{BASE_URL}{path}" + r = requests.request(method, url, headers=HEADERS, timeout=TIMEOUT, **kwargs) + + if not r.ok: + die(f"{method} {url} failed: {r.status_code} {r.text}") + + data = r.json() + if not data.get("task_id"): + die(f"No task_id in response: {data}") + if not data.get("task_uri"): + die(f"No task_uri in response: {data}") + + return data + + +def wait_task(task): + """Wait for a task to complete and return its result.""" + deadline = time.time() + TIMEOUT + + while time.time() < deadline: + r = requests.get(task["task_uri"], headers=HEADERS, timeout=TIMEOUT) + + if not r.ok: + die(f"Task query failed: {r.status_code} {r.text}") + + t = r.json() + status = t["status"] + + print(f" Task {task['task_id']}: {status}") + + if status == "completed": + print(f" Task result: {t.get('result')}") + return t.get("result") + + if status in ("failed", "canceled"): + die(f"Task {task['task_id']} ended with status {status}: {t}") + + time.sleep(POLL_INTERVAL) + + die(f"Task {task['task_id']} timed out") + + +# ============================================================ +# 
Sandbox setup +# ============================================================ + +timestamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d-%H%M%S") + +# NOTE/TODO: /Users/jbalcas/work/amsc/iri/iri-facility-api-python/iri_sandbox/ +# While we can use absolute paths, there is a need to return relative paths from the API +# As this directory can be mounted at different locations at different facilities +base_dir = f"iri-fs-test-{timestamp}" +file_path = f"{base_dir}/hello.txt" +copy_path = f"{base_dir}/hello_copy.txt" +moved_path = f"{base_dir}/hello_moved.txt" +link_path = f"{base_dir}/hello_link.txt" +archive_path = f"{base_dir}.tar.gz" +extract_dir = f"{base_dir}_extracted" + +content = f"hello world {timestamp}\n" + + +print("\n" + "="*40) +print("=== CREATE DIRECTORY ===") + +task = submit("POST", f"/filesystem/mkdir/{RESOURCE_ID}", json={"path": base_dir, "parent": True}) +wait_task(task) + +print("\n" + "="*40) +print("=== UPLOAD FILE ===") + +task = submit("POST", f"/filesystem/upload/{RESOURCE_ID}?path={file_path}", files={"file": ("hello.txt", content.encode())}) +wait_task(task) + +print("\n" + "="*40) +print("=== FILE TYPE ===") + +task = submit("GET", f"/filesystem/file/{RESOURCE_ID}", params={"path": file_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== STAT ===") + +task = submit("GET", f"/filesystem/stat/{RESOURCE_ID}", params={"path": file_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== LS ===") + +task = submit("GET", f"/filesystem/ls/{RESOURCE_ID}", params={"path": base_dir}) +wait_task(task) + +print("\n" + "="*40) +print("=== CHMOD ===") + +task = submit("PUT", f"/filesystem/chmod/{RESOURCE_ID}", json={"path": file_path, "mode": "644"}) +wait_task(task) + +print("\n" + "="*40) +print("=== HEAD ===") + +task = submit("GET", f"/filesystem/head/{RESOURCE_ID}", params={"path": file_path, "lines": 1}) +wait_task(task) + +print("\n" + "="*40) +print("=== TAIL ===") + +task = submit("GET", f"/filesystem/tail/{RESOURCE_ID}", params={"path": 
file_path, "lines": 1}) +wait_task(task) + +print("\n" + "="*40) +print("=== VIEW ===") + +task = submit("GET", f"/filesystem/view/{RESOURCE_ID}", params={"path": file_path, "size": 4096, "offset": 0}) +wait_task(task) + +print("\n" + "="*40) +print("=== CHECKSUM ===") + +task = submit("GET", f"/filesystem/checksum/{RESOURCE_ID}", params={"path": file_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== COPY FILE ===") + +# Keep this as source_path. Server accepts both, so making sure it works. +task = submit("POST", f"/filesystem/cp/{RESOURCE_ID}", json={"source_path": file_path, "target_path": copy_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== MOVE FILE ===") + +task = submit("POST", f"/filesystem/mv/{RESOURCE_ID}", json={"source_path": copy_path, "target_path": moved_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== CREATE SYMLINK ===") + +task = submit("POST", f"/filesystem/symlink/{RESOURCE_ID}", json={"path": moved_path, "link_path": link_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== COMPRESS DIRECTORY ===") + +task = submit("POST", f"/filesystem/compress/{RESOURCE_ID}", json={"source_path": base_dir, "target_path": archive_path, "compression": "gzip"}) +wait_task(task) + +print("\n" + "="*40) +print("=== EXTRACT ARCHIVE ===") + +task = submit("POST", f"/filesystem/extract/{RESOURCE_ID}", json={"source_path": archive_path, "target_path": extract_dir, "compression": "gzip"}) +wait_task(task) + +print("\n" + "="*40) +print("=== DOWNLOAD FILE ===") + +task = submit("GET", f"/filesystem/download/{RESOURCE_ID}", params={"path": moved_path}) +wait_task(task) + +print("\n" + "="*40) +print("=== CLEANUP ===") + +for p in [base_dir, archive_path, extract_dir]: + task = submit("DELETE", f"/filesystem/rm/{RESOURCE_ID}", params={"path": p}) + wait_task(task) + +print("\n" + "="*40) +print("ALL FILESYSTEM TESTS COMPLETED SUCCESSFULLY") +print("="*40)