Ports and Adapters Pattern Implementation¶
This document describes the implementation of the Ports and Adapters pattern (also known as Hexagonal Architecture) in the Open Host Factory Plugin, which enables clean separation between business logic and external concerns.
Ports and Adapters Overview¶
The Ports and Adapters pattern isolates the core business logic from external dependencies by:
- Ports: Abstract interfaces defining contracts for external interactions
- Adapters: Concrete implementations of ports for specific technologies
- Inversion of Control: Business logic depends on abstractions, not implementations
- Testability: Easy mocking and testing through interface substitution
Port Definitions¶
Ports are abstract interfaces defined in the domain layer that specify contracts for external interactions.
Core Ports¶
Logging Port¶
# src/domain/base/ports/logging_port.py
from abc import ABC, abstractmethod
class LoggingPort(ABC):
"""Abstract interface for logging operations."""
@abstractmethod
def info(self, message: str, **kwargs) -> None:
"""Log informational message."""
pass
@abstractmethod
def error(self, message: str, **kwargs) -> None:
"""Log error message."""
pass
@abstractmethod
def warning(self, message: str, **kwargs) -> None:
"""Log warning message."""
pass
@abstractmethod
def debug(self, message: str, **kwargs) -> None:
"""Log debug message."""
pass
Configuration Port¶
# src/domain/base/ports/configuration_port.py
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class ConfigurationPort(ABC):
"""Abstract interface for configuration access."""
@abstractmethod
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value by key."""
pass
@abstractmethod
def get_section(self, section: str) -> Dict[str, Any]:
"""Get entire configuration section."""
pass
@abstractmethod
def has_key(self, key: str) -> bool:
"""Check if configuration key exists."""
pass
@abstractmethod
def get_provider_config(self) -> Dict[str, Any]:
"""Get provider-specific configuration."""
pass
Container Port¶
# src/domain/base/ports/container_port.py
from abc import ABC, abstractmethod
from typing import Type, TypeVar, Any
T = TypeVar('T')
class ContainerPort(ABC):
"""Abstract interface for dependency injection container."""
@abstractmethod
def get(self, interface: Type[T]) -> T:
"""Resolve dependency by interface type."""
pass
@abstractmethod
def register_singleton(self, interface: Type, implementation: Type) -> None:
"""Register singleton service."""
pass
@abstractmethod
def register_transient(self, interface: Type, implementation: Type) -> None:
"""Register transient service."""
pass
Domain-Specific Ports¶
Template Repository Port¶
# src/domain/template/repository.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from .aggregate import Template
class TemplateRepository(ABC):
"""Abstract interface for template data access."""
@abstractmethod
async def get_all(self,
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> List[Template]:
"""Retrieve all templates with optional filtering."""
pass
@abstractmethod
async def get_by_id(self, template_id: str) -> Optional[Template]:
"""Retrieve template by ID."""
pass
@abstractmethod
async def save(self, template: Template) -> None:
"""Save template."""
pass
@abstractmethod
async def delete(self, template_id: str) -> bool:
"""Delete template by ID."""
pass
Provider Strategy Port¶
# src/domain/base/provider_interfaces.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from src.domain.request.aggregate import Request
from src.domain.machine.aggregate import Machine
class ProviderStrategy(ABC):
"""Abstract interface for cloud provider strategies."""
@abstractmethod
async def provision_instances(self, request: Request) -> List[Machine]:
"""Provision compute instances."""
pass
@abstractmethod
async def terminate_instances(self, instance_ids: List[str]) -> bool:
"""Terminate compute instances."""
pass
@abstractmethod
async def get_instance_status(self, instance_ids: List[str]) -> Dict[str, str]:
"""Get status of compute instances."""
pass
@abstractmethod
async def validate_template(self, template_config: Dict[str, Any]) -> bool:
"""Validate template configuration."""
pass
Adapter Implementations¶
Adapters are concrete implementations of ports that handle specific technologies or external systems.
Infrastructure Adapters¶
Logging Adapter¶
# src/infrastructure/adapters/logging_adapter.py
from src.domain.base.ports import LoggingPort
from src.infrastructure.logging.logger import get_logger
import logging
class LoggingAdapter(LoggingPort):
"""Concrete logging implementation using Python logging."""
def __init__(self, name: str = __name__):
self._logger = get_logger(name)
def info(self, message: str, **kwargs) -> None:
"""Log informational message."""
self._logger.info(message, extra=kwargs)
def error(self, message: str, **kwargs) -> None:
"""Log error message."""
self._logger.error(message, extra=kwargs)
def warning(self, message: str, **kwargs) -> None:
"""Log warning message."""
self._logger.warning(message, extra=kwargs)
def debug(self, message: str, **kwargs) -> None:
"""Log debug message."""
self._logger.debug(message, extra=kwargs)
Configuration Adapter¶
# src/infrastructure/adapters/configuration_adapter.py
from src.domain.base.ports import ConfigurationPort
from src.config.manager import ConfigurationManager
from typing import Any, Dict
class ConfigurationAdapter(ConfigurationPort):
"""Concrete configuration implementation using ConfigurationManager."""
def __init__(self, config_manager: ConfigurationManager):
self._config_manager = config_manager
def get(self, key: str, default: Any = None) -> Any:
"""Get configuration value by key."""
return self._config_manager.get(key, default)
def get_section(self, section: str) -> Dict[str, Any]:
"""Get entire configuration section."""
return self._config_manager.get_section(section)
def has_key(self, key: str) -> bool:
"""Check if configuration key exists."""
return self._config_manager.has_key(key)
def get_provider_config(self) -> Dict[str, Any]:
"""Get provider-specific configuration."""
return self._config_manager.get_provider_config()
Container Adapter¶
# src/infrastructure/adapters/container_adapter.py
from src.domain.base.ports import ContainerPort
from src.infrastructure.di.container import DIContainer
from typing import Type, TypeVar
T = TypeVar('T')
class ContainerAdapter(ContainerPort):
"""Concrete container implementation using DIContainer."""
def __init__(self, di_container: DIContainer):
self._container = di_container
def get(self, interface: Type[T]) -> T:
"""Resolve dependency by interface type."""
return self._container.get(interface)
def register_singleton(self, interface: Type, implementation: Type) -> None:
"""Register singleton service."""
self._container.register_singleton(interface, implementation)
def register_transient(self, interface: Type, implementation: Type) -> None:
"""Register transient service."""
self._container.register_transient(interface, implementation)
Provider Adapters¶
AWS Template Adapter¶
# src/providers/aws/infrastructure/adapters/template_adapter.py
from src.domain.base.dependency_injection import injectable
from src.domain.base.ports import LoggingPort, ConfigurationPort
from src.providers.aws.infrastructure.aws_client import AWSClient
from typing import Dict, Any, List
@injectable
class AWSTemplateAdapter:
"""AWS-specific template operations adapter."""
def __init__(self,
aws_client: AWSClient,
logger: LoggingPort,
config: ConfigurationPort):
self._aws_client = aws_client
self._logger = logger
self._config = config
async def validate_template(self, template_config: Dict[str, Any]) -> bool:
"""Validate AWS-specific template configuration."""
self._logger.info(f"Validating AWS template configuration")
# AWS-specific validation logic
required_fields = ['image_id', 'vm_type', 'subnet_ids']
for field in required_fields:
if field not in template_config:
self._logger.error(f"Missing required field: {field}")
return False
# Validate AMI exists
ami_id = template_config.get('image_id')
if not await self._validate_ami_exists(ami_id):
return False
return True
async def _validate_ami_exists(self, ami_id: str) -> bool:
"""Validate that AMI exists in AWS."""
try:
ec2_client = self._aws_client.get_client('ec2')
response = ec2_client.describe_images(ImageIds=[ami_id])
return len(response['Images']) > 0
except Exception as e:
self._logger.error(f"Error validating AMI {ami_id}: {e}")
return False
AWS Provider Strategy Adapter¶
# src/providers/aws/strategy/aws_provider_strategy.py
from src.domain.base.dependency_injection import injectable
from src.domain.base.ports import LoggingPort
from src.domain.base.provider_interfaces import ProviderStrategy
from src.providers.aws.configuration.config import AWSProviderConfig
from src.providers.aws.infrastructure.aws_client import AWSClient
from typing import List, Dict, Any
@injectable
class AWSProviderStrategy(ProviderStrategy):
"""AWS implementation of provider strategy."""
def __init__(self,
config: AWSProviderConfig,
logger: LoggingPort):
self._config = config
self._logger = logger
self._aws_client = AWSClient(config, logger)
async def provision_instances(self, request: Request) -> List[Machine]:
"""Provision AWS compute instances."""
self._logger.info(f"Provisioning {request.max_number} instances for template {request.template_id}")
# AWS-specific provisioning logic
ec2_client = self._aws_client.get_client('ec2')
# Build launch parameters
launch_params = self._build_launch_parameters(request)
# Launch instances
response = ec2_client.run_instances(**launch_params)
# Convert to domain objects
machines = []
for instance in response['Instances']:
machine = Machine(
machine_id=instance['InstanceId'],
instance_type=instance['InstanceType'],
status=instance['State']['Name'],
request_id=request.id
)
machines.append(machine)
self._logger.info(f"Provisioned {len(machines)} instances")
return machines
def _build_launch_parameters(self, request: Request) -> Dict[str, Any]:
"""Build AWS-specific launch parameters."""
# Implementation details for AWS parameter building
pass
Repository Adapters¶
DynamoDB Template Repository¶
# src/infrastructure/persistence/dynamodb/template_repository.py
from src.domain.template.repository import TemplateRepository
from src.domain.template.aggregate import Template
from src.domain.base.ports import LoggingPort
from typing import List, Optional, Dict, Any
import boto3
class DynamoDBTemplateRepository(TemplateRepository):
"""DynamoDB implementation of template repository."""
def __init__(self,
table_name: str,
region: str,
logger: LoggingPort):
self._table_name = table_name
self._region = region
self._logger = logger
self._dynamodb = boto3.resource('dynamodb', region_name=region)
self._table = self._dynamodb.Table(table_name)
async def get_all(self,
filters: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> List[Template]:
"""Retrieve all templates from DynamoDB."""
self._logger.info("Retrieving templates from DynamoDB")
try:
# Build scan parameters
scan_params = {}
if limit:
scan_params['Limit'] = limit
# Apply filters if provided
if filters:
scan_params.update(self._build_filter_expression(filters))
# Perform scan
response = self._table.scan(**scan_params)
# Convert to domain objects
templates = []
for item in response['Items']:
template = self._item_to_template(item)
templates.append(template)
self._logger.info(f"Retrieved {len(templates)} templates")
return templates
except Exception as e:
self._logger.error(f"Error retrieving templates: {e}")
raise
async def get_by_id(self, template_id: str) -> Optional[Template]:
"""Retrieve template by ID from DynamoDB."""
self._logger.info(f"Retrieving template {template_id}")
try:
response = self._table.get_item(Key={'template_id': template_id})
if 'Item' not in response:
return None
template = self._item_to_template(response['Item'])
return template
except Exception as e:
self._logger.error(f"Error retrieving template {template_id}: {e}")
raise
def _item_to_template(self, item: Dict[str, Any]) -> Template:
"""Convert DynamoDB item to Template domain object."""
return Template(
template_id=item['template_id'],
max_number=item['max_number'],
attributes=item.get('attributes', {})
)
Benefits of Ports and Adapters¶
Technology Independence¶
Business Logic Isolation - Domain logic doesn't depend on specific technologies - Can switch from DynamoDB to PostgreSQL without changing business rules - Can switch from AWS to Azure without changing core functionality
Framework Flexibility - Infrastructure can use different frameworks - Easy migration between technologies - Reduced vendor lock-in
Testability¶
Unit Testing with Mocks
def test_application_service():
"""Test application service with mocked ports."""
# Create mocks
mock_logger = Mock(spec=LoggingPort)
mock_config = Mock(spec=ConfigurationPort)
mock_template_repo = Mock(spec=TemplateRepository)
# Create service with mocked dependencies
service = ApplicationService(
logger=mock_logger,
config=mock_config,
template_repo=mock_template_repo
)
# Test behavior
# Verify interactions with mocks
Integration Testing
def test_with_real_adapters():
"""Test with real adapter implementations."""
# Use real adapters with test configuration
logger = LoggingAdapter("test")
config = ConfigurationAdapter(test_config_manager)
template_repo = InMemoryTemplateRepository()
# Test with real implementations
service = ApplicationService(logger, config, template_repo)
# Test actual behavior
Maintainability¶
Clear Boundaries - Well-defined interfaces between layers - Easy to understand responsibilities - Reduced coupling between components
Easy Extension - New adapters can be added without changing business logic - Multiple implementations of same port - Configuration-driven adapter selection
Configuration-Driven Adapter Selection¶
The system can select adapters based on configuration:
# src/infrastructure/di/adapter_registration.py
def register_adapters(container: DIContainer, config: ConfigurationPort) -> None:
"""Register adapters based on configuration."""
# Storage adapter selection
storage_type = config.get("storage.type", "memory")
if storage_type == "dynamodb":
container.register_singleton(
TemplateRepository,
DynamoDBTemplateRepository
)
elif storage_type == "postgresql":
container.register_singleton(
TemplateRepository,
PostgreSQLTemplateRepository
)
else:
container.register_singleton(
TemplateRepository,
InMemoryTemplateRepository
)
# Provider adapter selection
provider_type = config.get("provider.type", "mock")
if provider_type == "aws":
container.register_singleton(
ProviderStrategy,
AWSProviderStrategy
)
else:
container.register_singleton(
ProviderStrategy,
MockProviderStrategy
)
Error Handling in Adapters¶
Adapters handle technology-specific errors and translate them to domain exceptions:
class DynamoDBTemplateRepository(TemplateRepository):
async def get_by_id(self, template_id: str) -> Optional[Template]:
try:
# DynamoDB operations
pass
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'ResourceNotFoundException':
return None
elif error_code == 'ValidationException':
raise TemplateValidationError(f"Invalid template ID: {template_id}")
else:
raise TemplateRepositoryError(f"DynamoDB error: {e}")
except Exception as e:
raise TemplateRepositoryError(f"Unexpected error: {e}")
Adapter Lifecycle Management¶
Adapters are managed by the dependency injection container:
# Singleton adapters (shared instances)
container.register_singleton(LoggingPort, LoggingAdapter)
container.register_singleton(ConfigurationPort, ConfigurationAdapter)
# Transient adapters (new instance per request)
container.register_transient(TemplateRepository, DynamoDBTemplateRepository)
# Factory-created adapters (complex creation logic)
container.register_factory(
ProviderStrategy,
lambda c: create_provider_strategy(c.get(ConfigurationPort))
)
This Ports and Adapters implementation provides clean separation between business logic and external concerns, enabling high testability, maintainability, and flexibility in the Open Host Factory Plugin.