Skip to content

Strategy Pattern Implementation

This document describes the implementation of the Strategy pattern in the Open Host Factory Plugin, which enables pluggable provider implementations and runtime behavior selection.

Strategy Pattern Overview

The Strategy pattern allows the plugin to:

  • Encapsulate algorithms: Different provider implementations as separate strategies
  • Runtime selection: Choose provider strategy based on configuration
  • Easy extension: Add new providers without modifying existing code
  • Consistent interface: All providers implement the same contract

Provider Strategy Implementation

Base Strategy Interface

The core provider strategy interface defines the contract for all provider implementations:

# src/providers/base/strategy/provider_strategy.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from src.domain.request.aggregate import Request
from src.domain.machine.aggregate import Machine
from src.domain.template.aggregate import Template

class ProviderStrategy(ABC):
    """Abstract base class for provider strategies."""

    @abstractmethod
    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision compute instances based on request."""
        pass

    @abstractmethod
    async def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate specified compute instances."""
        pass

    @abstractmethod
    async def get_instance_status(self, instance_ids: List[str]) -> Dict[str, str]:
        """Get current status of specified instances."""
        pass

    @abstractmethod
    async def validate_template(self, template: Template) -> bool:
        """Validate template configuration for this provider."""
        pass

    @abstractmethod
    async def get_available_templates(self) -> List[Template]:
        """Get list of available templates for this provider."""
        pass

    @abstractmethod
    def get_provider_info(self) -> Dict[str, Any]:
        """Get provider-specific information."""
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Check provider health and connectivity."""
        pass

AWS Provider Strategy

The AWS provider strategy implements the provider interface for Amazon Web Services:

# src/providers/aws/strategy/aws_provider_strategy.py
from src.domain.base.dependency_injection import injectable
from src.domain.base.ports import LoggingPort
from src.providers.base.strategy.provider_strategy import ProviderStrategy
from src.providers.aws.configuration.config import AWSProviderConfig
from src.providers.aws.infrastructure.aws_client import AWSClient
from src.providers.aws.infrastructure.aws_handler_factory import AWSHandlerFactory

@injectable
class AWSProviderStrategy(ProviderStrategy):
    """AWS implementation of the provider strategy."""

    def __init__(self, config: AWSProviderConfig, logger: LoggingPort):
        self._config = config
        self._logger = logger
        self._aws_client = AWSClient(config, logger)
        self._handler_factory = AWSHandlerFactory(self._aws_client, logger, config)
        self._initialized = False

    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision AWS compute instances."""
        self._logger.info(f"Provisioning instances for request {request.id}")

        # Get appropriate handler based on template configuration
        handler = self._handler_factory.create_handler_for_template(request.template)

        # Provision instances using handler
        machines = await handler.provision_instances(request)

        self._logger.info(f"Provisioned {len(machines)} instances")
        return machines

    async def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate AWS compute instances."""
        self._logger.info(f"Terminating {len(instance_ids)} instances")

        try:
            ec2_client = self._aws_client.get_client('ec2')
            response = ec2_client.terminate_instances(InstanceIds=instance_ids)

            # Check termination status
            terminating_instances = response.get('TerminatingInstances', [])
            success = len(terminating_instances) == len(instance_ids)

            if success:
                self._logger.info("All instances terminated successfully")
            else:
                self._logger.warning("Some instances failed to terminate")

            return success

        except Exception as e:
            self._logger.error(f"Error terminating instances: {e}")
            return False

    async def get_instance_status(self, instance_ids: List[str]) -> Dict[str, str]:
        """Get AWS instance status."""
        self._logger.info(f"Getting status for {len(instance_ids)} instances")

        try:
            ec2_client = self._aws_client.get_client('ec2')
            response = ec2_client.describe_instances(InstanceIds=instance_ids)

            status_map = {}
            for reservation in response['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    state = instance['State']['Name']
                    status_map[instance_id] = state

            return status_map

        except Exception as e:
            self._logger.error(f"Error getting instance status: {e}")
            return {}

    async def validate_template(self, template: Template) -> bool:
        """Validate AWS template configuration."""
        self._logger.info(f"Validating template {template.template_id}")

        # Use template adapter for validation
        template_adapter = self._get_template_adapter()
        return await template_adapter.validate_template(template.to_dict())

    async def get_available_templates(self) -> List[Template]:
        """Get available AWS templates."""
        self._logger.info("Retrieving available AWS templates")

        # Implementation would retrieve templates from configuration
        # or discover them from AWS resources
        templates = []

        # Example template configurations
        default_templates = self._config.get_default_templates()
        for template_config in default_templates:
            template = Template.from_dict(template_config)
            templates.append(template)

        return templates

    def get_provider_info(self) -> Dict[str, Any]:
        """Get AWS provider information."""
        return {
            "provider_type": "aws",
            "region": self._config.region,
            "profile": self._config.profile,
            "initialized": self._initialized,
            "supported_handlers": [
                "ec2_fleet",
                "spot_fleet", 
                "auto_scaling_group",
                "run_instances"
            ]
        }

    async def health_check(self) -> bool:
        """Check AWS provider health."""
        try:
            # Test AWS connectivity
            sts_client = self._aws_client.get_client('sts')
            response = sts_client.get_caller_identity()

            self._logger.info(f"AWS health check passed for account {response.get('Account')}")
            return True

        except Exception as e:
            self._logger.error(f"AWS health check failed: {e}")
            return False

Provider Context (Strategy Manager)

The provider context manages strategy selection and execution:

# src/providers/base/strategy/provider_context.py
from typing import Dict, List, Optional, Any
from src.domain.base.ports import LoggingPort
from src.providers.base.strategy.provider_strategy import ProviderStrategy

class ProviderContext:
    """Context for managing provider strategies."""

    def __init__(self, logger: LoggingPort):
        self._logger = logger
        self._strategies: Dict[str, ProviderStrategy] = {}
        self._current_strategy: Optional[str] = None
        self._default_strategy: Optional[str] = None

    def register_strategy(self, name: str, strategy: ProviderStrategy) -> None:
        """Register a provider strategy."""
        self._strategies[name] = strategy
        self._logger.info(f"Registered provider strategy: {name}")

        # Set as default if first strategy
        if self._default_strategy is None:
            self._default_strategy = name
            self._current_strategy = name

    def set_current_strategy(self, name: str) -> bool:
        """Set the current active strategy."""
        if name not in self._strategies:
            self._logger.error(f"Strategy not found: {name}")
            return False

        self._current_strategy = name
        self._logger.info(f"Switched to strategy: {name}")
        return True

    def get_current_strategy(self) -> Optional[ProviderStrategy]:
        """Get the current active strategy."""
        if self._current_strategy is None:
            return None

        return self._strategies.get(self._current_strategy)

    def get_available_strategies(self) -> List[str]:
        """Get list of available strategy names."""
        return list(self._strategies.keys())

    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision instances using current strategy."""
        strategy = self.get_current_strategy()
        if strategy is None:
            raise RuntimeError("No provider strategy available")

        return await strategy.provision_instances(request)

    async def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate instances using current strategy."""
        strategy = self.get_current_strategy()
        if strategy is None:
            raise RuntimeError("No provider strategy available")

        return await strategy.terminate_instances(instance_ids)

    def get_provider_info(self) -> Dict[str, Any]:
        """Get information about current provider."""
        strategy = self.get_current_strategy()
        if strategy is None:
            return {"error": "No provider strategy available"}

        info = strategy.get_provider_info()
        info["current_strategy"] = self._current_strategy
        info["available_strategies"] = self.get_available_strategies()

        return info

Strategy Factory Pattern

The strategy factory creates appropriate strategies based on configuration:

# src/infrastructure/factories/provider_strategy_factory.py
from src.domain.base.ports import LoggingPort, ConfigurationPort
from src.providers.base.strategy.provider_strategy import ProviderStrategy
from src.providers.aws.strategy.aws_provider_strategy import AWSProviderStrategy
from src.providers.aws.configuration.config import AWSProviderConfig

class ProviderStrategyFactory:
    """Factory for creating provider strategies."""

    def __init__(self, 
                 config_manager: ConfigurationPort,
                 logger: LoggingPort):
        self._config_manager = config_manager
        self._logger = logger

    def create_strategy(self, provider_type: str) -> ProviderStrategy:
        """Create provider strategy based on type."""
        self._logger.info(f"Creating provider strategy: {provider_type}")

        if provider_type.lower() == "aws":
            return self._create_aws_strategy()
        else:
            raise ValueError(f"Unsupported provider type: {provider_type}")

    def _create_aws_strategy(self) -> AWSProviderStrategy:
        """Create AWS provider strategy."""
        # Get AWS configuration
        aws_config_data = self._config_manager.get_section("aws")
        aws_config = AWSProviderConfig(**aws_config_data)

        # Create strategy
        strategy = AWSProviderStrategy(
            config=aws_config,
            logger=self._logger
        )

        return strategy

    def get_available_providers(self) -> List[str]:
        """Get list of available provider types."""
        return ["aws"]  # Can be extended for other providers

Configuration-Driven Strategy Selection

Strategies are selected based on configuration:

# config/providers.yml
providers:
  - name: aws-primary
    type: aws
    config:
      region: us-east-1
      profile: default
      handlers:
        default: ec2_fleet
        ec2_fleet:
          enabled: true
        spot_fleet:
          enabled: true
          max_spot_price: 0.10
        auto_scaling_group:
          enabled: true
          min_size: 1
          max_size: 10
# Strategy registration based on configuration
def register_provider_strategies(container: DIContainer) -> None:
    """Register provider strategies based on configuration."""
    config = container.get(ConfigurationPort)
    logger = container.get(LoggingPort)

    # Get provider configuration
    provider_configs = config.get("providers", [])

    # Create provider context
    provider_context = ProviderContext(logger)

    # Register strategies
    strategy_factory = ProviderStrategyFactory(config, logger)

    for provider_config in provider_configs:
        provider_name = provider_config["name"]
        provider_type = provider_config["type"]

        # Create strategy
        strategy = strategy_factory.create_strategy(provider_type)

        # Register with context
        provider_context.register_strategy(provider_name, strategy)

    # Register context in container
    container.register_instance(ProviderContext, provider_context)

Handler Strategy Pattern (Sub-strategies)

Within the AWS provider, different handlers implement sub-strategies for different provisioning methods:

Handler Strategy Interface

# src/providers/aws/infrastructure/handlers/base_handler.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from src.domain.request.aggregate import Request
from src.domain.machine.aggregate import Machine

@injectable
class AWSHandler(ABC):
    """Abstract base class for AWS handlers."""

    def __init__(self, 
                 aws_client: AWSClient,
                 logger: LoggingPort,
                 config: ConfigurationPort):
        self._aws_client = aws_client
        self._logger = logger
        self._config = config

    @abstractmethod
    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision instances using this handler."""
        pass

    @abstractmethod
    async def terminate_instances(self, instance_ids: List[str]) -> bool:
        """Terminate instances using this handler."""
        pass

    @abstractmethod
    def get_handler_info(self) -> Dict[str, Any]:
        """Get handler-specific information."""
        pass

