Skip to content

Repository Pattern Implementation

This document describes the implementation of the Repository pattern in the Open Host Factory Plugin, which provides a clean abstraction layer for data access operations while supporting multiple storage backends.

Repository Pattern Overview

The Repository pattern provides:

  • Data access abstraction: Clean separation between business logic and data persistence
  • Multiple storage backends: Support for different storage technologies
  • Consistent interface: Uniform data access across different storage types
  • Testability: Easy mocking and testing of data operations

Repository Interface Definitions

Repository interfaces are defined in the domain layer and implemented in the infrastructure layer.

Base Repository Interface

# src/domain/base/repository.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, TypeVar, Generic

T = TypeVar('T')

class Repository(ABC, Generic[T]):
    """Base repository interface for all domain entities."""

    @abstractmethod
    async def get_by_id(self, entity_id: str) -> Optional[T]:
        """Retrieve entity by ID."""
        pass

    @abstractmethod
    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[T]:
        """Retrieve all entities with optional filtering."""
        pass

    @abstractmethod
    async def save(self, entity: T) -> None:
        """Save entity."""
        pass

    @abstractmethod
    async def delete(self, entity_id: str) -> bool:
        """Delete entity by ID."""
        pass

    @abstractmethod
    async def exists(self, entity_id: str) -> bool:
        """Check if entity exists."""
        pass

Template Repository Interface

# src/domain/template/repository.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from .aggregate import Template

class TemplateRepository(ABC):
    """Repository interface for template entities."""

    @abstractmethod
    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[Template]:
        """Retrieve all templates with optional filtering."""
        pass

    @abstractmethod
    async def get_by_id(self, template_id: str) -> Optional[Template]:
        """Retrieve template by ID."""
        pass

    @abstractmethod
    async def save(self, template: Template) -> None:
        """Save template."""
        pass

    @abstractmethod
    async def delete(self, template_id: str) -> bool:
        """Delete template by ID."""
        pass

    @abstractmethod
    async def find_by_attributes(self, attributes: Dict[str, Any]) -> List[Template]:
        """Find templates by specific attributes."""
        pass

    @abstractmethod
    async def get_by_provider_type(self, provider_type: str) -> List[Template]:
        """Get templates for specific provider type."""
        pass

Request Repository Interface

# src/domain/request/repository.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from .aggregate import Request
from .value_objects import RequestStatus

class RequestRepository(ABC):
    """Repository interface for request entities."""

    @abstractmethod
    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[Request]:
        """Retrieve all requests with optional filtering."""
        pass

    @abstractmethod
    async def get_by_id(self, request_id: str) -> Optional[Request]:
        """Retrieve request by ID."""
        pass

    @abstractmethod
    async def save(self, request: Request) -> None:
        """Save request."""
        pass

    @abstractmethod
    async def delete(self, request_id: str) -> bool:
        """Delete request by ID."""
        pass

    @abstractmethod
    async def get_by_status(self, status: RequestStatus) -> List[Request]:
        """Get requests by status."""
        pass

    @abstractmethod
    async def get_active_requests(self) -> List[Request]:
        """Get all active (non-completed) requests."""
        pass

    @abstractmethod
    async def update_status(self, request_id: str, status: RequestStatus) -> bool:
        """Update request status."""
        pass

Machine Repository Interface

# src/domain/machine/repository.py
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from .aggregate import Machine
from .machine_status import MachineStatus

class MachineRepository(ABC):
    """Repository interface for machine entities."""

    @abstractmethod
    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[Machine]:
        """Retrieve all machines with optional filtering."""
        pass

    @abstractmethod
    async def get_by_id(self, machine_id: str) -> Optional[Machine]:
        """Retrieve machine by ID."""
        pass

    @abstractmethod
    async def save(self, machine: Machine) -> None:
        """Save machine."""
        pass

    @abstractmethod
    async def delete(self, machine_id: str) -> bool:
        """Delete machine by ID."""
        pass

    @abstractmethod
    async def get_by_request_id(self, request_id: str) -> List[Machine]:
        """Get machines associated with a request."""
        pass

    @abstractmethod
    async def get_by_status(self, status: MachineStatus) -> List[Machine]:
        """Get machines by status."""
        pass

    @abstractmethod
    async def update_status(self, machine_id: str, status: MachineStatus) -> bool:
        """Update machine status."""
        pass

    @abstractmethod
    async def get_by_instance_ids(self, instance_ids: List[str]) -> List[Machine]:
        """Get machines by cloud provider instance IDs."""
        pass

