Skip to content

Provider Architecture

The Open Resource Broker implements a provider architecture that enables integration with AWS while maintaining clean separation between the core domain layer and AWS-specific implementations.

Provider Architecture Overview

graph TB
    subgraph "Domain Layer (Provider Agnostic)"
        DomainModels[Domain Models]
        BusinessLogic[Business Logic]
        DomainEvents[Domain Events]
    end

    subgraph "Application Layer"
        AppService[Application Service]
        CommandBus[Command Bus]
        QueryBus[Query Bus]
    end

    subgraph "Provider Abstraction"
        ProviderInterface[Provider Interface]
        ProviderFactory[Provider Factory]
        ResourceManager[Resource Manager Interface]
    end

    subgraph "AWS Provider Implementation"
        AWSProvider[AWS Provider]
        AWSHandlers[AWS Handlers]
        AWSOperations[AWS Operations Utility]
        AWSClient[AWS Client]
    end

    AppService --> ProviderInterface
    ProviderFactory --> AWSProvider

    AWSProvider --> AWSHandlers
    AWSHandlers --> AWSOperations
    AWSOperations --> AWSClient

    DomainModels -.-> ProviderInterface
    BusinessLogic -.-> ProviderInterface

Provider Abstraction Layer

Provider Interface

The core provider interface defines the contract that all cloud providers must implement:

from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from orb.domain.machine import Machine
from orb.domain.request import Request
from orb.domain.template import Template

class ProviderInterface(ABC):
    """Abstract interface for cloud providers."""

    @abstractmethod
    def get_provider_type(self) -> str:
        """Get the provider type identifier."""
        pass

    @abstractmethod
    def initialize(self) -> bool:
        """Initialize the provider."""
        pass

    @abstractmethod
    def health_check(self) -> Dict[str, Any]:
        """Check provider health status."""
        pass

    @abstractmethod
    def get_available_templates(self) -> List[Template]:
        """Get available machine templates."""
        pass

    @abstractmethod
    def provision_machines(self, request: Request) -> List[Machine]:
        """Provision machines based on request."""
        pass

    @abstractmethod
    def terminate_machines(self, machine_ids: List[str]) -> bool:
        """Terminate specified machines."""
        pass

    @abstractmethod
    def get_machine_status(self, machine_ids: List[str]) -> List[Machine]:
        """Get status of specified machines."""
        pass

    @abstractmethod
    def validate_template(self, template: Template) -> List[str]:
        """Validate template configuration."""
        pass

Provider Factory

The provider factory creates provider instances dynamically:

from typing import Dict, Type, Optional
from orb.infrastructure.logging.logger import get_logger

class ProviderFactory:
    """Factory for creating provider instances."""

    _providers: Dict[str, Type[ProviderInterface]] = {}

    @classmethod
    def register_provider(cls, provider_type: str, provider_class: Type[ProviderInterface]):
        """Register a provider implementation."""
        cls._providers[provider_type] = provider_class
        get_logger(__name__).info(f"Registered provider: {provider_type}")

    @classmethod
    def create_provider(cls, provider_type: str, config: Any) -> ProviderInterface:
        """Create a provider instance."""
        if provider_type not in cls._providers:
            raise ValueError(f"Unknown provider type: {provider_type}")

        provider_class = cls._providers[provider_type]
        return provider_class(config)

    @classmethod
    def get_available_providers(cls) -> List[str]:
        """Get list of available provider types."""
        return list(cls._providers.keys())

# Register AWS provider
from orb.providers.aws.aws_provider import AWSProvider
ProviderFactory.register_provider("aws", AWSProvider)

AWS Provider Implementation

AWS Provider Architecture

The AWS provider implements a sophisticated handler-based architecture:

graph TB
    subgraph "AWS Provider"
        AWSProvider[AWS Provider]
        HandlerFactory[Handler Factory]
        OperationsUtility[Operations Utility]
    end

    subgraph "AWS Handlers"
        EC2FleetHandler[EC2 Fleet Handler]
        ASGHandler[Auto Scaling Handler]
        SpotFleetHandler[Spot Fleet Handler]
        RunInstancesHandler[Run Instances Handler]
    end

    subgraph "AWS Services"
        EC2[Amazon EC2]
        AutoScaling[Auto Scaling]
        CloudWatch[CloudWatch]
        IAM[IAM]
    end

    AWSProvider --> HandlerFactory
    HandlerFactory --> EC2FleetHandler
    HandlerFactory --> ASGHandler
    HandlerFactory --> SpotFleetHandler
    HandlerFactory --> RunInstancesHandler

    EC2FleetHandler --> OperationsUtility
    ASGHandler --> OperationsUtility
    SpotFleetHandler --> OperationsUtility
    RunInstancesHandler --> OperationsUtility

    OperationsUtility --> EC2
    OperationsUtility --> AutoScaling
    OperationsUtility --> CloudWatch