Concrete Handler Strategies

# src/providers/aws/infrastructure/handlers/ec2_fleet_handler.py
@injectable
class EC2FleetHandler(AWSHandler):
    """Handler for EC2 Fleet provisioning."""

    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision instances using EC2 Fleet."""
        self._logger.info(f"Provisioning instances using EC2 Fleet")

        # Build fleet configuration
        fleet_config = self._build_fleet_config(request)

        # Create fleet
        ec2_client = self._aws_client.get_client('ec2')
        response = ec2_client.create_fleet(**fleet_config)

        # Process response and create machine objects
        machines = self._process_fleet_response(response, request)

        return machines

# src/providers/aws/infrastructure/handlers/spot_fleet_handler.py
@injectable
class SpotFleetHandler(AWSHandler):
    """Handler for Spot Fleet provisioning."""

    async def provision_instances(self, request: Request) -> List[Machine]:
        """Provision instances using Spot Fleet."""
        self._logger.info(f"Provisioning instances using Spot Fleet")

        # Build spot fleet configuration
        spot_config = self._build_spot_fleet_config(request)

        # Create spot fleet
        ec2_client = self._aws_client.get_client('ec2')
        response = ec2_client.request_spot_fleet(**spot_config)

        # Process response and create machine objects
        machines = self._process_spot_fleet_response(response, request)

        return machines

Handler Factory (Strategy Factory)

# src/providers/aws/infrastructure/aws_handler_factory.py
@injectable
class AWSHandlerFactory:
    """Factory for creating AWS handlers based on strategy."""

    def __init__(self,
                 aws_client: AWSClient,
                 logger: LoggingPort,
                 config: ConfigurationPort):
        self._aws_client = aws_client
        self._logger = logger
        self._config = config
        self._handlers: Dict[str, Type[AWSHandler]] = {}
        self._register_handlers()

    def _register_handlers(self):
        """Register available handler types."""
        from .handlers.ec2_fleet_handler import EC2FleetHandler
        from .handlers.spot_fleet_handler import SpotFleetHandler
        from .handlers.asg_handler import ASGHandler
        from .handlers.run_instances_handler import RunInstancesHandler

        self._handlers = {
            "ec2_fleet": EC2FleetHandler,
            "spot_fleet": SpotFleetHandler,
            "auto_scaling_group": ASGHandler,
            "run_instances": RunInstancesHandler
        }

    def create_handler(self, handler_type: str) -> AWSHandler:
        """Create handler based on type."""
        if handler_type not in self._handlers:
            raise ValueError(f"Unknown handler type: {handler_type}")

        handler_class = self._handlers[handler_type]
        return handler_class(self._aws_client, self._logger, self._config)

    def create_handler_for_template(self, template: Template) -> AWSHandler:
        """Create appropriate handler based on template configuration."""
        # Determine handler type from template
        handler_type = self._determine_handler_type(template)
        return self.create_handler(handler_type)

    def _determine_handler_type(self, template: Template) -> str:
        """Determine appropriate handler type for template."""
        # Logic to determine handler based on template attributes
        attributes = template.attributes

        if attributes.get("use_spot_instances", False):
            return "spot_fleet"
        elif attributes.get("use_auto_scaling", False):
            return "auto_scaling_group"
        elif attributes.get("use_fleet", True):
            return "ec2_fleet"
        else:
            return "run_instances"

Benefits of Strategy Pattern Implementation

Extensibility

  • Easy to add new cloud providers
  • New provisioning methods can be added as strategies
  • Configuration-driven strategy selection

Maintainability

  • Each strategy is self-contained
  • Clear separation of provider-specific logic
  • Easy to modify individual strategies

Testability

  • Strategies can be tested independently
  • Easy mocking of specific strategies
  • Integration testing with different strategies

Runtime Flexibility

  • Strategy selection based on configuration
  • Dynamic strategy switching possible
  • Multiple strategies can coexist

This Strategy pattern implementation provides a flexible and extensible foundation for supporting multiple cloud providers and provisioning methods in the Open Host Factory Plugin.