DynamoDB Repository Implementations

DynamoDB implementations provide scalable, managed NoSQL storage.

DynamoDB Template Repository

# src/infrastructure/persistence/dynamodb/template_repository.py
from typing import List, Optional, Dict, Any
import boto3
from boto3.dynamodb.conditions import Key, Attr
from src.domain.template.repository import TemplateRepository
from src.domain.template.aggregate import Template
from src.domain.base.ports import LoggingPort

class DynamoDBTemplateRepository(TemplateRepository):
    """DynamoDB implementation of template repository."""

    def __init__(self, 
                 table_name: str,
                 region: str,
                 profile: Optional[str] = None,
                 logger: LoggingPort = None):
        self._table_name = table_name
        self._region = region
        self._profile = profile
        self._logger = logger

        # Initialize DynamoDB resources
        session = boto3.Session(profile_name=profile) if profile else boto3.Session()
        self._dynamodb = session.resource('dynamodb', region_name=region)
        self._table = self._dynamodb.Table(table_name)

    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[Template]:
        """Retrieve all templates from DynamoDB."""
        self._logger.info("Retrieving all templates from DynamoDB")

        try:
            # Build scan parameters
            scan_params = {}

            if limit:
                scan_params['Limit'] = limit

            # Apply filters
            if filters:
                filter_expression = self._build_filter_expression(filters)
                if filter_expression:
                    scan_params['FilterExpression'] = filter_expression

            # Handle pagination
            if offset:
                # DynamoDB uses LastEvaluatedKey for pagination
                # This is a simplified implementation
                scan_params['ExclusiveStartKey'] = {'template_id': str(offset)}

            # Perform scan
            response = self._table.scan(**scan_params)

            # Convert items to domain objects
            templates = []
            for item in response.get('Items', []):
                template = self._item_to_template(item)
                templates.append(template)

            self._logger.info(f"Retrieved {len(templates)} templates")
            return templates

        except Exception as e:
            self._logger.error(f"Error retrieving templates: {e}")
            raise

    async def get_by_id(self, template_id: str) -> Optional[Template]:
        """Retrieve template by ID from DynamoDB."""
        self._logger.info(f"Retrieving template: {template_id}")

        try:
            response = self._table.get_item(
                Key={'template_id': template_id}
            )

            if 'Item' not in response:
                self._logger.info(f"Template not found: {template_id}")
                return None

            template = self._item_to_template(response['Item'])
            self._logger.info(f"Retrieved template: {template_id}")
            return template

        except Exception as e:
            self._logger.error(f"Error retrieving template {template_id}: {e}")
            raise

    async def save(self, template: Template) -> None:
        """Save template to DynamoDB."""
        self._logger.info(f"Saving template: {template.template_id}")

        try:
            item = self._template_to_item(template)

            self._table.put_item(Item=item)

            self._logger.info(f"Saved template: {template.template_id}")

        except Exception as e:
            self._logger.error(f"Error saving template {template.template_id}: {e}")
            raise

    async def delete(self, template_id: str) -> bool:
        """Delete template from DynamoDB."""
        self._logger.info(f"Deleting template: {template_id}")

        try:
            response = self._table.delete_item(
                Key={'template_id': template_id},
                ReturnValues='ALL_OLD'
            )

            deleted = 'Attributes' in response

            if deleted:
                self._logger.info(f"Deleted template: {template_id}")
            else:
                self._logger.warning(f"Template not found for deletion: {template_id}")

            return deleted

        except Exception as e:
            self._logger.error(f"Error deleting template {template_id}: {e}")
            raise

    async def find_by_attributes(self, attributes: Dict[str, Any]) -> List[Template]:
        """Find templates by specific attributes."""
        self._logger.info(f"Finding templates by attributes: {attributes}")

        try:
            # Build filter expression for attributes
            filter_expressions = []
            for key, value in attributes.items():
                filter_expressions.append(Attr(f'attributes.{key}').eq(value))

            if not filter_expressions:
                return []

            # Combine filter expressions
            filter_expression = filter_expressions[0]
            for expr in filter_expressions[1:]:
                filter_expression = filter_expression & expr

            # Perform scan with filter
            response = self._table.scan(FilterExpression=filter_expression)

            # Convert to domain objects
            templates = []
            for item in response.get('Items', []):
                template = self._item_to_template(item)
                templates.append(template)

            self._logger.info(f"Found {len(templates)} templates matching attributes")
            return templates

        except Exception as e:
            self._logger.error(f"Error finding templates by attributes: {e}")
            raise

    async def get_by_provider_type(self, provider_type: str) -> List[Template]:
        """Get templates for specific provider type."""
        return await self.find_by_attributes({'provider_type': provider_type})

    def _item_to_template(self, item: Dict[str, Any]) -> Template:
        """Convert DynamoDB item to Template domain object."""
        return Template(
            template_id=item['template_id'],
            max_number=int(item['max_number']),
            attributes=item.get('attributes', {})
        )

    def _template_to_item(self, template: Template) -> Dict[str, Any]:
        """Convert Template domain object to DynamoDB item."""
        return {
            'template_id': template.template_id,
            'max_number': template.max_number,
            'attributes': template.attributes
        }

    def _build_filter_expression(self, filters: Dict[str, Any]):
        """Build DynamoDB filter expression from filters."""
        if not filters:
            return None

        filter_expressions = []
        for key, value in filters.items():
            if key == 'max_number_gte':
                filter_expressions.append(Attr('max_number').gte(value))
            elif key == 'max_number_lte':
                filter_expressions.append(Attr('max_number').lte(value))
            else:
                filter_expressions.append(Attr(key).eq(value))

        if not filter_expressions:
            return None

        # Combine expressions with AND
        result = filter_expressions[0]
        for expr in filter_expressions[1:]:
            result = result & expr

        return result

