Coverage for cli / files.py: 91%

219 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 21:47 +0000

1""" 

2File system operations for GCO CLI. 

3 

4Provides functionality to interact with EFS and FSx for Lustre file systems 

5attached to GCO regional stacks. 

6""" 

7 

8from dataclasses import dataclass 

9from datetime import UTC, datetime 

10from typing import Any 

11 

12import boto3 

13from botocore.exceptions import ClientError 

14 

15from .aws_client import get_aws_client 

16from .config import GCOConfig, get_config 

17from .kubectl_helpers import update_kubeconfig 

18 

19 

20@dataclass 

21class FileSystemInfo: 

22 """Information about a file system.""" 

23 

24 file_system_id: str 

25 file_system_type: str # "efs" or "fsx" 

26 region: str 

27 dns_name: str 

28 mount_target_ip: str | None = None 

29 size_bytes: int | None = None 

30 status: str = "available" 

31 created_time: datetime | None = None 

32 tags: dict[str, str] | None = None 

33 

34 def __post_init__(self) -> None: 

35 if self.tags is None: 

36 self.tags = {} 

37 

38 

39@dataclass 

40class FileInfo: 

41 """Information about a file or directory.""" 

42 

43 path: str 

44 name: str 

45 is_directory: bool 

46 size_bytes: int = 0 

47 modified_time: datetime | None = None 

48 owner: str | None = None 

49 

50 

class FileSystemClient:
    """
    Client for interacting with GCO file systems.

    Supports:
    - Listing file systems (EFS/FSx) in GCO stacks
    - Getting file system information and access points
    - Downloading files from pods via kubectl cp
    """

    def __init__(self, config: GCOConfig | None = None):
        """
        Initialize the client.

        Args:
            config: GCO configuration; falls back to get_config() when None.
        """
        self.config = config or get_config()
        self._session = boto3.Session()
        # Bug fix: pass the *resolved* configuration, not the raw argument.
        # Previously a None argument was forwarded to get_aws_client(), so the
        # AWS client could be built from a different configuration object than
        # the one stored on self.config.
        self._aws_client = get_aws_client(self.config)

65 

66 def get_file_systems(self, region: str | None = None) -> list[FileSystemInfo]: 

67 """ 

68 Get all file systems associated with GCO stacks. 

69 

70 Args: 

71 region: Specific region to query (None for all regions) 

72 

73 Returns: 

74 List of FileSystemInfo objects 

75 """ 

76 file_systems = [] 

77 

78 # Get regional stacks 

79 stacks = self._aws_client.discover_regional_stacks() 

80 

81 if region: 

82 stacks = {k: v for k, v in stacks.items() if k == region} 

83 

84 for stack_region, stack in stacks.items(): 

85 # Get EFS file systems 

86 if stack.efs_file_system_id: 86 ↛ 92line 86 didn't jump to line 92 because the condition on line 86 was always true

87 efs_info = self._get_efs_info(stack.efs_file_system_id, stack_region) 

88 if efs_info: 88 ↛ 92line 88 didn't jump to line 92 because the condition on line 88 was always true

89 file_systems.append(efs_info) 

90 

91 # Get FSx file systems 

92 if stack.fsx_file_system_id: 

93 fsx_info = self._get_fsx_info(stack.fsx_file_system_id, stack_region) 

94 if fsx_info: 94 ↛ 84line 94 didn't jump to line 84 because the condition on line 94 was always true

95 file_systems.append(fsx_info) 

96 

97 return file_systems 

98 

99 def _get_efs_info(self, file_system_id: str, region: str) -> FileSystemInfo | None: 

100 """Get information about an EFS file system.""" 

101 try: 

102 efs = self._session.client("efs", region_name=region) 

103 

104 response = efs.describe_file_systems(FileSystemId=file_system_id) 

105 if not response["FileSystems"]: 

106 return None 

107 

108 fs = response["FileSystems"][0] 

109 

110 # Get mount targets for DNS name 

111 mt_response = efs.describe_mount_targets(FileSystemId=file_system_id) 

112 mount_target_ip = None 

113 if mt_response["MountTargets"]: 

114 mount_target_ip = mt_response["MountTargets"][0].get("IpAddress") 

115 

116 # Get tags 

