Provider Architecture
The Open Resource Broker implements a provider architecture that enables integration with AWS while maintaining clean separation between the core domain layer and AWS-specific implementations.
Provider Architecture Overview
```mermaid
graph TB
    subgraph "Domain Layer (Provider Agnostic)"
        DomainModels[Domain Models]
        BusinessLogic[Business Logic]
        DomainEvents[Domain Events]
    end

    subgraph "Application Layer"
        AppService[Application Service]
        CommandBus[Command Bus]
        QueryBus[Query Bus]
    end

    subgraph "Provider Abstraction"
        ProviderInterface[Provider Interface]
        ProviderFactory[Provider Factory]
        ResourceManager[Resource Manager Interface]
    end

    subgraph "AWS Provider Implementation"
        AWSProvider[AWS Provider]
        AWSHandlers[AWS Handlers]
        AWSOperations[AWS Operations Utility]
        AWSClient[AWS Client]
    end

    AppService --> ProviderInterface
    ProviderFactory --> AWSProvider
    AWSProvider --> AWSHandlers
    AWSHandlers --> AWSOperations
    AWSOperations --> AWSClient

    DomainModels -.-> ProviderInterface
    BusinessLogic -.-> ProviderInterface
```
Provider Abstraction Layer
Provider Interface
The core provider interface defines the contract that all cloud providers must implement:
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional

from orb.domain.machine import Machine
from orb.domain.request import Request
from orb.domain.template import Template


class ProviderInterface(ABC):
    """Abstract interface for cloud providers."""

    @abstractmethod
    def get_provider_type(self) -> str:
        """Get the provider type identifier."""
        pass

    @abstractmethod
    def initialize(self) -> bool:
        """Initialize the provider."""
        pass

    @abstractmethod
    def health_check(self) -> Dict[str, Any]:
        """Check provider health status."""
        pass

    @abstractmethod
    def get_available_templates(self) -> List[Template]:
        """Get available machine templates."""
        pass

    @abstractmethod
    def provision_machines(self, request: Request) -> List[Machine]:
        """Provision machines based on request."""
        pass

    @abstractmethod
    def terminate_machines(self, machine_ids: List[str]) -> bool:
        """Terminate specified machines."""
        pass

    @abstractmethod
    def get_machine_status(self, machine_ids: List[str]) -> List[Machine]:
        """Get status of specified machines."""
        pass

    @abstractmethod
    def validate_template(self, template: Template) -> List[str]:
        """Validate template configuration."""
        pass
```
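Because each method is marked `@abstractmethod`, Python refuses to instantiate any subclass that leaves one of them unimplemented. The sketch below uses a hypothetical two-method stand-in (`MiniProviderInterface`, not part of ORB) to show the `TypeError` a partial provider raises and how supplying the missing method fixes it:

```python
from abc import ABC, abstractmethod


class MiniProviderInterface(ABC):
    """Hypothetical two-method stand-in for ProviderInterface."""

    @abstractmethod
    def get_provider_type(self) -> str: ...

    @abstractmethod
    def initialize(self) -> bool: ...


class PartialProvider(MiniProviderInterface):
    # Implements only one of the two abstract methods
    def get_provider_type(self) -> str:
        return "partial"


class CompleteProvider(PartialProvider):
    # Supplies the missing method, so instantiation succeeds
    def initialize(self) -> bool:
        return True


def try_instantiate(cls) -> str:
    """Return 'ok' or the TypeError message raised when instantiating cls."""
    try:
        cls()
        return "ok"
    except TypeError as exc:
        return str(exc)


if __name__ == "__main__":
    print(try_instantiate(PartialProvider))   # message mentions initialize
    print(try_instantiate(CompleteProvider))  # ok
```

The exact wording of the `TypeError` message varies between Python versions, but it always names the unimplemented method.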
Provider Factory
The provider factory creates provider instances at runtime from a registry that maps provider type names to implementation classes:
```python
from typing import Any, Dict, List, Type

from orb.infrastructure.interfaces.provider import ProviderInterface
from orb.infrastructure.logging.logger import get_logger


class ProviderFactory:
    """Factory for creating provider instances."""

    _providers: Dict[str, Type[ProviderInterface]] = {}

    @classmethod
    def register_provider(cls, provider_type: str,
                          provider_class: Type[ProviderInterface]) -> None:
        """Register a provider implementation."""
        cls._providers[provider_type] = provider_class
        get_logger(__name__).info(f"Registered provider: {provider_type}")

    @classmethod
    def create_provider(cls, provider_type: str, config: Any) -> ProviderInterface:
        """Create a provider instance."""
        if provider_type not in cls._providers:
            raise ValueError(f"Unknown provider type: {provider_type}")
        provider_class = cls._providers[provider_type]
        return provider_class(config)

    @classmethod
    def get_available_providers(cls) -> List[str]:
        """Get list of available provider types."""
        return list(cls._providers.keys())


# Register the AWS provider
from orb.providers.aws.aws_provider import AWSProvider

ProviderFactory.register_provider("aws", AWSProvider)
```
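The registry behind the factory can be exercised without any cloud dependency. Below is a simplified, self-contained copy of the factory (`SimpleProviderFactory`, with logging and the real `ProviderInterface` removed) plus a hypothetical `DummyProvider`, showing registration, creation, and the error raised for an unregistered type:

```python
from typing import Any, Dict, List, Type


class DummyProvider:
    """Hypothetical provider used only to exercise the factory."""

    def __init__(self, config: Any):
        self.config = config

    def get_provider_type(self) -> str:
        return "dummy"


class SimpleProviderFactory:
    """Simplified copy of ProviderFactory without logging."""

    _providers: Dict[str, Type[Any]] = {}

    @classmethod
    def register_provider(cls, provider_type: str, provider_class: Type[Any]) -> None:
        cls._providers[provider_type] = provider_class

    @classmethod
    def create_provider(cls, provider_type: str, config: Any) -> Any:
        if provider_type not in cls._providers:
            raise ValueError(f"Unknown provider type: {provider_type}")
        return cls._providers[provider_type](config)

    @classmethod
    def get_available_providers(cls) -> List[str]:
        return list(cls._providers.keys())


SimpleProviderFactory.register_provider("dummy", DummyProvider)
provider = SimpleProviderFactory.create_provider("dummy", {"region": "test"})
print(provider.get_provider_type())                    # dummy
print(SimpleProviderFactory.get_available_providers())  # ['dummy']

try:
    SimpleProviderFactory.create_provider("unknown", {})
except ValueError as exc:
    print(exc)                                          # Unknown provider type: unknown
```

Because `_providers` is a class attribute, registrations are process-wide; tests that register throwaway providers may want to save and restore it.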
AWS Provider Implementation
AWS Provider Architecture
The AWS provider uses a handler-based architecture: a handler factory creates one handler per AWS provisioning API (EC2 Fleet, Auto Scaling groups, Spot Fleet, and RunInstances), and all handlers call AWS through a shared operations utility:
```mermaid
graph TB
    subgraph "AWS Provider"
        AWSProvider[AWS Provider]
        HandlerFactory[Handler Factory]
        OperationsUtility[Operations Utility]
    end

    subgraph "AWS Handlers"
        EC2FleetHandler[EC2 Fleet Handler]
        ASGHandler[Auto Scaling Handler]
        SpotFleetHandler[Spot Fleet Handler]
        RunInstancesHandler[Run Instances Handler]
    end

    subgraph "AWS Services"
        EC2[Amazon EC2]
        AutoScaling[Auto Scaling]
        CloudWatch[CloudWatch]
        IAM[IAM]
    end

    AWSProvider --> HandlerFactory
    HandlerFactory --> EC2FleetHandler
    HandlerFactory --> ASGHandler
    HandlerFactory --> SpotFleetHandler
    HandlerFactory --> RunInstancesHandler

    EC2FleetHandler --> OperationsUtility
    ASGHandler --> OperationsUtility
    SpotFleetHandler --> OperationsUtility
    RunInstancesHandler --> OperationsUtility

    OperationsUtility --> EC2
    OperationsUtility --> AutoScaling
    OperationsUtility --> CloudWatch
```
AWS Provider Class
```python
from typing import List, Dict, Any, Optional

from orb.infrastructure.interfaces.provider import ProviderInterface
from orb.infrastructure.logging.logger import get_logger
from orb.providers.aws.infrastructure.aws_handler_factory import AWSHandlerFactory
from orb.providers.aws.configuration.config import AWSConfig
from orb.providers.aws.utilities.aws_operations import AWSOperations


class AWSProvider(ProviderInterface):
    """AWS cloud provider implementation.

    Only initialization and health checking are shown here; the remaining
    ProviderInterface methods must also be implemented before the class
    can be instantiated.
    """

    def __init__(self, config: AWSConfig):
        """Initialize AWS provider."""
        self.config = config
        self.handler_factory = AWSHandlerFactory(config)
        self.aws_operations = AWSOperations(config)
        self._initialized = False
        self.logger = get_logger(__name__)

    def get_provider_type(self) -> str:
        """Get provider type."""
        return "aws"

    def initialize(self) -> bool:
        """Initialize AWS provider."""
        try:
            # Test AWS connectivity
            self.aws_operations.test_connection()

            # Initialize handler factory
            self.handler_factory.initialize()

            self._initialized = True
            self.logger.info("AWS provider initialized successfully")
            return True

        except Exception as e:
            self.logger.error(f"Failed to initialize AWS provider: {e}")
            return False

    def health_check(self) -> Dict[str, Any]:
        """Check AWS provider health."""
        if not self._initialized:
            return {"status": "unhealthy", "reason": "not_initialized"}

        try:
            # Check AWS service connectivity
            ec2_health = self.aws_operations.check_ec2_connectivity()
            asg_health = self.aws_operations.check_autoscaling_connectivity()

            return {
                "status": "healthy" if ec2_health and asg_health else "degraded",
                "services": {
                    "ec2": "healthy" if ec2_health else "unhealthy",
                    "autoscaling": "healthy" if asg_health else "unhealthy"
                },
                "region": self.config.region,
                "account_id": self.aws_operations.get_account_id()
            }

        except Exception as e:
            return {"status": "unhealthy", "reason": str(e)}
```
AWS Handler Hierarchy
Base Handler
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any

from orb.infrastructure.logging.logger import get_logger
from orb.providers.aws.utilities.aws_operations import AWSOperations


class BaseAWSHandler(ABC):
    """Base class for all AWS handlers."""

    def __init__(self, aws_operations: AWSOperations):
        """Initialize base handler."""
        self.aws_operations = aws_operations
        self.logger = get_logger(self.__class__.__name__)

    @abstractmethod
    def get_handler_type(self) -> str:
        """Get handler type identifier."""
        pass

    @abstractmethod
    def provision_machines(self, template: Dict[str, Any],
                           machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using this handler."""
        pass

    @abstractmethod
    def terminate_machines(self, machine_ids: List[str]) -> bool:
        """Terminate machines using this handler."""
        pass

    @abstractmethod
    def get_machine_status(self, machine_ids: List[str]) -> List[Dict[str, Any]]:
        """Get machine status using this handler."""
        pass

    def validate_template(self, template: Dict[str, Any]) -> List[str]:
        """Validate template for this handler."""
        errors = []

        # Common validation shared by all handlers
        if not template.get('vm_type'):
            errors.append("vm_type is required")

        if not template.get('image_id'):
            errors.append("image_id is required")

        return errors
```
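Handlers can layer their own checks on top of the common ones by calling `super().validate_template()`. For instance, `_build_fleet_config` in the EC2 Fleet handler reads `launch_template_name` and builds its overrides from `subnet_ids`, so an EC2 Fleet handler could reject templates that lack either. The sketch below is hypothetical: the stand-in classes and the extra rules are illustrative, not ORB's actual validation.

```python
from typing import Any, Dict, List


class BaseHandlerValidation:
    """Stand-in for BaseAWSHandler.validate_template (common checks only)."""

    def validate_template(self, template: Dict[str, Any]) -> List[str]:
        errors = []
        if not template.get('vm_type'):
            errors.append("vm_type is required")
        if not template.get('image_id'):
            errors.append("image_id is required")
        return errors


class FleetHandlerValidation(BaseHandlerValidation):
    """Hypothetical EC2 Fleet-specific rules layered on the common ones."""

    def validate_template(self, template: Dict[str, Any]) -> List[str]:
        errors = super().validate_template(template)
        # The fleet request references a launch template by name
        if not template.get('launch_template_name'):
            errors.append("launch_template_name is required for EC2 Fleet")
        # With no subnets, no overrides are built and vm_type never reaches the request
        if not template.get('subnet_ids'):
            errors.append("subnet_ids is required for EC2 Fleet")
        return errors


print(FleetHandlerValidation().validate_template({'vm_type': 't3.micro'}))
# ['image_id is required', 'launch_template_name is required for EC2 Fleet',
#  'subnet_ids is required for EC2 Fleet']
```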
EC2 Fleet Handler
```python
from typing import Any, Dict, List

# BaseAWSHandler is the base class defined in the previous section.


class EC2FleetHandler(BaseAWSHandler):
    """Handler for EC2 Fleet operations."""

    def get_handler_type(self) -> str:
        return "EC2Fleet"

    def provision_machines(self, template: Dict[str, Any],
                           machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using EC2 Fleet."""
        try:
            # Build the CreateFleet request
            fleet_config = self._build_fleet_config(template, machine_count)

            # Create the EC2 fleet
            fleet_response = self.aws_operations.create_ec2_fleet(fleet_config)

            # Wait for instances to be created (helper not shown)
            instance_ids = self._wait_for_fleet_instances(fleet_response['FleetId'])

            # Collect details for each launched instance
            machines = []
            for instance_id in instance_ids:
                instance_details = self.aws_operations.get_instance_details(instance_id)
                machines.append({
                    'machine_id': f"machine-{instance_id}",
                    'provider_instance_id': instance_id,
                    'status': 'PENDING',
                    'instance_type': instance_details.get('InstanceType'),
                    'private_ip': instance_details.get('PrivateIpAddress'),
                    'public_ip': instance_details.get('PublicIpAddress')
                })

            return machines

        except Exception as e:
            self.logger.error(f"Failed to provision machines with EC2 Fleet: {e}")
            raise

    def _build_fleet_config(self, template: Dict[str, Any],
                            machine_count: int) -> Dict[str, Any]:
        """Build the EC2 Fleet request, with one override per subnet."""
        return {
            'LaunchTemplateConfigs': [{
                'LaunchTemplateSpecification': {
                    'LaunchTemplateName': template.get('launch_template_name'),
                    'Version': template.get('launch_template_version', '$Latest')
                },
                'Overrides': [{
                    'InstanceType': template['vm_type'],
                    'SubnetId': subnet_id,
                    'AvailabilityZone': self.aws_operations.get_subnet_az(subnet_id)
                } for subnet_id in template.get('subnet_ids', [])]
            }],
            'TargetCapacitySpecification': {
                'TotalTargetCapacity': machine_count,
                'OnDemandTargetCapacity': machine_count,
                'DefaultTargetCapacityType': 'on-demand'
            },
            'Type': 'instant'
        }
```
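`_wait_for_fleet_instances` is not shown above. With `'Type': 'instant'`, the EC2 `CreateFleet` API returns synchronously: the response's `Instances` list carries the launched `InstanceIds`, and capacity that could not be fulfilled is reported under `Errors`. A minimal sketch of reading the IDs straight from the response, assuming that documented response shape (the helper name `instant_fleet_instance_ids` is hypothetical):

```python
from typing import Any, Dict, List


def instant_fleet_instance_ids(response: Dict[str, Any]) -> List[str]:
    """Collect instance IDs from an instant-type CreateFleet response.

    Each entry in response['Instances'] groups the instances launched for
    one launch template/override combination.
    """
    instance_ids: List[str] = []
    for group in response.get('Instances', []):
        instance_ids.extend(group.get('InstanceIds', []))
    return instance_ids


# Trimmed example response (values are made up)
sample = {
    'FleetId': 'fleet-0example',
    'Errors': [],
    'Instances': [
        {'InstanceType': 't3.micro', 'Lifecycle': 'on-demand',
         'InstanceIds': ['i-0aaa', 'i-0bbb']},
        {'InstanceType': 't3.small', 'Lifecycle': 'on-demand',
         'InstanceIds': ['i-0ccc']},
    ],
}
print(instant_fleet_instance_ids(sample))  # ['i-0aaa', 'i-0bbb', 'i-0ccc']
```

Note that `create_ec2_fleet` in the operations utility raises whenever `Errors` is non-empty, which also turns a partially fulfilled instant fleet into a failure; whether that is desirable depends on how strictly a request must be met.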
Auto Scaling Group Handler
```python
import time
from typing import Any, Dict, List


class ASGHandler(BaseAWSHandler):
    """Handler for Auto Scaling Group operations."""

    def get_handler_type(self) -> str:
        return "ASG"

    def provision_machines(self, template: Dict[str, Any],
                           machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using an Auto Scaling Group."""
        try:
            # Create a launch template if needed (helper not shown)
            launch_template = self._ensure_launch_template(template)

            # Create the Auto Scaling Group under a unique, timestamped name
            asg_name = f"hostfactory-{template['template_id']}-{int(time.time())}"
            asg_config = self._build_asg_config(template, machine_count,
                                                launch_template['LaunchTemplateName'])

            self.aws_operations.create_auto_scaling_group(asg_name, asg_config)

            # Wait for instances to be launched (helper not shown)
            instance_ids = self._wait_for_asg_instances(asg_name, machine_count)

            # Collect details for each launched instance
            machines = []
            for instance_id in instance_ids:
                instance_details = self.aws_operations.get_instance_details(instance_id)
                machines.append({
                    'machine_id': f"machine-{instance_id}",
                    'provider_instance_id': instance_id,
                    'status': 'PENDING',
                    'instance_type': instance_details.get('InstanceType'),
                    'private_ip': instance_details.get('PrivateIpAddress'),
                    'asg_name': asg_name
                })

            return machines

        except Exception as e:
            self.logger.error(f"Failed to provision machines with ASG: {e}")
            raise
```
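The helpers `_ensure_launch_template`, `_build_asg_config`, and `_wait_for_asg_instances` are not shown. An Auto Scaling group launches instances asynchronously, so waiting typically means polling until enough instances exist or a deadline passes. A generic sketch of such a wait loop, where the hypothetical `fetch_ids` callable stands in for an Auto Scaling group lookup:

```python
import time
from typing import Callable, List


def wait_for_instances(fetch_ids: Callable[[], List[str]], expected: int,
                       timeout: float = 300.0, interval: float = 5.0,
                       sleep: Callable[[float], None] = time.sleep) -> List[str]:
    """Poll fetch_ids() until it returns at least `expected` IDs.

    Raises TimeoutError once the deadline passes. `sleep` is injectable so
    tests can run without real delays.
    """
    deadline = time.monotonic() + timeout
    while True:
        ids = fetch_ids()
        if len(ids) >= expected:
            return ids
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {len(ids)} of {expected} instances after {timeout}s")
        sleep(interval)


# Simulated group that gains one instance per poll
launched: List[str] = []


def fake_fetch() -> List[str]:
    launched.append(f"i-{len(launched):04d}")
    return list(launched)


result = wait_for_instances(fake_fetch, expected=3, sleep=lambda _: None)
print(result)  # ['i-0000', 'i-0001', 'i-0002']
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.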
AWS Operations Utility
The AWS Operations utility consolidates common AWS operations and their error handling in one place:
```python
from typing import Any, Dict, List

from botocore.exceptions import ClientError

from orb.infrastructure.logging.logger import get_logger
from orb.providers.aws.configuration.config import AWSConfig

# AWSProviderError and RetryableAWSError are ORB's provider exception classes
# (import omitted); _create_ec2_client and _create_autoscaling_client are not shown.


class AWSOperations:
    """Consolidated AWS operations utility."""

    def __init__(self, config: AWSConfig):
        """Initialize AWS operations."""
        self.config = config
        # Set up the logger first so the client factories can use it
        self.logger = get_logger(__name__)
        self.ec2_client = self._create_ec2_client()
        self.autoscaling_client = self._create_autoscaling_client()

    def create_ec2_fleet(self, fleet_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create an EC2 fleet and classify API errors as retryable or not.

        This method does not retry itself; callers may retry on RetryableAWSError.
        """
        try:
            response = self.ec2_client.create_fleet(**fleet_config)

            if response.get('Errors'):
                error_msg = f"EC2 Fleet creation errors: {response['Errors']}"
                self.logger.error(error_msg)
                raise AWSProviderError(error_msg)

            return response

        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code in ('InsufficientInstanceCapacity', 'RequestLimitExceeded'):
                # Retryable errors: capacity shortage or API throttling
                raise RetryableAWSError(f"AWS API error: {error_code}") from e
            # Non-retryable errors
            raise AWSProviderError(f"AWS API error: {error_code}") from e

    def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate EC2 instances with consolidated error handling."""
        try:
            if not instance_ids:
                return True

            response = self.ec2_client.terminate_instances(InstanceIds=instance_ids)

            # Any instance not moving toward termination counts as a failure
            terminating_instances = response.get('TerminatingInstances', [])
            failed_instances = [
                inst for inst in terminating_instances
                if inst.get('CurrentState', {}).get('Name') not in ('shutting-down', 'terminated')
            ]

            if failed_instances:
                self.logger.warning(f"Some instances failed to terminate: {failed_instances}")
                return False

            return True

        except ClientError as e:
            self.logger.error(f"Failed to terminate instances: {e}")
            return False

    def get_instance_details(self, instance_id: str) -> Dict[str, Any]:
        """Get detailed instance information."""
        try:
            response = self.ec2_client.describe_instances(InstanceIds=[instance_id])

            if not response['Reservations']:
                raise AWSProviderError(f"Instance {instance_id} not found")

            instance = response['Reservations'][0]['Instances'][0]

            return {
                'InstanceId': instance['InstanceId'],
                'InstanceType': instance['InstanceType'],
                'State': instance['State']['Name'],
                'PrivateIpAddress': instance.get('PrivateIpAddress'),
                'PublicIpAddress': instance.get('PublicIpAddress'),
                'LaunchTime': instance.get('LaunchTime'),
                'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
            }

        except ClientError as e:
            self.logger.error(f"Failed to get instance details: {e}")
            raise AWSProviderError(f"Failed to get instance details: {e}") from e
```
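`create_ec2_fleet` only classifies errors; the retry itself has to happen in a caller. A minimal sketch of such a caller, assuming exponential backoff with full jitter (a common strategy for throttling errors, not necessarily the one ORB uses). It retries only `RetryableAWSError`, redefined here as a local stand-in, and lets every other exception propagate immediately:

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableAWSError(Exception):
    """Local stand-in for ORB's retryable error type."""


def call_with_retry(operation: Callable[[], T], max_attempts: int = 3,
                    base_delay: float = 0.5, max_delay: float = 10.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Run operation(), retrying RetryableAWSError with jittered backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return operation()
        except RetryableAWSError:
            if attempt >= max_attempts:
                raise  # out of attempts: surface the last error
            # Full jitter: wait a random time up to the capped exponential delay
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
            attempt += 1


calls = {"n": 0}


def flaky_create_fleet() -> str:
    # Fails with a throttling-style error twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise RetryableAWSError("AWS API error: RequestLimitExceeded")
    return "fleet-0example"


result = call_with_retry(flaky_create_fleet, sleep=lambda _: None)
print(result, calls["n"])  # fleet-0example 3
```

In a handler, a call might look like `call_with_retry(lambda: self.aws_operations.create_ec2_fleet(fleet_config))`. The random jitter keeps many workers hitting the same throttled API from retrying in lockstep.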
Adding New Providers
Implement Provider Interface
```python
# src/providers/provider1/provider1_provider.py
from orb.infrastructure.interfaces.provider import ProviderInterface
from orb.providers.provider1.configuration.config import Provider1ProviderConfig


class Provider1Provider(ProviderInterface):
    """Provider1 cloud provider implementation."""

    def __init__(self, config: Provider1ProviderConfig):
        self.config = config
        # Initialize Provider1 clients here

    def get_provider_type(self) -> str:
        return "provider1"

    def initialize(self) -> bool:
        # Set up Provider1 clients and verify connectivity here;
        # return False if initialization fails
        return True

    # Implement all remaining abstract methods...
```
Create Provider Configuration with BaseSettings
```python
# src/providers/provider1/configuration/config.py
from typing import Optional

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from orb.infrastructure.interfaces.provider import BaseProviderConfig


class Provider1ProviderConfig(BaseSettings, BaseProviderConfig):
    """Provider1 provider configuration with automatic environment variable support."""

    model_config = SettingsConfigDict(
        env_prefix='ORB_PROVIDER1_',
        case_sensitive=False,
        populate_by_name=True,
        env_nested_delimiter='__'
    )

    # Provider identification
    provider_type: str = "provider1"

    # Provider1 Authentication - automatically mapped to ORB_PROVIDER1_* env vars
    account_id: str = Field(..., description="Provider1 account ID")
    tenant_id: str = Field(..., description="Provider1 tenant ID")
    client_id: str = Field(..., description="Provider1 client ID")
    client_secret: str = Field(..., description="Provider1 client secret")

    # Provider1 Settings
    resource_group: str = Field(..., description="Provider1 resource group")
    location: str = Field("Region A", description="Provider1 location")

    # Optional settings
    endpoint_url: Optional[str] = Field(None, description="Provider1 endpoint URL")
    max_retries: int = Field(3, description="Maximum retries for Provider1 API calls")
    timeout: int = Field(30, description="Request timeout in seconds")
```
Register Provider Settings
```python
# src/providers/provider1/__init__.py
from orb.config.schemas.provider_settings_registry import ProviderSettingsRegistry

from .configuration.config import Provider1ProviderConfig

# Register Provider1 provider settings for automatic environment variable support
ProviderSettingsRegistry.register_provider_settings("provider1", Provider1ProviderConfig)
```
Register Provider
```python
# Register the Provider1 provider with the ProviderFactory
from orb.providers.provider1.provider1_provider import Provider1Provider

ProviderFactory.register_provider("provider1", Provider1Provider)
```
Configuration with Environment Variable Support
Configuration File:
```json
{
  "providers": [{
    "name": "provider1-region-a",
    "type": "provider1",
    "config": {
      "account_id": "your-account-id",
      "tenant_id": "your-tenant-id",
      "client_id": "your-client-id",
      "resource_group": "hostfactory-rg",
      "location": "Region A"
    }
  }]
}
```
Environment Variables (Override Config File):
```bash
# All Provider1 fields support environment variable overrides
export ORB_PROVIDER1_ACCOUNT_ID="prod-account-id"
export ORB_PROVIDER1_CLIENT_SECRET="secure-client-secret"
export ORB_PROVIDER1_RESOURCE_GROUP="production-rg"
export ORB_PROVIDER1_LOCATION="Region B"
export ORB_PROVIDER1_MAX_RETRIES=5
```
Provider Configuration with BaseSettings
The Open Resource Broker uses Pydantic BaseSettings for provider configuration, enabling automatic environment variable mapping, type validation, and extensible configuration schemas.
BaseSettings Benefits
- Automatic Environment Variable Mapping: No manual mapping required
- Type Safety: Automatic type conversion and validation
- Provider Extensibility: Each provider defines its own configuration schema
- Environment Variable Precedence: Environment variables override configuration file values
AWS Provider Configuration Example
```python
from typing import Optional

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from orb.infrastructure.interfaces.provider import BaseProviderConfig

# HandlersConfig and LaunchTemplateConfiguration are the nested AWS
# configuration models (imports omitted).


class AWSProviderConfig(BaseSettings, BaseProviderConfig):
    """AWS provider configuration with automatic environment variable support."""

    model_config = SettingsConfigDict(
        env_prefix='ORB_AWS_',
        case_sensitive=False,
        populate_by_name=True,
        env_nested_delimiter='__'
    )

    # Authentication - automatically mapped to ORB_AWS_* env vars
    region: str = Field("us-east-1", description="AWS region")
    profile: Optional[str] = Field(None, description="AWS profile")
    role_arn: Optional[str] = Field(None, description="AWS role ARN")
    access_key_id: Optional[str] = Field(None, description="AWS access key ID")
    secret_access_key: Optional[str] = Field(None, description="AWS secret access key")

    # Service settings
    aws_max_retries: int = Field(3, description="Maximum retries for AWS API calls")
    aws_read_timeout: int = Field(30, description="Read timeout in seconds")

    # Complex nested objects support JSON environment variables
    handlers: HandlersConfig = Field(default_factory=HandlersConfig)
    launch_template: LaunchTemplateConfiguration = Field(default_factory=LaunchTemplateConfiguration)
```
Environment Variable Support
All provider configuration fields automatically support environment variable overrides:
```bash
# Basic configuration
export ORB_AWS_REGION="us-west-2"
export ORB_AWS_PROFILE="production"
export ORB_AWS_ROLE_ARN="arn:aws:iam::123456789012:role/OrbitRole"

# Service settings
export ORB_AWS_AWS_MAX_RETRIES=5
export ORB_AWS_AWS_READ_TIMEOUT=60

# Complex nested objects (JSON format)
export ORB_AWS_HANDLERS='{"ec2_fleet": true, "spot_fleet": false}'
export ORB_AWS_LAUNCH_TEMPLATE='{"create_per_request": false, "reuse_existing": true}'

# Nested delimiter support
export ORB_AWS_HANDLERS__EC2_FLEET=true
export ORB_AWS_HANDLERS__SPOT_FLEET=false
```
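The variable names follow from `env_prefix` and `env_nested_delimiter`: the prefix is prepended to the field name, and `__` separates a nested field from its parent. That is why the field `aws_max_retries` becomes `ORB_AWS_AWS_MAX_RETRIES`. The helper below (`env_var_name`, hypothetical) only illustrates this naming rule; pydantic-settings performs the actual lookup, case-insensitively here because `case_sensitive=False`.

```python
def env_var_name(prefix: str, *field_path: str, delimiter: str = "__") -> str:
    """Build the environment variable name for a (possibly nested) field."""
    return (prefix + delimiter.join(field_path)).upper()


print(env_var_name("ORB_AWS_", "region"))                 # ORB_AWS_REGION
print(env_var_name("ORB_AWS_", "aws_max_retries"))        # ORB_AWS_AWS_MAX_RETRIES
print(env_var_name("ORB_AWS_", "handlers", "ec2_fleet"))  # ORB_AWS_HANDLERS__EC2_FLEET
print(env_var_name("ORB_PROVIDER1_", "max_retries"))      # ORB_PROVIDER1_MAX_RETRIES
```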
Provider Settings Registry
The provider settings registry manages BaseSettings classes for automatic environment variable loading:
```python
from orb.config.schemas.provider_settings_registry import ProviderSettingsRegistry

# Register provider settings class
ProviderSettingsRegistry.register_provider_settings("aws", AWSProviderConfig)

# Create settings instance with automatic env var loading
settings = ProviderSettingsRegistry.create_settings("aws", config_dict)
```
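The registry's internals are not shown here. As a rough, hypothetical illustration of the idea, the stdlib-only sketch below maps provider types to settings classes and builds an instance from the configuration-file dictionary with matching environment variables layered on top, mirroring the precedence described earlier. A dataclass stands in for a BaseSettings class, so the env prefix is passed explicitly (a dataclass has no `model_config`), and only simple `str`/`int` fields are converted; pydantic-settings handles all of this, and much more, in the real classes.

```python
import os
from dataclasses import dataclass, fields
from typing import Any, Dict, Mapping, Optional, Tuple, Type


@dataclass
class DemoProviderSettings:
    """Hypothetical stand-in for a provider BaseSettings class."""
    region: str = "us-east-1"
    max_retries: int = 3


class DemoSettingsRegistry:
    """Hypothetical registry: provider type -> (settings class, env prefix)."""

    _registry: Dict[str, Tuple[Type[Any], str]] = {}

    @classmethod
    def register_provider_settings(cls, provider_type: str,
                                   settings_cls: Type[Any], env_prefix: str) -> None:
        cls._registry[provider_type] = (settings_cls, env_prefix)

    @classmethod
    def create_settings(cls, provider_type: str, config: Dict[str, Any],
                        env: Optional[Mapping[str, str]] = None) -> Any:
        settings_cls, prefix = cls._registry[provider_type]
        env = os.environ if env is None else env
        values = dict(config)  # start from configuration-file values
        for f in fields(settings_cls):
            raw = env.get(prefix + f.name.upper())
            if raw is not None:
                # Environment variable wins; convert with the field's type (str/int only)
                values[f.name] = f.type(raw)
        return settings_cls(**values)


DemoSettingsRegistry.register_provider_settings("demo", DemoProviderSettings, "ORB_DEMO_")
settings = DemoSettingsRegistry.create_settings(
    "demo", {"region": "us-east-1", "max_retries": 3},
    env={"ORB_DEMO_MAX_RETRIES": "5"},
)
print(settings)  # DemoProviderSettings(region='us-east-1', max_retries=5)
```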
Provider Configuration
AWS Provider Configuration
```json
{
  "provider": {
    "type": "aws",
    "aws": {
      "region": "us-east-1",
      "profile": "default",
      "handlers": {
        "preferred_order": ["EC2Fleet", "ASG", "SpotFleet", "RunInstances"],
        "ec2_fleet": {
          "enabled": true,
          "default_type": "instant",
          "max_spot_percentage": 50
        },
        "asg": {
          "enabled": true,
          "default_min_size": 0,
          "default_max_size": 100,
          "health_check_type": "EC2"
        },
        "spot_fleet": {
          "enabled": true,
          "allocation_strategy": "lowestPrice",
          "target_capacity": 10
        },
        "run_instances": {
          "enabled": true,
          "max_instances_per_call": 20
        }
      }
    }
  }
}
```
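The `preferred_order` list together with each handler's `enabled` flag suggests a simple selection rule: pick the first handler in the preferred order that is enabled and, optionally, allowed for the request. The sketch below implements that rule against the configuration shape shown above; the mapping from handler names to config keys, the default of "disabled" for a missing section, and the selection logic itself are assumptions, not taken from ORB's handler factory.

```python
from typing import Any, Dict, Iterable, Optional

# Assumed mapping from handler names in preferred_order to config keys
HANDLER_CONFIG_KEYS = {
    "EC2Fleet": "ec2_fleet",
    "ASG": "asg",
    "SpotFleet": "spot_fleet",
    "RunInstances": "run_instances",
}


def select_handler(handlers_config: Dict[str, Any],
                   allowed: Optional[Iterable[str]] = None) -> Optional[str]:
    """Return the first enabled handler in preferred_order, or None.

    `allowed` optionally restricts the choice, e.g. to the handlers a
    particular template supports.
    """
    allowed_set = set(allowed) if allowed is not None else None
    for name in handlers_config.get("preferred_order", []):
        if allowed_set is not None and name not in allowed_set:
            continue
        section = handlers_config.get(HANDLER_CONFIG_KEYS.get(name, ""), {})
        if section.get("enabled", False):
            return name
    return None


handlers = {
    "preferred_order": ["EC2Fleet", "ASG", "SpotFleet", "RunInstances"],
    "ec2_fleet": {"enabled": False},
    "asg": {"enabled": True},
    "spot_fleet": {"enabled": True},
    "run_instances": {"enabled": True},
}
print(select_handler(handlers))                                         # ASG
print(select_handler(handlers, allowed=["SpotFleet", "RunInstances"]))  # SpotFleet
```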
Provider Best Practices
Design Principles
- Provider Agnostic Domain: Keep domain layer free of cloud-specific code
- Interface Segregation: Define focused interfaces for specific capabilities
- Error Handling: Implement comprehensive error handling and classification
- Retry Logic: Handle transient errors with appropriate retry strategies
- Resource Cleanup: Ensure appropriate cleanup of cloud resources
Implementation Guidelines
- Use Factory Pattern: Create providers through factory for flexibility
- Configuration Validation: Validate provider configuration on startup
- Health Monitoring: Implement health checks for provider connectivity
- Logging: Add comprehensive logging for debugging and monitoring
- Testing: Create mock providers for testing without cloud resources
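A mock provider lets application code run against the provider contract without touching a cloud account. Below is a minimal in-memory sketch; to stay self-contained it uses a simplified, hypothetical interface (`SimpleProviderInterface`) with plain dictionaries instead of ORB's `Machine`, `Request`, and `Template` domain models, so its method signatures differ from `ProviderInterface`.

```python
import itertools
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class SimpleProviderInterface(ABC):
    """Simplified, hypothetical version of ProviderInterface."""

    @abstractmethod
    def provision_machines(self, count: int) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def terminate_machines(self, machine_ids: List[str]) -> bool: ...

    @abstractmethod
    def get_machine_status(self, machine_ids: List[str]) -> List[Dict[str, Any]]: ...


class InMemoryProvider(SimpleProviderInterface):
    """Tracks fake machines in a dict instead of calling a cloud API."""

    def __init__(self) -> None:
        self._machines: Dict[str, Dict[str, Any]] = {}
        self._ids = itertools.count(1)

    def provision_machines(self, count: int) -> List[Dict[str, Any]]:
        created = []
        for _ in range(count):
            machine_id = f"machine-{next(self._ids)}"
            machine = {"machine_id": machine_id, "status": "RUNNING"}
            self._machines[machine_id] = machine
            created.append(dict(machine))
        return created

    def terminate_machines(self, machine_ids: List[str]) -> bool:
        # Report failure if any ID is unknown, mirroring a partial failure
        all_known = all(mid in self._machines for mid in machine_ids)
        for mid in machine_ids:
            if mid in self._machines:
                self._machines[mid]["status"] = "TERMINATED"
        return all_known

    def get_machine_status(self, machine_ids: List[str]) -> List[Dict[str, Any]]:
        return [dict(self._machines[mid]) for mid in machine_ids if mid in self._machines]


provider = InMemoryProvider()
ids = [m["machine_id"] for m in provider.provision_machines(2)]
print(ids)                                   # ['machine-1', 'machine-2']
print(provider.terminate_machines(ids[:1]))  # True
print(provider.get_machine_status(ids))
# [{'machine_id': 'machine-1', 'status': 'TERMINATED'}, {'machine_id': 'machine-2', 'status': 'RUNNING'}]
```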
Performance Considerations
- Connection Pooling: Reuse connections to cloud APIs
- Batch Operations: Group similar operations for efficiency
- Caching: Cache frequently accessed data (templates, metadata)
- Async Operations: Use asynchronous operations where possible
- Rate Limiting: Respect cloud provider rate limits
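Batching usually means splitting a large list of IDs into chunks that each fit in one API call, for example to honor `max_instances_per_call` from the RunInstances handler settings above. A generic sketch (the chunk size is chosen by the caller; no specific AWS limit is implied):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


instance_ids = [f"i-{n:03d}" for n in range(45)]
chunks = list(batched(instance_ids, 20))
print([len(chunk) for chunk in chunks])  # [20, 20, 5]
```

Each chunk could then go to a single call such as `terminate_instances(chunk)`. On Python 3.12 and later, `itertools.batched` provides the same chunking, yielding tuples instead of lists.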
Next Steps
- Resilience Patterns: Learn about error handling and retry logic
- Data Models: Understand the data validation architecture
- Storage Strategies: Learn about storage options
- Configuration Reference: Complete configuration guide