1"""
2File system operations for GCO CLI.
4Provides functionality to interact with EFS and FSx for Lustre file systems
5attached to GCO regional stacks.
6"""
8from dataclasses import dataclass
9from datetime import UTC, datetime
10from typing import Any
12import boto3
13from botocore.exceptions import ClientError
15from .aws_client import get_aws_client
16from .config import GCOConfig, get_config
17from .kubectl_helpers import update_kubeconfig


@dataclass
class FileSystemInfo:
    """Information about a file system."""

    file_system_id: str
    file_system_type: str  # "efs" or "fsx"
    region: str
    dns_name: str
    mount_target_ip: str | None = None
    size_bytes: int | None = None
    status: str = "available"
    created_time: datetime | None = None
    tags: dict[str, str] | None = None

    def __post_init__(self) -> None:
        if self.tags is None:
            self.tags = {}
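
# Illustrative example (hypothetical IDs): FileSystemInfo is a plain
# dataclass, so it can be constructed directly; __post_init__ normalizes
# a missing tags mapping to an empty dict.
#
#     info = FileSystemInfo(
#         file_system_id="fs-0123456789abcdef0",
#         file_system_type="efs",
#         region="us-east-1",
#         dns_name="fs-0123456789abcdef0.efs.us-east-1.amazonaws.com",
#     )
#     assert info.tags == {}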


@dataclass
class FileInfo:
    """Information about a file or directory."""

    path: str
    name: str
    is_directory: bool
    size_bytes: int = 0
    modified_time: datetime | None = None
    owner: str | None = None


class FileSystemClient:
    """
    Client for interacting with GCO file systems.

    Supports:
    - Listing file systems (EFS/FSx) in GCO stacks
    - Getting file system information and access points
    - Downloading files from pods via kubectl cp
    """

    def __init__(self, config: GCOConfig | None = None):
        self.config = config or get_config()
        self._session = boto3.Session()
        # Pass the resolved config (not the possibly-None argument) so the
        # AWS client and this instance always agree on configuration.
        self._aws_client = get_aws_client(self.config)

    def get_file_systems(self, region: str | None = None) -> list[FileSystemInfo]:
        """
        Get all file systems associated with GCO stacks.

        Args:
            region: Specific region to query (None for all regions)

        Returns:
            List of FileSystemInfo objects
        """
        file_systems = []

        # Get regional stacks
        stacks = self._aws_client.discover_regional_stacks()

        if region:
            stacks = {k: v for k, v in stacks.items() if k == region}

        for stack_region, stack in stacks.items():
            # Get EFS file systems
            if stack.efs_file_system_id:
                efs_info = self._get_efs_info(stack.efs_file_system_id, stack_region)
                if efs_info:
                    file_systems.append(efs_info)

            # Get FSx file systems
            if stack.fsx_file_system_id:
                fsx_info = self._get_fsx_info(stack.fsx_file_system_id, stack_region)
                if fsx_info:
                    file_systems.append(fsx_info)

        return file_systems
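
    # Illustrative usage (assumes valid AWS credentials and at least one
    # deployed GCO regional stack):
    #
    #     client = FileSystemClient()
    #     for fs in client.get_file_systems(region="us-east-1"):
    #         print(fs.file_system_id, fs.file_system_type, fs.status)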

    def _get_efs_info(self, file_system_id: str, region: str) -> FileSystemInfo | None:
        """Get information about an EFS file system."""
        try:
            efs = self._session.client("efs", region_name=region)

            response = efs.describe_file_systems(FileSystemId=file_system_id)
            if not response["FileSystems"]:
                return None

            fs = response["FileSystems"][0]

            # Get a mount target IP so callers can reach the file system
            mt_response = efs.describe_mount_targets(FileSystemId=file_system_id)
            mount_target_ip = None
            if mt_response["MountTargets"]:
                mount_target_ip = mt_response["MountTargets"][0].get("IpAddress")

            # Get tags
            tags_response = efs.describe_tags(FileSystemId=file_system_id)
            tags = {t["Key"]: t["Value"] for t in tags_response.get("Tags", [])}

            return FileSystemInfo(
                file_system_id=file_system_id,
                file_system_type="efs",
                region=region,
                dns_name=f"{file_system_id}.efs.{region}.amazonaws.com",
                mount_target_ip=mount_target_ip,
                size_bytes=fs.get("SizeInBytes", {}).get("Value"),
                status=fs["LifeCycleState"],
                created_time=fs.get("CreationTime"),
                tags=tags,
            )
        except ClientError:
            return None

    def _get_fsx_info(self, file_system_id: str, region: str) -> FileSystemInfo | None:
        """Get information about an FSx for Lustre file system."""
        try:
            fsx = self._session.client("fsx", region_name=region)

            response = fsx.describe_file_systems(FileSystemIds=[file_system_id])
            if not response["FileSystems"]:
                return None

            fs = response["FileSystems"][0]

            # DNS name is reported directly on the file system description
            dns_name = fs.get("DNSName", "")

            # Get tags
            tags = {t["Key"]: t["Value"] for t in fs.get("Tags", [])}

            return FileSystemInfo(
                file_system_id=file_system_id,
                file_system_type="fsx",
                region=region,
                dns_name=dns_name,
                size_bytes=fs.get("StorageCapacity", 0) * 1024 * 1024 * 1024,  # GiB to bytes
                status=fs["Lifecycle"],
                created_time=fs.get("CreationTime"),
                tags=tags,
            )
        except ClientError:
            return None

    def get_file_system_by_region(self, region: str, fs_type: str = "efs") -> FileSystemInfo | None:
        """
        Get file system for a specific region.

        Args:
            region: AWS region
            fs_type: "efs" or "fsx"

        Returns:
            FileSystemInfo or None
        """
        file_systems = self.get_file_systems(region)
        for fs in file_systems:
            if fs.file_system_type == fs_type:
                return fs
        return None
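
    # Illustrative usage: fetch the EFS file system for a region, if any.
    #
    #     fs = client.get_file_system_by_region("us-east-1", fs_type="efs")
    #     if fs is not None:
    #         print(fs.dns_name)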

    def create_datasync_download_task(
        self,
        file_system_id: str,
        region: str,
        source_path: str,
        destination_bucket: str,
        destination_prefix: str = "",
    ) -> str:
        """
        Create a DataSync task to download files from EFS/FSx to S3.

        This is useful for downloading large amounts of data from file systems
        that aren't directly accessible.

        Args:
            file_system_id: EFS or FSx file system ID
            region: AWS region
            source_path: Path within the file system
            destination_bucket: S3 bucket name
            destination_prefix: S3 key prefix

        Returns:
            DataSync task ARN
        """
        datasync = self._session.client("datasync", region_name=region)

        # Determine file system type
        fs_info = None
        for fs in self.get_file_systems(region):
            if fs.file_system_id == file_system_id:
                fs_info = fs
                break

        if not fs_info:
            raise ValueError(f"File system {file_system_id} not found in region {region}")

        # Create source location
        if fs_info.file_system_type == "efs":
            source_location = datasync.create_location_efs(
                EfsFilesystemArn=f"arn:aws:elasticfilesystem:{region}:{self._get_account_id()}:file-system/{file_system_id}",
                Subdirectory=source_path,
                Ec2Config={
                    "SubnetArn": self._get_subnet_arn(region),
                    "SecurityGroupArns": [self._get_security_group_arn(region)],
                },
            )
            source_arn = source_location["LocationArn"]
        else:
            source_location = datasync.create_location_fsx_lustre(
                FsxFilesystemArn=f"arn:aws:fsx:{region}:{self._get_account_id()}:file-system/{file_system_id}",
                Subdirectory=source_path,
                SecurityGroupArns=[self._get_security_group_arn(region)],
            )
            source_arn = source_location["LocationArn"]

        # Create destination location (S3)
        dest_location = datasync.create_location_s3(
            S3BucketArn=f"arn:aws:s3:::{destination_bucket}",
            Subdirectory=destination_prefix,
            S3Config={"BucketAccessRoleArn": self._get_datasync_role_arn(region)},
        )
        dest_arn = dest_location["LocationArn"]

        # Create task
        task = datasync.create_task(
            SourceLocationArn=source_arn,
            DestinationLocationArn=dest_arn,
            Name=f"gco-download-{datetime.now(UTC).strftime('%Y%m%d-%H%M%S')}",
            Options={
                "VerifyMode": "ONLY_FILES_TRANSFERRED",
                "OverwriteMode": "ALWAYS",
                "PreserveDeletedFiles": "REMOVE",
                "TransferMode": "CHANGED",
            },
        )

        return str(task["TaskArn"])
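
    # Illustrative usage (hypothetical bucket name). Note that the
    # _get_subnet_arn / _get_security_group_arn / _get_datasync_role_arn
    # helpers below currently raise NotImplementedError, so this call
    # requires wiring those up first:
    #
    #     task_arn = client.create_datasync_download_task(
    #         file_system_id="fs-0123456789abcdef0",
    #         region="us-east-1",
    #         source_path="/outputs",
    #         destination_bucket="my-results-bucket",
    #         destination_prefix="gco/outputs/",
    #     )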

    def _get_account_id(self) -> str:
        """Get current AWS account ID."""
        sts = self._session.client("sts")
        return str(sts.get_caller_identity()["Account"])

    def _get_subnet_arn(self, _region: str) -> str:
        """Get a subnet ARN for DataSync in the given region."""
        # This would need to be implemented based on your VPC setup;
        # until then, fail loudly rather than return a placeholder.
        raise NotImplementedError("Subnet ARN lookup not implemented - configure via stack outputs")

    def _get_security_group_arn(self, _region: str) -> str:
        """Get a security group ARN for DataSync in the given region."""
        raise NotImplementedError(
            "Security group ARN lookup not implemented - configure via stack outputs"
        )

    def _get_datasync_role_arn(self, _region: str) -> str:
        """Get the DataSync IAM role ARN."""
        raise NotImplementedError(
            "DataSync role ARN lookup not implemented - configure via stack outputs"
        )

    def get_access_point_info(self, file_system_id: str, region: str) -> list[dict[str, Any]]:
        """
        Get EFS access points for a file system.

        Args:
            file_system_id: EFS file system ID
            region: AWS region

        Returns:
            List of access point information
        """
        try:
            efs = self._session.client("efs", region_name=region)
            response = efs.describe_access_points(FileSystemId=file_system_id)

            return [
                {
                    "access_point_id": ap["AccessPointId"],
                    "name": ap.get("Name", ""),
                    "path": ap.get("RootDirectory", {}).get("Path", "/"),
                    "posix_user": ap.get("PosixUser", {}),
                    "status": ap["LifeCycleState"],
                }
                for ap in response.get("AccessPoints", [])
            ]
        except ClientError:
            return []
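
    # Illustrative return value (hypothetical IDs and values):
    #
    #     [
    #         {
    #             "access_point_id": "fsap-0123456789abcdef0",
    #             "name": "shared",
    #             "path": "/shared",
    #             "posix_user": {"Uid": 1000, "Gid": 1000},
    #             "status": "available",
    #         },
    #     ]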

    def download_from_pod(
        self,
        region: str,
        pod_name: str,
        remote_path: str,
        local_path: str,
        namespace: str = "gco-jobs",
        container: str | None = None,
    ) -> dict[str, Any]:
        """
        Download files from a pod using kubectl cp.

        This shells out to ``kubectl cp`` to copy files from a pod's mounted
        file system (EFS/FSx) to the local machine.

        Args:
            region: AWS region where the cluster is located
            pod_name: Name of the pod to copy from
            remote_path: Path inside the pod (e.g., /mnt/efs/outputs)
            local_path: Local destination path
            namespace: Kubernetes namespace (default: gco-jobs)
            container: Container name (optional, for multi-container pods)

        Returns:
            Dict with download status and details
        """
        import os
        import subprocess

        # Update kubeconfig for the cluster
        cluster_name = f"gco-{region}"
        update_kubeconfig(cluster_name, region)

        # Build kubectl cp command
        # Format: kubectl cp <namespace>/<pod>:<remote_path> <local_path>
        source = f"{namespace}/{pod_name}:{remote_path}"
        cmd = ["kubectl", "cp", source, local_path]

        if container:
            cmd.extend(["-c", container])

        try:
            subprocess.run(
                cmd, check=True, capture_output=True, text=True
            )  # nosemgrep: dangerous-subprocess-use-audit - cmd is a list ["kubectl","cp",source,local_path]; source is namespace/pod:path, local_path is caller-provided destination

            if os.path.isfile(local_path):
                size = os.path.getsize(local_path)
            elif os.path.isdir(local_path):
                size = sum(
                    os.path.getsize(os.path.join(dirpath, filename))
                    for dirpath, _, filenames in os.walk(local_path)
                    for filename in filenames
                )
            else:
                size = 0

            return {
                "status": "success",
                "source": source,
                "destination": local_path,
                "size_bytes": size,
                "message": "Download completed successfully",
            }

        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"kubectl cp failed: {e.stderr}") from e
        except FileNotFoundError as e:
            raise RuntimeError(
                "kubectl not found. Please install kubectl and ensure it's in your PATH."
            ) from e
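
    # Illustrative usage (hypothetical pod name); roughly equivalent to
    # running `kubectl cp gco-jobs/train-job-abc123:/mnt/efs/outputs ./outputs`:
    #
    #     result = client.download_from_pod(
    #         region="us-east-1",
    #         pod_name="train-job-abc123",
    #         remote_path="/mnt/efs/outputs",
    #         local_path="./outputs",
    #     )
    #     print(result["size_bytes"])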

    def list_storage_contents(
        self,
        region: str,
        remote_path: str = "/",
        storage_type: str = "efs",
        namespace: str = "gco-jobs",
        pvc_name: str | None = None,
    ) -> dict[str, Any]:
        """
        List contents of EFS/FSx storage using a temporary helper pod.

        This creates a temporary pod that mounts the storage, lists contents,
        then cleans up. Useful for discovering what directories/files exist.

        Args:
            region: AWS region where the cluster is located
            remote_path: Path inside the storage to list (default: root)
            storage_type: "efs" or "fsx" (default: efs)
            namespace: Kubernetes namespace (default: gco-jobs)
            pvc_name: PVC name to mount (default: gco-shared-storage for EFS,
                gco-fsx-storage for FSx)

        Returns:
            Dict with listing status and contents
        """
        import subprocess
        import time
        import uuid

        # Determine PVC name based on storage type
        if pvc_name is None:
            pvc_name = "gco-shared-storage" if storage_type == "efs" else "gco-fsx-storage"

        # Determine mount path based on storage type
        mount_path = "/efs" if storage_type == "efs" else "/fsx"

        # Generate unique pod name
        helper_pod_name = f"gco-list-helper-{uuid.uuid4().hex[:8]}"

        # Update kubeconfig for the cluster
        cluster_name = f"gco-{region}"
        update_kubeconfig(cluster_name, region)

        # Create helper pod manifest.
        #
        # We set an explicit ``resources`` block with CPU + memory but no GPU
        # so the gco-jobs LimitRange admission plugin does not substitute its
        # ``max`` value as an implicit request. Without this, a namespace
        # that already has all 32 GPUs in use (typical during a demo or
        # heavy workload burst) would reject the helper pod, even though
        # listing files doesn't need a GPU, because K8s quota admission
        # would attribute the LimitRange's ``max.nvidia.com/gpu`` to the
        # pod's request.
        pod_manifest = f"""
apiVersion: v1
kind: Pod
metadata:
  name: {helper_pod_name}
  namespace: {namespace}
  labels:
    app: gco-list-helper
spec:
  restartPolicy: Never
  containers:
  - name: helper
    image: busybox:1.37.0
    command: ["sleep", "300"]
    resources:
      requests:
        cpu: "50m"
        memory: "64Mi"
      limits:
        cpu: "200m"
        memory: "256Mi"
    volumeMounts:
    - name: storage
      mountPath: {mount_path}
  volumes:
  - name: storage
    persistentVolumeClaim:
      claimName: {pvc_name}
"""

        try:
            # Create the helper pod
            subprocess.run(
                ["kubectl", "apply", "-f", "-"],
                input=pod_manifest,
                capture_output=True,
                text=True,
                check=True,
            )

            # Wait for pod to be ready
            max_wait = 60
            waited = 0
            while waited < max_wait:
                status_result = subprocess.run(
                    [
                        "kubectl",
                        "get",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "-o",
                        "jsonpath={.status.phase}",
                    ],
                    capture_output=True,
                    text=True,
                )
                if status_result.stdout.strip() == "Running":
                    break
                time.sleep(2)  # nosemgrep: arbitrary-sleep
                waited += 2

            if waited >= max_wait:
                raise RuntimeError("Helper pod did not become ready in time")

            # Build the full path inside the pod
            full_remote_path = f"{mount_path}/{remote_path.lstrip('/')}"

            # List contents using kubectl exec
            list_result = subprocess.run(
                [
                    "kubectl",
                    "exec",
                    helper_pod_name,
                    "-n",
                    namespace,
                    "--",
                    "ls",
                    "-la",
                    full_remote_path,
                ],
                capture_output=True,
                text=True,
            )

            if list_result.returncode != 0:
                return {
                    "status": "error",
                    "path": remote_path,
                    "storage_type": storage_type,
                    "contents": [],
                    "message": f"Path not found or empty: {list_result.stderr.strip()}",
                }

            # Parse ls output
            contents = []
            for line in list_result.stdout.strip().split("\n"):
                if line.startswith("total") or not line.strip():
                    continue
                parts = line.split()
                if len(parts) >= 9:
                    name = " ".join(parts[8:])
                    is_dir = line.startswith("d")
                    size = int(parts[4]) if parts[4].isdigit() else 0
                    contents.append(
                        {
                            "name": name,
                            "is_directory": is_dir,
                            "size_bytes": size,
                            "permissions": parts[0],
                        }
                    )
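
            # Illustrative: a typical `ls -la` entry (hypothetical file)
            #
            #     -rw-r--r--    1 1000     1000         4096 Apr 30 21:47 results.json
            #
            # parses to
            #
            #     {"name": "results.json", "is_directory": False,
            #      "size_bytes": 4096, "permissions": "-rw-r--r--"}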

            return {
                "status": "success",
                "path": remote_path,
                "storage_type": storage_type,
                "contents": contents,
                "message": f"Found {len(contents)} items",
            }

        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            raise RuntimeError(f"List failed: {error_msg}") from e
        except FileNotFoundError as e:
            raise RuntimeError(
                "kubectl not found. Please install kubectl and ensure it's in your PATH."
            ) from e
        finally:
            # Always clean up the helper pod
            import contextlib

            with contextlib.suppress(Exception):
                subprocess.run(
                    [
                        "kubectl",
                        "delete",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "--ignore-not-found",
                    ],
                    capture_output=True,
                    text=True,
                )
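
    # Illustrative usage:
    #
    #     listing = client.list_storage_contents(
    #         region="us-east-1",
    #         remote_path="/efs-output-example",
    #         storage_type="efs",
    #     )
    #     for entry in listing["contents"]:
    #         print(entry["name"], entry["size_bytes"])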

    def download_from_storage(
        self,
        region: str,
        remote_path: str,
        local_path: str,
        storage_type: str = "efs",
        namespace: str = "gco-jobs",
        pvc_name: str | None = None,
    ) -> dict[str, Any]:
        """
        Download files from EFS/FSx storage using a temporary helper pod.

        This creates a temporary pod that mounts the storage, copies files via
        kubectl cp, then cleans up. Works even after the original job pod is gone.

        Args:
            region: AWS region where the cluster is located
            remote_path: Path inside the storage (e.g., /efs-output-example/results.json)
            local_path: Local destination path
            storage_type: "efs" or "fsx" (default: efs)
            namespace: Kubernetes namespace (default: gco-jobs)
            pvc_name: PVC name to mount (default: gco-shared-storage for EFS,
                gco-fsx-storage for FSx)

        Returns:
            Dict with download status and details
        """
        import os
        import subprocess
        import time
        import uuid

        # Determine PVC name based on storage type
        if pvc_name is None:
            pvc_name = "gco-shared-storage" if storage_type == "efs" else "gco-fsx-storage"

        # Determine mount path based on storage type
        mount_path = "/efs" if storage_type == "efs" else "/fsx"

        # Generate unique pod name
        helper_pod_name = f"gco-download-helper-{uuid.uuid4().hex[:8]}"

        # Update kubeconfig for the cluster
        cluster_name = f"gco-{region}"
        update_kubeconfig(cluster_name, region)

        # Create helper pod manifest.
        #
        # The explicit ``resources`` block avoids the LimitRange admission
        # plugin substituting ``max.nvidia.com/gpu`` as an implicit request;
        # see ``list_storage_contents`` above for the full rationale.
        pod_manifest = f"""
apiVersion: v1
kind: Pod
metadata:
  name: {helper_pod_name}
  namespace: {namespace}
  labels:
    app: gco-download-helper
spec:
  restartPolicy: Never
  containers:
  - name: helper
    image: busybox:1.37.0
    command: ["sleep", "300"]
    resources:
      requests:
        cpu: "50m"
        memory: "64Mi"
      limits:
        cpu: "200m"
        memory: "256Mi"
    volumeMounts:
    - name: storage
      mountPath: {mount_path}
  volumes:
  - name: storage
    persistentVolumeClaim:
      claimName: {pvc_name}
"""

        try:
            # Create the helper pod
            subprocess.run(
                ["kubectl", "apply", "-f", "-"],
                input=pod_manifest,
                capture_output=True,
                text=True,
                check=True,
            )

            # Wait for pod to be ready
            max_wait = 60
            waited = 0
            while waited < max_wait:
                status_result = subprocess.run(
                    [
                        "kubectl",
                        "get",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "-o",
                        "jsonpath={.status.phase}",
                    ],
                    capture_output=True,
                    text=True,
                )
                if status_result.stdout.strip() == "Running":
                    break
                time.sleep(2)  # nosemgrep: arbitrary-sleep
                waited += 2

            if waited >= max_wait:
                raise RuntimeError("Helper pod did not become ready in time")

            # Build the full path inside the pod
            full_remote_path = f"{mount_path}/{remote_path.lstrip('/')}"

            # Copy files from the helper pod
            source = f"{namespace}/{helper_pod_name}:{full_remote_path}"
            cmd = ["kubectl", "cp", source, local_path]

            subprocess.run(
                cmd, check=True, capture_output=True, text=True
            )  # nosemgrep: dangerous-subprocess-use-audit - cmd is a list ["kubectl","cp",source,local_path]; source is namespace/pod:path, local_path is caller-provided destination

            # Get file info
            if os.path.isfile(local_path):
                size = os.path.getsize(local_path)
            elif os.path.isdir(local_path):
                size = sum(
                    os.path.getsize(os.path.join(dirpath, filename))
                    for dirpath, _, filenames in os.walk(local_path)
                    for filename in filenames
                )
            else:
                size = 0

            return {
                "status": "success",
                "source": f"{storage_type}:{remote_path}",
                "destination": local_path,
                "size_bytes": size,
                "storage_type": storage_type,
                "message": "Download completed successfully",
            }

        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            raise RuntimeError(f"Download failed: {error_msg}") from e
        except FileNotFoundError as e:
            raise RuntimeError(
                "kubectl not found. Please install kubectl and ensure it's in your PATH."
            ) from e
        finally:
            # Always clean up the helper pod
            import contextlib

            with contextlib.suppress(Exception):
                subprocess.run(
                    [
                        "kubectl",
                        "delete",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "--ignore-not-found",
                    ],
                    capture_output=True,
                    text=True,
                )
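
    # Illustrative usage (path taken from the docstring example above):
    #
    #     result = client.download_from_storage(
    #         region="us-east-1",
    #         remote_path="/efs-output-example/results.json",
    #         local_path="./results.json",
    #     )
    #     print(result["message"], result["size_bytes"])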


def get_file_system_client(config: GCOConfig | None = None) -> FileSystemClient:
    """Get a configured file system client instance."""
    return FileSystemClient(config)