In-Memory Repository Implementations

In-memory implementations provide fast access for development and testing.

In-Memory Template Repository

# src/infrastructure/persistence/memory/template_repository.py
from typing import List, Optional, Dict, Any
from src.domain.template.repository import TemplateRepository
from src.domain.template.aggregate import Template
from src.domain.base.ports import LoggingPort

class InMemoryTemplateRepository(TemplateRepository):
    """In-memory implementation of template repository."""

    def __init__(self, logger: LoggingPort = None):
        self._templates: Dict[str, Template] = {}
        self._logger = logger

    async def get_all(self, 
                     filters: Optional[Dict[str, Any]] = None,
                     limit: Optional[int] = None,
                     offset: Optional[int] = None) -> List[Template]:
        """Retrieve all templates from memory."""
        self._logger.info("Retrieving all templates from memory")

        templates = list(self._templates.values())

        # Apply filters
        if filters:
            templates = self._apply_filters(templates, filters)

        # Apply pagination
        if offset:
            templates = templates[offset:]

        if limit:
            templates = templates[:limit]

        self._logger.info(f"Retrieved {len(templates)} templates")
        return templates

    async def get_by_id(self, template_id: str) -> Optional[Template]:
        """Retrieve template by ID from memory."""
        self._logger.info(f"Retrieving template: {template_id}")

        template = self._templates.get(template_id)

        if template:
            self._logger.info(f"Retrieved template: {template_id}")
        else:
            self._logger.info(f"Template not found: {template_id}")

        return template

    async def save(self, template: Template) -> None:
        """Save template to memory."""
        self._logger.info(f"Saving template: {template.template_id}")

        self._templates[template.template_id] = template

        self._logger.info(f"Saved template: {template.template_id}")

    async def delete(self, template_id: str) -> bool:
        """Delete template from memory."""
        self._logger.info(f"Deleting template: {template_id}")

        if template_id in self._templates:
            del self._templates[template_id]
            self._logger.info(f"Deleted template: {template_id}")
            return True
        else:
            self._logger.warning(f"Template not found for deletion: {template_id}")
            return False

    async def find_by_attributes(self, attributes: Dict[str, Any]) -> List[Template]:
        """Find templates by specific attributes."""
        self._logger.info(f"Finding templates by attributes: {attributes}")

        matching_templates = []

        for template in self._templates.values():
            if self._template_matches_attributes(template, attributes):
                matching_templates.append(template)

        self._logger.info(f"Found {len(matching_templates)} templates matching attributes")
        return matching_templates

    async def get_by_provider_type(self, provider_type: str) -> List[Template]:
        """Get templates for specific provider type."""
        return await self.find_by_attributes({'provider_type': provider_type})

    def _apply_filters(self, templates: List[Template], filters: Dict[str, Any]) -> List[Template]:
        """Apply filters to template list."""
        filtered_templates = []

        for template in templates:
            if self._template_matches_filters(template, filters):
                filtered_templates.append(template)

        return filtered_templates

    def _template_matches_filters(self, template: Template, filters: Dict[str, Any]) -> bool:
        """Check if template matches filter criteria."""
        for key, value in filters.items():
            if key == 'max_number_gte':
                if template.max_number < value:
                    return False
            elif key == 'max_number_lte':
                if template.max_number > value:
                    return False
            elif hasattr(template, key):
                if getattr(template, key) != value:
                    return False

        return True

    def _template_matches_attributes(self, template: Template, attributes: Dict[str, Any]) -> bool:
        """Check if template matches attribute criteria."""
        for key, value in attributes.items():
            if key not in template.attributes or template.attributes[key] != value:
                return False

        return True