AWS Provider Class

from typing import List, Dict, Any, Optional
from orb.infrastructure.interfaces.provider import ProviderInterface
from orb.providers.aws.infrastructure.aws_handler_factory import AWSHandlerFactory
from orb.providers.aws.configuration.config import AWSConfig
from orb.providers.aws.utilities.aws_operations import AWSOperations

class AWSProvider(ProviderInterface):
    """AWS cloud provider implementation."""

    def __init__(self, config: AWSConfig):
        """Initialize AWS provider."""
        self.config = config
        self.handler_factory = AWSHandlerFactory(config)
        self.aws_operations = AWSOperations(config)
        self._initialized = False
        self.logger = get_logger(__name__)

    def get_provider_type(self) -> str:
        """Get provider type."""
        return "aws"

    def initialize(self) -> bool:
        """Initialize AWS provider."""
        try:
            # Test AWS connectivity
            self.aws_operations.test_connection()

            # Initialize handler factory
            self.handler_factory.initialize()

            self._initialized = True
            self.logger.info("AWS provider initialized successfully")
            return True

        except Exception as e:
            self.logger.error(f"Failed to initialize AWS provider: {e}")
            return False

    def health_check(self) -> Dict[str, Any]:
        """Check AWS provider health."""
        if not self._initialized:
            return {"status": "unhealthy", "reason": "not_initialized"}

        try:
            # Check AWS service connectivity
            ec2_health = self.aws_operations.check_ec2_connectivity()
            asg_health = self.aws_operations.check_autoscaling_connectivity()

            return {
                "status": "healthy" if ec2_health and asg_health else "degraded",
                "services": {
                    "ec2": "healthy" if ec2_health else "unhealthy",
                    "autoscaling": "healthy" if asg_health else "unhealthy"
                },
                "region": self.config.region,
                "account_id": self.aws_operations.get_account_id()
            }

        except Exception as e:
            return {"status": "unhealthy", "reason": str(e)}

AWS Handler Hierarchy

Base Handler

from abc import ABC, abstractmethod
from typing import List, Dict, Any
from orb.providers.aws.utilities.aws_operations import AWSOperations