117 tags_response = efs.describe_tags(FileSystemId=file_system_id) 

118 tags = {t["Key"]: t["Value"] for t in tags_response.get("Tags", [])} 

119 

120 return FileSystemInfo( 

121 file_system_id=file_system_id, 

122 file_system_type="efs", 

123 region=region, 

124 dns_name=f"{file_system_id}.efs.{region}.amazonaws.com", 

125 mount_target_ip=mount_target_ip, 

126 size_bytes=fs.get("SizeInBytes", {}).get("Value"), 

127 status=fs["LifeCycleState"], 

128 created_time=fs.get("CreationTime"), 

129 tags=tags, 

130 ) 

131 except ClientError: 

132 return None 

133 

134 def _get_fsx_info(self, file_system_id: str, region: str) -> FileSystemInfo | None: 

135 """Get information about an FSx for Lustre file system.""" 

136 try: 

137 fsx = self._session.client("fsx", region_name=region) 

138 

139 response = fsx.describe_file_systems(FileSystemIds=[file_system_id]) 

140 if not response["FileSystems"]: 

141 return None 

142 

143 fs = response["FileSystems"][0] 

144 

145 # Get DNS name from Lustre configuration 

146 dns_name = fs.get("DNSName", "") 

147 

148 # Get tags 

149 tags = {t["Key"]: t["Value"] for t in fs.get("Tags", [])} 

150 

151 return FileSystemInfo( 

152 file_system_id=file_system_id, 

153 file_system_type="fsx", 

154 region=region, 

155 dns_name=dns_name, 

156 size_bytes=fs.get("StorageCapacity", 0) * 1024 * 1024 * 1024, # GB to bytes 

157 status=fs["Lifecycle"], 

158 created_time=fs.get("CreationTime"), 

159 tags=tags, 

160 ) 

161 except ClientError: 

162 return None 

163 

164 def get_file_system_by_region(self, region: str, fs_type: str = "efs") -> FileSystemInfo | None: 

165 """ 

166 Get file system for a specific region. 

167 

168 Args: 

169 region: AWS region 

170 fs_type: "efs" or "fsx" 

171 

172 Returns: 

173 FileSystemInfo or None 

174 """ 

175 file_systems = self.get_file_systems(region) 

176 for fs in file_systems: 

177 if fs.file_system_type == fs_type: 177 ↛ 176line 177 didn't jump to line 176 because the condition on line 177 was always true

178 return fs 

179 return None 

180 

181 def create_datasync_download_task( 

182 self, 

183 file_system_id: str, 

184 region: str, 

185 source_path: str, 

186 destination_bucket: str, 

187 destination_prefix: str = "", 

188 ) -> str: 

189 """ 

190 Create a DataSync task to download files from EFS/FSx to S3. 

191 

192 This is useful for downloading large amounts of data from file systems 

193 that aren't directly accessible. 

194 

195 Args: 

196 file_system_id: EFS or FSx file system ID 

197 region: AWS region 

198 source_path: Path within the file system 

199 destination_bucket: S3 bucket name 

200 destination_prefix: S3 key prefix 

201 

202 Returns: 

203 DataSync task ARN 

204 """ 

205 datasync = self._session.client("datasync", region_name=region) 

206 

207 # Determine file system type 

208 fs_info = None 

209 for fs in self.get_file_systems(region): 

210 if fs.file_system_id == file_system_id: 210 ↛ 209line 210 didn't jump to line 209 because the condition on line 210 was always true

211 fs_info = fs 

212 break 

213 

214 if not fs_info: 

215 raise ValueError(f"File system {file_system_id} not found in region {region}") 

216 

217 # Create source location 

218 if fs_info.file_system_type == "efs": 

219 source_location = datasync.create_location_efs( 

220 EfsFilesystemArn=f"arn:aws:elasticfilesystem:{region}:{self._get_account_id()}:file-system/{file_system_id}", 

221 Subdirectory=source_path, 

222 Ec2Config={ 

223 "SubnetArn": self._get_subnet_arn(region), 

224 "SecurityGroupArns": [self._get_security_group_arn(region)], 

225 }, 

226 ) 

227 source_arn = source_location["LocationArn"] 

228 else: 

