Event Subscribers
ASH provides a comprehensive event system that allows plugins to react to various events during the scanning process. This enables you to create custom logging, notifications, integrations, and other reactive behaviors.
For detailed visual diagrams of the event system architecture and workflow, see Event Subscriber Diagrams.
Overview
The event system uses a discovery-based pattern similar to how ASH discovers scanners, converters, and reporters. Event subscribers are registered using the ASH_EVENT_HANDLERS
dictionary in your plugin module.
Basic Event Subscriber
Here's a simple example of creating an event subscriber:
# my_ash_plugins/__init__.py
from automated_security_helper.plugins.events import AshEventType
def handle_scan_complete(**kwargs):
"""Handle scan complete event"""
scanner = kwargs.get('scanner', 'Unknown')
remaining_count = kwargs.get('remaining_count', 0)
print(f"Scanner '{scanner}' completed!")
if remaining_count > 0:
print(f"{remaining_count} scanners still running")
else:
print("All scanners completed!")
return True
# Event callback registry
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_COMPLETE: [handle_scan_complete],
}
Available Event Types
ASH provides the following event types:
Phase Events
AshEventType.CONVERT_START
: Fired when the convert phase beginsAshEventType.CONVERT_COMPLETE
: Fired when the convert phase completesAshEventType.SCAN_START
: Fired when the scan phase beginsAshEventType.SCAN_COMPLETE
: Fired when each individual scanner completesAshEventType.REPORT_START
: Fired when the report phase beginsAshEventType.REPORT_COMPLETE
: Fired when the report phase completes
General Events
AshEventType.ERROR
: Fired when errors occurAshEventType.WARNING
: Fired for warning conditionsAshEventType.INFO
: Fired for informational events
Event Data
Each event type provides specific data through keyword arguments:
SCAN_COMPLETE Event Data
The SCAN_COMPLETE
event is fired each time an individual scanner finishes and provides:
scanner
: Name of the completed scannercompleted_count
: Number of scanners completed so fartotal_count
: Total number of scannersremaining_count
: Number of scanners still runningremaining_scanners
: List of remaining scanner namesmessage
: Human-readable summary messagephase
: The phase name ("scan")plugin_context
: The current plugin context
Common Event Data
All events include:
phase
: The name of the current phaseplugin_context
: The current plugin context with source/output directories and configuration
Multiple Event Subscribers
You can register multiple subscribers for the same event:
def log_scan_completion(**kwargs):
"""Log scan completion to file"""
scanner = kwargs.get('scanner')
with open('/tmp/scan.log', 'a') as f:
f.write(f"Scanner {scanner} completed at {datetime.now()}\n")
return True
def notify_scan_completion(**kwargs):
"""Send notification about scan completion"""
scanner = kwargs.get('scanner')
remaining = kwargs.get('remaining_count', 0)
# Send notification logic here
return True
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_COMPLETE: [
log_scan_completion,
notify_scan_completion,
],
}
Multiple Event Types
You can subscribe to multiple event types:
def handle_phase_start(**kwargs):
"""Handle any phase start"""
phase = kwargs.get('phase', 'Unknown')
print(f"Phase '{phase}' started")
return True
def handle_phase_complete(**kwargs):
"""Handle any phase completion"""
phase = kwargs.get('phase', 'Unknown')
print(f"Phase '{phase}' completed")
return True
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_START: [handle_phase_start],
AshEventType.SCAN_COMPLETE: [handle_scan_completion],
AshEventType.CONVERT_START: [handle_phase_start],
AshEventType.CONVERT_COMPLETE: [handle_phase_complete],
AshEventType.REPORT_START: [handle_phase_start],
AshEventType.REPORT_COMPLETE: [handle_phase_complete],
}
Error Handling
Event subscribers should handle errors gracefully to avoid disrupting the scan process:
def robust_event_handler(**kwargs):
"""Event handler with proper error handling"""
try:
scanner = kwargs.get('scanner', 'Unknown')
# Your event handling logic here
print(f"Processing completion of {scanner}")
return True
except Exception as e:
# Log the error but don't re-raise to avoid disrupting the scan
print(f"Error in event handler: {e}")
return False
Real-World Examples
Slack Notifications
import requests
def notify_slack_on_completion(**kwargs):
"""Send Slack notification when all scanners complete"""
remaining_count = kwargs.get('remaining_count', 0)
if remaining_count == 0: # All scanners completed
webhook_url = os.environ.get("SLACK_WEBHOOK", None)
if webhook_url is None:
ASH_LOGGER.error("SLACK_WEBHOOK variable is unset! Unable to send webhook.")
return False
message = {
"text": "🎉 ASH security scan completed successfully!",
"channel": "#security-alerts"
}
try:
requests.post(webhook_url, json=message)
except Exception as e:
print(f"Failed to send Slack notification: {e}")
return True
ASH_EVENT_HANDLERS = {
AshEventType.EXECUTION_COMPLETE: [notify_slack_on_completion],
}
Custom Metrics Collection
import time
# Global state for tracking metrics
scan_metrics = {}
def track_scan_metrics(**kwargs):
"""Track scan performance metrics"""
scanner = kwargs.get('scanner')
completed_count = kwargs.get('completed_count', 0)
total_count = kwargs.get('total_count', 0)
# Record completion time
scan_metrics[scanner]['completed_at'] = time.time()
# Calculate progress
progress = (completed_count / total_count) * 100 if total_count > 0 else 0
print(f"Scan progress: {progress:.1f}% ({completed_count}/{total_count})")
return True
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_COMPLETE: [track_scan_metrics],
}
Integration with External Systems
import json
import requests
from datetime import datetime, timezone
def send_to_monitoring_system(**kwargs):
"""Send scan completion data to external monitoring system"""
try:
scanner = kwargs.get('scanner')
completed_count = kwargs.get('completed_count', 0)
total_count = kwargs.get('total_count', 0)
remaining_count = kwargs.get('remaining_count', 0)
# Prepare monitoring data
monitoring_data = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'event_type': 'scanner_completed',
'scanner_name': scanner,
'progress': {
'completed': completed_count,
'total': total_count,
'remaining': remaining_count,
'percentage': (completed_count / total_count * 100) if total_count > 0 else 0
}
}
# Send to monitoring endpoint
response = requests.post(
'https://monitoring.example.com/api/events',
json=monitoring_data,
headers={'Content-Type': 'application/json'},
timeout=5
)
if response.status_code == 200:
print(f"Successfully sent monitoring data for {scanner}")
else:
print(f"Failed to send monitoring data: {response.status_code}")
except Exception as e:
print(f"Error sending monitoring data: {e}")
return True
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_COMPLETE: [send_to_monitoring_system],
}
Database Logging
import sqlite3
from datetime import datetime
def log_to_database(**kwargs):
"""Log scan events to SQLite database"""
try:
scanner = kwargs.get('scanner')
completed_count = kwargs.get('completed_count', 0)
total_count = kwargs.get('total_count', 0)
phase = kwargs.get('phase', 'unknown')
# Connect to database
conn = sqlite3.connect('/tmp/ash_scan_log.db')
cursor = conn.cursor()
# Create table if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS scan_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
phase TEXT,
scanner TEXT,
completed_count INTEGER,
total_count INTEGER,
progress_percentage REAL
)
''')
# Insert event data
progress = (completed_count / total_count * 100) if total_count > 0 else 0
cursor.execute('''
INSERT INTO scan_events
(timestamp, phase, scanner, completed_count, total_count, progress_percentage)
VALUES (?, ?, ?, ?, ?, ?)
''', (
datetime.now(timezone.utc).isoformat(),
phase,
scanner,
completed_count,
total_count,
progress
))
conn.commit()
conn.close()
print(f"Logged scan event for {scanner} to database")
except Exception as e:
print(f"Error logging to database: {e}")
return True
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_COMPLETE: [log_to_database],
}
Plugin Discovery
ASH automatically discovers event subscribers by:
- Loading modules specified in the
internal_modules
list (for built-in plugins) - Loading additional modules specified in configuration via
ash_plugin_modules
- Scanning for
ASH_EVENT_HANDLERS
constants in loaded modules - Registering discovered event subscribers with the plugin manager
The event subscribers are called in the order they appear in the callback list for each event type.
Best Practices
- Return Values: Always return
True
for successful handling orFalse
for errors - Error Handling: Use try-catch blocks to prevent event handler errors from disrupting scans
- Performance: Keep event handlers lightweight to avoid slowing down the scan process
- Logging: Use appropriate log levels and avoid excessive output
- State Management: Be careful with global state in multi-threaded environments
- Resource Cleanup: Clean up any resources (files, connections) in your event handlers
- Timeouts: Use timeouts for external API calls to prevent hanging
- Graceful Degradation: Design handlers to fail gracefully without affecting the main scan process
Advanced Usage
Conditional Event Handling
def conditional_handler(**kwargs):
"""Only handle events under certain conditions"""
scanner = kwargs.get('scanner')
remaining_count = kwargs.get('remaining_count', 0)
# Only notify for critical scanners or when all complete
critical_scanners = ['bandit', 'semgrep', 'checkov']
if scanner in critical_scanners or remaining_count == 0:
print(f"Important: {scanner} completed!")
# Send notification logic here
return True
Event Filtering
def filtered_handler(**kwargs):
"""Filter events based on context"""
plugin_context = kwargs.get('plugin_context')
# Only handle events for certain source directories
if plugin_context and 'production' in str(plugin_context.source_dir):
scanner = kwargs.get('scanner')
print(f"Production scan: {scanner} completed")
# Handle production-specific logic
return True
Stateful Event Handling
class ScanProgressTracker:
def __init__(self):
self.start_time = None
self.completed_scanners = []
def handle_scan_start(self, **kwargs):
"""Track scan start time"""
self.start_time = time.time()
self.completed_scanners = []
print("Scan progress tracking started")
return True
def handle_scan_complete(self, **kwargs):
"""Track individual scanner completion"""
scanner = kwargs.get('scanner')
remaining_count = kwargs.get('remaining_count', 0)
self.completed_scanners.append(scanner)
if self.start_time:
elapsed = time.time() - self.start_time
print(f"Scanner {scanner} completed after {elapsed:.1f}s")
if remaining_count == 0:
total_time = time.time() - self.start_time if self.start_time else 0
print(f"All scanners completed in {total_time:.1f}s")
print(f"Completion order: {', '.join(self.completed_scanners)}")
return True
# Create tracker instance
tracker = ScanProgressTracker()
ASH_EVENT_HANDLERS = {
AshEventType.SCAN_START: [tracker.handle_scan_start],
AshEventType.SCAN_COMPLETE: [tracker.handle_scan_complete],
}
Integration with Built-in Events
ASH includes built-in event subscribers for core functionality like scan completion logging. Your custom event subscribers will run alongside these built-in handlers, allowing you to extend ASH's behavior without replacing core functionality.
The built-in scan completion logger provides enhanced logging that shows remaining scanners:
Your custom event subscribers will receive the same event data and can provide additional functionality like notifications, metrics collection, or integration with external systems.