class BaseAWSHandler(ABC):
    """Base class for all AWS handlers."""

    def __init__(self, aws_operations: AWSOperations):
        """Initialize base handler."""
        self.aws_operations = aws_operations
        self.logger = get_logger(self.__class__.__name__)

    @abstractmethod
    def get_handler_type(self) -> str:
        """Get handler type identifier."""
        pass

    @abstractmethod
    def provision_machines(self, template: Dict[str, Any],
                          machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using this handler."""
        pass

    @abstractmethod
    def terminate_machines(self, machine_ids: List[str]) -> bool:
        """Terminate machines using this handler."""
        pass

    @abstractmethod
    def get_machine_status(self, machine_ids: List[str]) -> List[Dict[str, Any]]:
        """Get machine status using this handler."""
        pass

    def validate_template(self, template: Dict[str, Any]) -> List[str]:
        """Validate template for this handler."""
        errors = []

        # Common validation
        if not template.get('vm_type'):
            errors.append("vm_type is required")

        if not template.get('image_id'):
            errors.append("image_id is required")

        return errors

EC2 Fleet Handler

class EC2FleetHandler(BaseAWSHandler):
    """Handler for EC2 Fleet operations."""

    def get_handler_type(self) -> str:
        return "EC2Fleet"

    def provision_machines(self, template: Dict[str, Any],
                          machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using EC2 Fleet."""
        try:
            # Create fleet configuration
            fleet_config = self._build_fleet_config(template, machine_count)

            # Create EC2 fleet
            fleet_response = self.aws_operations.create_ec2_fleet(fleet_config)

            # Wait for instances to be created
            instance_ids = self._wait_for_fleet_instances(fleet_response['FleetId'])

            # Get instance details
            machines = []
            for instance_id in instance_ids:
                instance_details = self.aws_operations.get_instance_details(instance_id)
                machines.append({
                    'machine_id': f"machine-{instance_id}",
                    'provider_instance_id': instance_id,
                    'status': 'PENDING',
                    'instance_type': instance_details.get('InstanceType'),
                    'private_ip': instance_details.get('PrivateIpAddress'),
                    'public_ip': instance_details.get('PublicIpAddress')
                })

            return machines

        except Exception as e:
            self.logger.error(f"Failed to provision machines with EC2 Fleet: {e}")
            raise

    def _build_fleet_config(self, template: Dict[str, Any],
                           machine_count: int) -> Dict[str, Any]:
        """Build EC2 Fleet configuration."""
        return {
            'LaunchTemplateConfigs': [{
                'LaunchTemplateSpecification': {
                    'LaunchTemplateName': template.get('launch_template_name'),
                    'Version': template.get('launch_template_version', '$Latest')
                },
                'Overrides': [{
                    'InstanceType': template['vm_type'],
                    'SubnetId': subnet_id,
                    'AvailabilityZone': self.aws_operations.get_subnet_az(subnet_id)
                } for subnet_id in template.get('subnet_ids', [])]
            }],
            'TargetCapacitySpecification': {
                'TotalTargetCapacity': machine_count,
                'OnDemandTargetCapacity': machine_count,
                'DefaultTargetCapacityType': 'on-demand'
            },
            'Type': 'instant'
        }

Auto Scaling Group Handler

class ASGHandler(BaseAWSHandler):
    """Handler for Auto Scaling Group operations."""

    def get_handler_type(self) -> str:
        return "ASG"

    def provision_machines(self, template: Dict[str, Any],
                          machine_count: int) -> List[Dict[str, Any]]:
        """Provision machines using Auto Scaling Group."""
        try:
            # Create launch template if needed
            launch_template = self._ensure_launch_template(template)

            # Create Auto Scaling Group
            asg_name = f"hostfactory-{template['template_id']}-{int(time.time())}"
            asg_config = self._build_asg_config(template, machine_count,
                                              launch_template['LaunchTemplateName'])

            self.aws_operations.create_auto_scaling_group(asg_name, asg_config)

            # Wait for instances to be launched
            instance_ids = self._wait_for_asg_instances(asg_name, machine_count)

            # Get instance details
            machines = []
            for instance_id in instance_ids:
                instance_details = self.aws_operations.get_instance_details(instance_id)
                machines.append({
                    'machine_id': f"machine-{instance_id}",
                    'provider_instance_id': instance_id,
                    'status': 'PENDING',
                    'instance_type': instance_details.get('InstanceType'),
                    'private_ip': instance_details.get('PrivateIpAddress'),
                    'asg_name': asg_name
                })

            return machines

        except Exception as e:
            self.logger.error(f"Failed to provision machines with ASG: {e}")
            raise

AWS Operations Utility

The AWS Operations utility consolidates common AWS operations:

class AWSOperations:
    """Consolidated AWS operations utility."""

    def __init__(self, config: AWSConfig):
        """Initialize AWS operations."""
        self.config = config
        self.ec2_client = self._create_ec2_client()
        self.autoscaling_client = self._create_autoscaling_client()
        self.logger = get_logger(__name__)

    def create_ec2_fleet(self, fleet_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create EC2 fleet with error handling and retry."""
        try:
            response = self.ec2_client.create_fleet(**fleet_config)

            if response.get('Errors'):
                error_msg = f"EC2 Fleet creation errors: {response['Errors']}"
                self.logger.error(error_msg)
                raise AWSProviderError(error_msg)

            return response

        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code in ['InsufficientInstanceCapacity', 'RequestLimitExceeded']:
                # Retryable errors
                raise RetryableAWSError(f"AWS API error: {error_code}")
            else:
                # Non-retryable errors
                raise AWSProviderError(f"AWS API error: {error_code}")

    def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate EC2 instances with consolidated error handling."""
        try:
            if not instance_ids:
                return True

            response = self.ec2_client.terminate_instances(InstanceIds=instance_ids)

            # Check for termination errors
            terminating_instances = response.get('TerminatingInstances', [])
            failed_instances = [
                inst for inst in terminating_instances
                if inst.get('CurrentState', {}).get('Name') not in ['shutting-down', 'terminated']
            ]

            if failed_instances:
                self.logger.warning(f"Some instances failed to terminate: {failed_instances}")
                return False

            return True

        except ClientError as e:
            self.logger.error(f"Failed to terminate instances: {e}")
            return False

    def get_instance_details(self, instance_id: str) -> Dict[str, Any]:
        """Get detailed instance information."""
        try:
            response = self.ec2_client.describe_instances(InstanceIds=[instance_id])

            if not response['Reservations']:
                raise AWSProviderError(f"Instance {instance_id} not found")

            instance = response['Reservations'][0]['Instances'][0]
            return {
                'InstanceId': instance['InstanceId'],
                'InstanceType': instance['InstanceType'],
                'State': instance['State']['Name'],
                'PrivateIpAddress': instance.get('PrivateIpAddress'),
                'PublicIpAddress': instance.get('PublicIpAddress'),
                'LaunchTime': instance.get('LaunchTime'),
                'Tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
            }

        except ClientError as e:
            self.logger.error(f"Failed to get instance details: {e}")
            raise AWSProviderError(f"Failed to get instance details: {e}")

Adding New Providers

Implement Provider Interface

# src/providers/provider1/provider1_provider.py
from orb.infrastructure.interfaces.provider import ProviderInterface

class Provider1Provider(ProviderInterface):
    """Provider1 cloud provider implementation."""

    def __init__(self, config: Provider1Config):
        self.config = config
        # Initialize Provider1 clients

    def get_provider_type(self) -> str:
        return "provider1"

    def initialize(self) -> bool:
        # Initialize Provider1 provider
        pass

    # Implement all abstract methods...

Create Provider Configuration with BaseSettings

# src/providers/provider1/configuration/config.py
from typing import Optional
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from orb.infrastructure.interfaces.provider import BaseProviderConfig

class Provider1ProviderConfig(BaseSettings, BaseProviderConfig):
    """Provider1 provider configuration with automatic environment variable support."""

    model_config = SettingsConfigDict(
        env_prefix='ORB_PROVIDER1_',
        case_sensitive=False,
        populate_by_name=True,
        env_nested_delimiter='__'
    )

    # Provider identification
    provider_type: str = "provider1"

    # Provider1 Authentication - automatically mapped to ORB_PROVIDER1_* env vars
    account_id: str = Field(..., description="Provider1 account ID")
    tenant_id: str = Field(..., description="Provider1 tenant ID")
    client_id: str = Field(..., description="Provider1 client ID")
    client_secret: str = Field(..., description="Provider1 client secret")

    # Provider1 Settings
    resource_group: str = Field(..., description="Provider1 resource group")
    location: str = Field("Region A", description="Provider1 location")

    # Optional settings
    endpoint_url: Optional[str] = Field(None, description="Provider1 endpoint URL")
    max_retries: int = Field(3, description="Maximum retries for Provider1 API calls")
    timeout: int = Field(30, description="Request timeout in seconds")

Register Provider Settings

# src/providers/provider1/__init__.py
from orb.config.schemas.provider_settings_registry import ProviderSettingsRegistry
from .configuration.config import Provider1ProviderConfig

# Register Provider1 provider settings for automatic environment variable support
ProviderSettingsRegistry.register_provider_settings("provider1", Provider1ProviderConfig)

Register Provider

# Register Provider1 provider
from orb.providers.provider1.provider1_provider import Provider1Provider
ProviderFactory.register_provider("provider1", Provider1Provider)

Configuration with Environment Variable Support

Configuration File:

{
  "providers": [{
    "name": "provider1-region-a",
    "type": "provider1",
    "config": {
      "subscription_id": "your-subscription-id",
      "tenant_id": "your-tenant-id",
      "client_id": "your-client-id",
      "resource_group": "hostfactory-rg",
      "location": "East US"
    }
  }]
}

Environment Variables (Override Config File):

# All Provider1 fields support environment variable overrides
export ORB_PROVIDER1_ACCOUNT_ID="prod-account-id"
export ORB_PROVIDER1_CLIENT_SECRET="secure-client-secret"
export ORB_PROVIDER1_RESOURCE_GROUP="production-rg"
export ORB_PROVIDER1_LOCATION="Region B"
export ORB_PROVIDER1_MAX_RETRIES=5

Provider Configuration with BaseSettings

The Open Resource Broker uses Pydantic BaseSettings for provider configuration, enabling automatic environment variable mapping, type validation, and extensible configuration schemas.

BaseSettings Benefits

  • Automatic Environment Variable Mapping: No manual mapping required
  • Type Safety: Automatic type conversion and validation
  • Provider Extensibility: Each provider defines its own configuration schema
  • Environment Variable Precedence: Environment variables override configuration file values

AWS Provider Configuration Example

class AWSProviderConfig(BaseSettings, BaseProviderConfig):
    """AWS provider configuration with automatic environment variable support."""

    model_config = SettingsConfigDict(
        env_prefix='ORB_AWS_',
        case_sensitive=False,
        populate_by_name=True,
        env_nested_delimiter='__'
    )

    # Authentication - automatically mapped to ORB_AWS_* env vars
    region: str = Field("us-east-1", description="AWS region")
    profile: Optional[str] = Field(None, description="AWS profile")
    role_arn: Optional[str] = Field(None, description="AWS role ARN")
    access_key_id: Optional[str] = Field(None, description="AWS access key ID")
    secret_access_key: Optional[str] = Field(None, description="AWS secret access key")

    # Service settings
    aws_max_retries: int = Field(3, description="Maximum retries for AWS API calls")
    aws_read_timeout: int = Field(30, description="Read timeout in seconds")

    # Complex nested objects support JSON environment variables
    handlers: HandlersConfig = Field(default_factory=HandlersConfig)
    launch_template: LaunchTemplateConfiguration = Field(default_factory=LaunchTemplateConfiguration)

Environment Variable Support

All provider configuration fields automatically support environment variable overrides:

# Basic configuration
export ORB_AWS_REGION="us-west-2"
export ORB_AWS_PROFILE="production"
export ORB_AWS_ROLE_ARN="arn:aws:iam::123456789012:role/OrbitRole"

# Service settings
export ORB_AWS_AWS_MAX_RETRIES=5
export ORB_AWS_AWS_READ_TIMEOUT=60

# Complex nested objects (JSON format)
export ORB_AWS_HANDLERS='{"ec2_fleet": true, "spot_fleet": false}'
export ORB_AWS_LAUNCH_TEMPLATE='{"create_per_request": false, "reuse_existing": true}'

# Nested delimiter support
export ORB_AWS_HANDLERS__EC2_FLEET=true
export ORB_AWS_HANDLERS__SPOT_FLEET=false

Provider Settings Registry

The provider settings registry manages BaseSettings classes for automatic environment variable loading:

from orb.config.schemas.provider_settings_registry import ProviderSettingsRegistry

# Register provider settings class
ProviderSettingsRegistry.register_provider_settings("aws", AWSProviderConfig)

# Create settings instance with automatic env var loading
settings = ProviderSettingsRegistry.create_settings("aws", config_dict)

Provider Configuration

AWS Provider Configuration

{
  "provider": {
    "type": "aws",
    "aws": {
      "region": "us-east-1",
      "profile": "default",
      "handlers": {
        "preferred_order": ["EC2Fleet", "ASG", "SpotFleet", "RunInstances"],
        "ec2_fleet": {
          "enabled": true,
          "default_type": "instant",
          "max_spot_percentage": 50
        },
        "asg": {
          "enabled": true,
          "default_min_size": 0,
          "default_max_size": 100,
          "health_check_type": "EC2"
        },
        "spot_fleet": {
          "enabled": true,
          "allocation_strategy": "lowestPrice",
          "target_capacity": 10
        },
        "run_instances": {
          "enabled": true,
          "max_instances_per_call": 20
        }
      }
    }
  }
}

Provider Best Practices

Design Principles

  1. Provider Agnostic Domain: Keep domain layer free of cloud-specific code
  2. Interface Segregation: Define focused interfaces for specific capabilities
  3. Error Handling: Implement comprehensive error handling and classification
  4. Retry Logic: Handle transient errors with appropriate retry strategies
  5. Resource Cleanup: Ensure appropriate cleanup of cloud resources

Implementation Guidelines

  1. Use Factory Pattern: Create providers through factory for flexibility
  2. Configuration Validation: Validate provider configuration on startup
  3. Health Monitoring: Implement health checks for provider connectivity
  4. Logging: Add comprehensive logging for debugging and monitoring
  5. Testing: Create mock providers for testing without cloud resources

Performance Considerations

  1. Connection Pooling: Reuse connections to cloud APIs
  2. Batch Operations: Group similar operations for efficiency
  3. Caching: Cache frequently accessed data (templates, metadata)
  4. Async Operations: Use asynchronous operations where possible
  5. Rate Limiting: Respect cloud provider rate limits

Next Steps