229 source_location = datasync.create_location_fsx_lustre( 

230 FsxFilesystemArn=f"arn:aws:fsx:{region}:{self._get_account_id()}:file-system/{file_system_id}", 

231 Subdirectory=source_path, 

232 SecurityGroupArns=[self._get_security_group_arn(region)], 

233 ) 

234 source_arn = source_location["LocationArn"] 

235 

236 # Create destination location (S3) 

237 dest_location = datasync.create_location_s3( 

238 S3BucketArn=f"arn:aws:s3:::{destination_bucket}", 

239 Subdirectory=destination_prefix, 

240 S3Config={"BucketAccessRoleArn": self._get_datasync_role_arn(region)}, 

241 ) 

242 dest_arn = dest_location["LocationArn"] 

243 

244 # Create task 

245 task = datasync.create_task( 

246 SourceLocationArn=source_arn, 

247 DestinationLocationArn=dest_arn, 

248 Name=f"gco-download-{datetime.now(UTC).strftime('%Y%m%d-%H%M%S')}", 

249 Options={ 

250 "VerifyMode": "ONLY_FILES_TRANSFERRED", 

251 "OverwriteMode": "ALWAYS", 

252 "PreserveDeletedFiles": "REMOVE", 

253 "TransferMode": "CHANGED", 

254 }, 

255 ) 

256 

257 return str(task["TaskArn"]) 

258 

259 def _get_account_id(self) -> str: 

260 """Get current AWS account ID.""" 

261 sts = self._session.client("sts") 

262 return str(sts.get_caller_identity()["Account"]) 

263 

264 def _get_subnet_arn(self, _region: str) -> str: 

265 """Get a subnet ARN for DataSync in the given region.""" 

266 # This would need to be implemented based on your VPC setup 

267 # For now, return a placeholder 

268 raise NotImplementedError("Subnet ARN lookup not implemented - configure via stack outputs") 

269 

270 def _get_security_group_arn(self, _region: str) -> str: 

271 """Get a security group ARN for DataSync in the given region.""" 

272 raise NotImplementedError( 

273 "Security group ARN lookup not implemented - configure via stack outputs" 

274 ) 

275 

276 def _get_datasync_role_arn(self, region: str) -> str: 

277 """Get the DataSync IAM role ARN.""" 

278 raise NotImplementedError( 

279 "DataSync role ARN lookup not implemented - configure via stack outputs" 

280 ) 

281 

282 def get_access_point_info(self, file_system_id: str, region: str) -> list[dict[str, Any]]: 

283 """ 

284 Get EFS access points for a file system. 

285 

286 Args: 

287 file_system_id: EFS file system ID 

288 region: AWS region 

289 

290 Returns: 

291 List of access point information 

292 """ 

293 try: 

294 efs = self._session.client("efs", region_name=region) 

295 response = efs.describe_access_points(FileSystemId=file_system_id) 

296 

297 return [ 

298 { 

299 "access_point_id": ap["AccessPointId"], 

300 "name": ap.get("Name", ""), 

301 "path": ap.get("RootDirectory", {}).get("Path", "/"), 

302 "posix_user": ap.get("PosixUser", {}), 

303 "status": ap["LifeCycleState"], 

304 } 

305 for ap in response.get("AccessPoints", []) 

306 ] 

307 except ClientError: 

308 return [] 

309 

310 def download_from_pod( 

311 self, 

312 region: str, 

313 pod_name: str, 

314 remote_path: str, 

315 local_path: str, 

316 namespace: str = "gco-jobs", 

317 container: str | None = None, 

318 ) -> dict[str, Any]: 

319 """ 

320 Download files from a pod using kubectl cp. 

321 

322 This uses kubectl port-forward internally to copy files from a pod's 

323 mounted file system (EFS/FSx) to the local machine. 

324 

325 Args: 

326 region: AWS region where the cluster is located 

327 pod_name: Name of the pod to copy from 

328 remote_path: Path inside the pod (e.g., /mnt/efs/outputs) 

329 local_path: Local destination path 

330 namespace: Kubernetes namespace (default: gco-jobs) 

331 container: Container name (optional, for multi-container pods) 

332 

333 Returns: 

334 Dict with download status and details 

335 """ 

336 import os 

337 import subprocess 