Repository Registration and Usage

Repositories are registered in the DI container based on configuration.

Repository Registration

# src/infrastructure/di/repository_services.py
def register_repository_services(container: DIContainer) -> None:
    """Register repository services based on configuration."""

    config = container.get(ConfigurationPort)
    logger = container.get(LoggingPort)

    storage_type = config.get("storage.type", "memory")

    if storage_type == "dynamodb":
        _register_dynamodb_repositories(container, config, logger)
    elif storage_type == "memory":
        _register_memory_repositories(container, logger)
    else:
        raise ValueError(f"Unsupported storage type: {storage_type}")

def _register_dynamodb_repositories(container: DIContainer, 
                                   config: ConfigurationPort, 
                                   logger: LoggingPort) -> None:
    """Register DynamoDB repository implementations."""

    dynamodb_config = config.get_section("storage.dynamodb")

    # Register template repository
    container.register_singleton(
        TemplateRepository,
        lambda c: DynamoDBTemplateRepository(
            table_name=dynamodb_config.get("templates_table", "templates"),
            region=dynamodb_config.get("region", "us-east-1"),
            profile=dynamodb_config.get("profile"),
            logger=logger
        )
    )

    # Register request repository
    container.register_singleton(
        RequestRepository,
        lambda c: DynamoDBRequestRepository(
            table_name=dynamodb_config.get("requests_table", "requests"),
            region=dynamodb_config.get("region", "us-east-1"),
            profile=dynamodb_config.get("profile"),
            logger=logger
        )
    )

    # Register machine repository
    container.register_singleton(
        MachineRepository,
        lambda c: DynamoDBMachineRepository(
            table_name=dynamodb_config.get("machines_table", "machines"),
            region=dynamodb_config.get("region", "us-east-1"),
            profile=dynamodb_config.get("profile"),
            logger=logger
        )
    )

def _register_memory_repositories(container: DIContainer, logger: LoggingPort) -> None:
    """Register in-memory repository implementations."""

    container.register_singleton(
        TemplateRepository,
        lambda c: InMemoryTemplateRepository(logger=logger)
    )

    container.register_singleton(
        RequestRepository,
        lambda c: InMemoryRequestRepository(logger=logger)
    )

    container.register_singleton(
        MachineRepository,
        lambda c: InMemoryMachineRepository(logger=logger)
    )

Repository Usage in Application Layer

# src/application/commands/template_handlers.py
class GetTemplatesHandler:
    """Handler for retrieving templates."""

    def __init__(self, 
                 template_repo: TemplateRepository,
                 logger: LoggingPort):
        self._template_repo = template_repo
        self._logger = logger

    async def handle(self, query: GetTemplatesQuery) -> List[TemplateResponse]:
        """Handle template retrieval query."""
        self._logger.info("Handling get templates query")

        # Use repository to get templates
        templates = await self._template_repo.get_all(
            filters=query.filters,
            limit=query.limit,
            offset=query.offset
        )

        # Convert to response DTOs
        responses = [
            TemplateResponse.from_domain(template)
            for template in templates
        ]

        return responses

Benefits of Repository Pattern Implementation

Data Access Abstraction

  • Clean separation between business logic and data persistence
  • Consistent interface across different storage technologies
  • Easy switching between storage backends

Multiple Storage Support

  • DynamoDB for scalable cloud storage
  • In-memory for development and testing
  • Easy addition of new storage backends

Testability

  • Easy mocking of repository interfaces
  • In-memory implementations for fast testing
  • Isolated testing of business logic

Maintainability

  • Clear data access patterns
  • Centralized data access logic
  • Easy modification of storage implementations

This Repository pattern implementation provides a solid foundation for data access operations while maintaining clean architecture principles and supporting multiple storage backends in the Open Host Factory Plugin.