338 

339 # Update kubeconfig for the cluster 

340 cluster_name = f"gco-{region}" 

341 update_kubeconfig(cluster_name, region) 

342 

343 # Build kubectl cp command 

344 # Format: kubectl cp <namespace>/<pod>:<remote_path> <local_path> 

345 source = f"{namespace}/{pod_name}:{remote_path}" 

346 cmd = ["kubectl", "cp", source, local_path] 

347 

348 if container: 

349 cmd.extend(["-c", container]) 

350 

351 try: 

352 subprocess.run( 

353 cmd, check=True, capture_output=True, text=True 

354 ) # nosemgrep: dangerous-subprocess-use-audit - cmd is a list ["kubectl","cp",source,local_path]; source is namespace/pod:path, local_path is caller-provided destination 

355 if os.path.isfile(local_path): 

356 size = os.path.getsize(local_path) 

357 elif os.path.isdir(local_path): 357 ↛ 364line 357 didn't jump to line 364 because the condition on line 357 was always true

358 size = sum( 

359 os.path.getsize(os.path.join(dirpath, filename)) 

360 for dirpath, _, filenames in os.walk(local_path) 

361 for filename in filenames 

362 ) 

363 else: 

364 size = 0 

365 

366 return { 

367 "status": "success", 

368 "source": source, 

369 "destination": local_path, 

370 "size_bytes": size, 

371 "message": "Download completed successfully", 

372 } 

373 

374 except subprocess.CalledProcessError as e: 

375 raise RuntimeError(f"kubectl cp failed: {e.stderr}") from e 

376 except FileNotFoundError as e: 

377 raise RuntimeError( 

378 "kubectl not found. Please install kubectl and ensure it's in your PATH." 

379 ) from e 

380 

    def list_storage_contents(
        self,
        region: str,
        remote_path: str = "/",
        storage_type: str = "efs",
        namespace: str = "gco-jobs",
        pvc_name: str | None = None,
    ) -> dict[str, Any]:
        """
        List contents of EFS/FSx storage using a temporary helper pod.

        This creates a temporary pod that mounts the storage, lists contents,
        then cleans up. Useful for discovering what directories/files exist.

        Args:
            region: AWS region where the cluster is located
            remote_path: Path inside the storage to list (default: root)
            storage_type: "efs" or "fsx" (default: efs)
            namespace: Kubernetes namespace (default: gco-jobs)
            pvc_name: PVC name to mount (default: gco-shared-storage for EFS,
                gco-fsx-storage for FSx)

        Returns:
            Dict with listing status and contents

        Raises:
            RuntimeError: If kubectl is missing, pod creation fails, or the
                helper pod does not become ready within 60 seconds.
        """
        import subprocess
        import time
        import uuid

        # Determine PVC name based on storage type
        if pvc_name is None:
            pvc_name = "gco-shared-storage" if storage_type == "efs" else "gco-fsx-storage"

        # Determine mount path based on storage type
        mount_path = "/efs" if storage_type == "efs" else "/fsx"

        # Generate unique pod name (random suffix avoids collisions between
        # concurrent listings)
        helper_pod_name = f"gco-list-helper-{uuid.uuid4().hex[:8]}"

        # Update kubeconfig for the cluster
        cluster_name = f"gco-{region}"
        update_kubeconfig(cluster_name, region)

        # Create helper pod manifest.
        #
        # We set an explicit ``resources`` block with CPU + memory but no GPU
        # so the gco-jobs LimitRange admission plugin does not substitute its
        # ``max`` value as an implicit request. Without this, a namespace
        # that already has all 32 GPUs in use (typical during a demo or
        # heavy workload burst) would reject the helper pod — even though
        # listing files doesn't need a GPU — because K8s quota admission
        # would attribute the LimitRange's ``max.nvidia.com/gpu`` to the
        # pod's request.
        pod_manifest = f"""
apiVersion: v1
kind: Pod
metadata:
  name: {helper_pod_name}
  namespace: {namespace}
  labels:
    app: gco-list-helper
spec:
  restartPolicy: Never
  containers:
  - name: helper
    image: busybox:1.37.0
    command: ["sleep", "300"]
    resources:
      requests:
        cpu: "50m"
        memory: "64Mi"
      limits:
        cpu: "200m"
        memory: "256Mi"
    volumeMounts:
    - name: storage
      mountPath: {mount_path}
  volumes:
  - name: storage
    persistentVolumeClaim:
      claimName: {pvc_name}
"""

        try:
            # Create the helper pod
            subprocess.run(
                ["kubectl", "apply", "-f", "-"],
                input=pod_manifest,
                capture_output=True,
                text=True,
                check=True,
            )

            # Wait for pod to be ready (poll the phase every 2s, up to 60s)
            max_wait = 60
            waited = 0
            while waited < max_wait:
                status_result = subprocess.run(
                    [
                        "kubectl",
                        "get",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "-o",
                        "jsonpath={.status.phase}",
                    ],
                    capture_output=True,
                    text=True,
                )
                if status_result.stdout.strip() == "Running":
                    break
                time.sleep(2)  # nosemgrep: arbitrary-sleep
                waited += 2

            if waited >= max_wait:
                raise RuntimeError("Helper pod did not become ready in time")

            # Build the full path inside the pod
            full_remote_path = f"{mount_path}/{remote_path.lstrip('/')}"

            # List contents using kubectl exec
            list_result = subprocess.run(
                [
                    "kubectl",
                    "exec",
                    helper_pod_name,
                    "-n",
                    namespace,
                    "--",
                    "ls",
                    "-la",
                    full_remote_path,
                ],
                capture_output=True,
                text=True,
            )

            # Non-zero ls exit is reported as a soft error (missing path),
            # not an exception
            if list_result.returncode != 0:
                return {
                    "status": "error",
                    "path": remote_path,
                    "storage_type": storage_type,
                    "contents": [],
                    "message": f"Path not found or empty: {list_result.stderr.strip()}",
                }

            # Parse ls output
            contents = []
            for line in list_result.stdout.strip().split("\n"):
                if line.startswith("total") or not line.strip():
                    continue
                parts = line.split()
                # Long-listing columns as parsed here: parts[0] is the mode
                # string, parts[4] the size, and parts[8:] the (possibly
                # space-containing) entry name.
                if len(parts) >= 9:
                    name = " ".join(parts[8:])
                    is_dir = line.startswith("d")
                    size = int(parts[4]) if parts[4].isdigit() else 0
                    contents.append(
                        {
                            "name": name,
                            "is_directory": is_dir,
                            "size_bytes": size,
                            "permissions": parts[0],
                        }
                    )

            return {
                "status": "success",
                "path": remote_path,
                "storage_type": storage_type,
                "contents": contents,
                "message": f"Found {len(contents)} items",
            }

        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            raise RuntimeError(f"List failed: {error_msg}") from e
        except FileNotFoundError as e:
            raise RuntimeError(
                "kubectl not found. Please install kubectl and ensure it's in your PATH."
            ) from e
        finally:
            # Always clean up the helper pod (best effort — never mask the
            # original outcome with a cleanup failure)
            import contextlib

            with contextlib.suppress(Exception):
                subprocess.run(
                    [
                        "kubectl",
                        "delete",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "--ignore-not-found",
                    ],
                    capture_output=True,
                    text=True,
                )

581 

    def download_from_storage(
        self,
        region: str,
        remote_path: str,
        local_path: str,
        storage_type: str = "efs",
        namespace: str = "gco-jobs",
        pvc_name: str | None = None,
    ) -> dict[str, Any]:
        """
        Download files from EFS/FSx storage using a temporary helper pod.

        This creates a temporary pod that mounts the storage, copies files via
        kubectl cp, then cleans up. Works even after the original job pod is gone.

        Args:
            region: AWS region where the cluster is located
            remote_path: Path inside the storage (e.g., /efs-output-example/results.json)
            local_path: Local destination path
            storage_type: "efs" or "fsx" (default: efs)
            namespace: Kubernetes namespace (default: gco-jobs)
            pvc_name: PVC name to mount (default: gco-shared-storage for EFS,
                gco-fsx-storage for FSx)

        Returns:
            Dict with download status and details

        Raises:
            RuntimeError: If kubectl is missing, pod creation or the copy
                fails, or the helper pod does not become ready within 60s.
        """
        import os
        import subprocess
        import time
        import uuid

        # Determine PVC name based on storage type
        if pvc_name is None:
            pvc_name = "gco-shared-storage" if storage_type == "efs" else "gco-fsx-storage"

        # Determine mount path based on storage type
        mount_path = "/efs" if storage_type == "efs" else "/fsx"

        # Generate unique pod name (random suffix avoids collisions between
        # concurrent downloads)
        helper_pod_name = f"gco-download-helper-{uuid.uuid4().hex[:8]}"

        # Update kubeconfig for the cluster
        cluster_name = f"gco-{region}"
        update_kubeconfig(cluster_name, region)

        # Create helper pod manifest.
        #
        # Explicit ``resources`` block avoids the LimitRange admission plugin
        # substituting ``max.nvidia.com/gpu`` as an implicit request — see the
        # ``ls`` helper above for the full rationale.
        pod_manifest = f"""
apiVersion: v1
kind: Pod
metadata:
  name: {helper_pod_name}
  namespace: {namespace}
  labels:
    app: gco-download-helper
spec:
  restartPolicy: Never
  containers:
  - name: helper
    image: busybox:1.37.0
    command: ["sleep", "300"]
    resources:
      requests:
        cpu: "50m"
        memory: "64Mi"
      limits:
        cpu: "200m"
        memory: "256Mi"
    volumeMounts:
    - name: storage
      mountPath: {mount_path}
  volumes:
  - name: storage
    persistentVolumeClaim:
      claimName: {pvc_name}
"""

        try:
            # Create the helper pod
            subprocess.run(
                ["kubectl", "apply", "-f", "-"],
                input=pod_manifest,
                capture_output=True,
                text=True,
                check=True,
            )

            # Wait for pod to be ready (poll the phase every 2s, up to 60s)
            max_wait = 60
            waited = 0
            while waited < max_wait:
                status_result = subprocess.run(
                    [
                        "kubectl",
                        "get",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "-o",
                        "jsonpath={.status.phase}",
                    ],
                    capture_output=True,
                    text=True,
                )
                if status_result.stdout.strip() == "Running":
                    break
                time.sleep(2)  # nosemgrep: arbitrary-sleep
                waited += 2

            if waited >= max_wait:
                raise RuntimeError("Helper pod did not become ready in time")

            # Build the full path inside the pod
            full_remote_path = f"{mount_path}/{remote_path.lstrip('/')}"

            # Copy files from the helper pod
            source = f"{namespace}/{helper_pod_name}:{full_remote_path}"
            cmd = ["kubectl", "cp", source, local_path]

            subprocess.run(
                cmd, check=True, capture_output=True, text=True
            )  # nosemgrep: dangerous-subprocess-use-audit - cmd is a list ["kubectl","cp",source,local_path]; source is namespace/pod:path, local_path is caller-provided destination

            # Get file info: single file's size, directory tree total, or 0
            if os.path.isfile(local_path):
                size = os.path.getsize(local_path)
            elif os.path.isdir(local_path):
                size = sum(
                    os.path.getsize(os.path.join(dirpath, filename))
                    for dirpath, _, filenames in os.walk(local_path)
                    for filename in filenames
                )
            else:
                size = 0

            return {
                "status": "success",
                "source": f"{storage_type}:{remote_path}",
                "destination": local_path,
                "size_bytes": size,
                "storage_type": storage_type,
                "message": "Download completed successfully",
            }

        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            raise RuntimeError(f"Download failed: {error_msg}") from e
        except FileNotFoundError as e:
            raise RuntimeError(
                "kubectl not found. Please install kubectl and ensure it's in your PATH."
            ) from e
        finally:
            # Always clean up the helper pod (best effort — never mask the
            # original outcome with a cleanup failure)
            import contextlib

            with contextlib.suppress(Exception):
                subprocess.run(
                    [
                        "kubectl",
                        "delete",
                        "pod",
                        helper_pod_name,
                        "-n",
                        namespace,
                        "--ignore-not-found",
                    ],
                    capture_output=True,
                    text=True,
                )

756 

757 

def get_file_system_client(config: GCOConfig | None = None) -> FileSystemClient:
    """Build and return a FileSystemClient (default config is used when *config* is None)."""
    client = FileSystemClient(config)
    return client