Executive: The Orchestrator
The Executive is the central nervous system of any Procela simulation. It orchestrates mechanisms, variables, and governance units, ensuring they work together coherently while maintaining complete auditability and reproducibility.
This guide covers:
- Core responsibilities and architecture
- Basic and advanced configuration
- State management and checkpointing
- Parallel and distributed execution
- Event system for extensibility
- Performance optimization techniques
- Error handling and recovery
- Practical examples for different use cases
- Best practices for production use
Core Responsibilities
The Executive handles:
- Scheduling: Executes mechanisms
- State Management: Maintains random state, step counter, and system history
- Resolution: Coordinates variable policy resolution
- Governance: Triggers governance units at appropriate phases
- Auditability: Tracks every action for reproducibility
- Error Handling: Manages failures and rollbacks
Basic Executive Usage
Creating and Running a Simulation
from procela import Executive, Mechanism, Variable
# Create components
X = Variable("X", RangeDomain(0, 100))
Y = Variable("Y", RangeDomain(0, 100))
mechanisms = [MyMechanism(), AnotherMechanism()]
# Create executive
executive = Executive(mechanisms=mechanisms)
# Initialize and run
executive.run(steps=100)
# Access results
print(f"Final X: {X.value}, Final Y: {Y.value}")
Executive Lifecycle
# 1. Creation
executive = Executive(mechanisms=mechanisms)
# 3. Execution
executive.run(steps=100) # Run simulation
# 4. (Optional) Reset or continue
executive.reset() # Reset a new world with same configurations
executive.run(steps=50) # Run additional steps
Executive Architecture
Internal Structure
┌─────────────────────────────────────────────────────────────┐
│ EXECUTIVE │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Mechanisms │ │ Variables │ │ Governance │ │
│ │ Registry │ │ Registry │ │ Units │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Execution Pipeline │ │
│ │ ┌──────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ │ │
│ │ │ PRE │→ │MECHANISMS│→ │RESOLVE │→ │ POST │ │ │
│ │ │Phase │ │ Phase │ │ Phase │ │ Phase │ │ │
│ │ └──────┘ └──────────┘ └─────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Random │ │ History │ │ Checkpoint │ │
│ │ State │ │ Store │ │ Manager │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Execution Pipeline
Each simulation step follows this pipeline:
def step(self):
"""Execute one simulation step"""
# Phase 1: PRE
self._check_invariants(InvariantPhase.PRE)
# Phase 2: MECHANISM EXECUTION
for mechanism in self.active_mechanisms:
mechanism.transform()
# # Phase 3: RUNTIME
self._check_invariants(InvariantPhase.RUNTIME)
# Phase 3: RESOLUTION
for variable in self.variables:
variable.resolve()
# Phase 4: POST
self._check_invariants(InvariantPhase.POST)
self._step_index += 1
Advanced Configuration
Custom Execution Order
Control the order of mechanism execution:
class OrderedExecutive(Executive):
"""Executive with explicit mechanism ordering"""
def __init__(self, mechanisms, execution_order=None):
super().__init__(mechanisms=mechanisms)
self.execution_order = execution_order or mechanisms
def _execute_mechanisms(self):
"""Execute in specified order"""
for mechanism in self.execution_order:
if mechanism.is_enabled():
mechanism.run()
Conditional Execution
Execute mechanisms based on conditions:
class ConditionalExecutive(Executive):
"""Executive that conditionally executes mechanisms"""
def __init__(self, mechanisms, condition_func=None):
super().__init__(mechanisms=mechanisms)
self.condition_func = condition_func
def step(self):
"""Execute step with conditional mechanisms"""
if self.condition_func:
# Determine which mechanisms should run
to_execute = [m for m in self.mechanisms()
if m.is_enabled and self.condition_func(m, self)]
else:
to_execute = self.mechanisms()
# Execute selected mechanisms
for mechanism in to_execute:
mechanism.run()
# Resolution
for variable in self.writable():
variable.resolve_conflict()
State Management
Random State Control
import numpy as np
class ReproducibleExecutive(Executive):
"""Executive with deterministic random behavior"""
def __init__(self, mechanisms, random_seed=42):
super().__init__(mechanisms=mechanisms)
self.random_seed = random_seed
self.random_state = np.random.RandomState(random_seed)
def step(self):
"""Execute with deterministic random state"""
# Set random state for all mechanisms
for mechanism in self.mechanisms():
mechanism.random_state = self.random_state
super().step()
def reset_random_state(self):
"""Reset random state to initial seed"""
self.random_state = np.random.RandomState(self.random_seed)
Checkpoint and Restore
from procela import Executive, Mechanism
class CheckpointManager:
"""Manages simulation checkpoints"""
def __init__(self, executive: Executive):
self.executive = executive
self.checkpoints = {}
def save_checkpoint(self, name):
"""Save current simulation state"""
checkpoint = self.executive.create_checkpoint()
self.checkpoints[name] = checkpoint
return checkpoint
def restore_checkpoint(self, name):
"""Restore simulation state from checkpoint"""
checkpoint = self.checkpoints.get(name)
self.executive.restore_checkpoint(checkpoint)
# Usage
executive = Executive(mechanisms=mechanisms)
checkpoint_mgr = CheckpointManager(executive)
executive.run(steps=50)
checkpoint_mgr.save_checkpoint('after_50_steps')
# Continue or experiment
executive.run(steps=25)
checkpoint_mgr.restore_checkpoint('after_50_steps') # Rollback
Parallel Execution
Multi-Threaded Mechanism Execution
from concurrent.futures import ThreadPoolExecutor
import threading
from procela import Executive
class ParallelExecutive(Executive):
"""Executive that executes mechanisms in parallel"""
def __init__(self, mechanisms, max_workers=None):
super().__init__(mechanisms=mechanisms)
self.max_workers = max_workers or len(mechanisms)
self.lock = threading.Lock()
def step(self):
"""Execute mechanisms concurrently"""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all mechanisms for execution
futures = [
executor.submit(self._safe_transform, mechanism)
for mechanism in self.mechanisms()
]
# Wait for completion
for future in futures:
future.result()
def _safe_transform(self, mechanism):
"""Thread-safe mechanism execution"""
try:
mechanism.transform()
except Exception as e:
with self.lock:
print(f"Error in {mechanism.key()}: {e}")
raise
Distributed Execution (Batch Processing)
from procela import Executive
class BatchExecutive:
"""Executive for batch processing of multiple scenarios"""
def __init__(self, mechanism_factory, n_scenarios=10):
super().__init__(mechanisms=mechanism_factory())
self.mechanism_factory = mechanism_factory
self.n_scenarios = n_scenarios
self.results = []
def run_scenarios(self, steps=100):
"""Run multiple scenarios in parallel"""
import multiprocessing as mp
with mp.Pool(processes=mp.cpu_count()) as pool:
scenarios = [
pool.apply_async(self._run_scenario, (steps, seed))
for seed in range(self.n_scenarios)
]
self.results = [s.get() for s in scenarios]
return self.results
def _run_scenario(self, steps, seed):
"""Run single scenario"""
mechanisms = self.mechanism_factory()
executive = Executive(mechanisms=mechanisms)
executive.rng.seed(seed)
executive.run(steps=steps)
return {
'seed': seed,
'final_values': {var.name: var.value for var in executive.variables()},
'memory': {var.name: var.memory for var in executive.variables()}
}
Event System
Custom Event Handlers
from procela import Executive
class EventDrivenExecutive(Executive):
"""Executive with event emission"""
def __init__(self, mechanisms):
super().__init__(mechanisms=mechanisms)
self.event_handlers = {
'pre_step': [],
'post_mechanism': [],
'post_resolution': [],
'post_step': [],
'error': []
}
def on(self, event, handler):
"""Register event handler"""
if event in self.event_handlers:
self.event_handlers[event].append(handler)
def _emit(self, event, *args, **kwargs):
"""Emit event to registered handlers"""
for handler in self.event_handlers.get(event, []):
try:
handler(*args, **kwargs)
except Exception as e:
print(f"Error in event handler: {e}")
def step(self):
"""Execute step with events"""
try:
self._emit('pre_step', self.step_index())
# Execute mechanisms
for mechanism in self.mechanisms():
if mechanism.is_enabled():
mechanism.run()
self._emit('post_mechanism', mechanism, self.step_index())
# Resolve variables
for variable in self.writable():
variable.resolve_conflict()
self._emit('post_resolution', self.variables(), self.step_index())
self._step_index += 1
self._emit('post_step', self.step_index())
except Exception as e:
self._emit('error', e, self.step_index())
raise
# Usage
executive = EventDrivenExecutive(mechanisms)
def log_step(step):
print(f"Starting step {step}")
def log_variables(variables, step):
print(f"Step {step}: X={variables[0].value:.2f}")
executive.on('pre_step', log_step)
executive.on('post_resolution', log_variables)
executive.run(steps=10)
Performance Optimization
Lazy Variable Resolution
from procela import Executive, InvariantPhase
class OptimizedExecutive(Executive):
"""Executive with performance optimizations"""
def __init__(self, mechanisms, lazy_resolution=False):
super().__init__(mechanisms=mechanisms)
self.lazy_resolution = lazy_resolution
self._resolution_cache = {}
def step(self):
"""Optimized step execution"""
# Phase 1: PRE invariants
self._check_invariants(InvariantPhase.PRE)
# Phase 2: Execute mechanisms
for mechanism in self.mechanisms():
mechanism.run()
# Phase 3: RUNTIME invariants
self._check_invariants(InvariantPhase.RUNTIME)
# Phase 4: Resolve variables (with caching)
for variable in self.writable():
if self.lazy_resolution:
# Only resolve if value was requested
self._mark_dirty(variable)
else:
variable.resolve_conflict()
# Phase 5: POST invariants
self._check_invariants(InvariantPhase.POST)
self._step_index += 1
def get_variable_value(self, variable):
"""Lazy resolution - resolve only when needed"""
if self.lazy_resolution and variable in self._dirty_variables:
variable.resolve_conflict()
self._dirty_variables.remove(variable)
return variable.value
Profiling and Monitoring
import numpy as np
import cProfile
import pstats
from io import StringIO
from procela import Executive, InvariantPhase, Timer
class ProfiledExecutive(Executive):
"""Executive with built-in profiling"""
def __init__(self, mechanisms, profile=False):
super().__init__(mechanisms=mechanisms)
self.profile = profile
self.profiler = cProfile.Profile() if profile else None
self.timings = {
'mechanisms': [],
'resolution': [],
'governance': []
}
def run(self, steps=100):
"""Run with profiling"""
if self.profile:
self.profiler.enable()
try:
with Timer() as timer:
super().run(steps)
finally:
elapsed = timer.elapsed
if self.profile:
self.profiler.disable()
self._print_profile_stats()
print(f"\nTotal execution time: {elapsed:.2f}s")
def _print_profile_stats(self):
"""Print profiling statistics"""
s = StringIO()
ps = pstats.Stats(self.profiler, stream=s).sort_stats('cumulative')
ps.print_stats(20) # Top 20 functions
print(s.getvalue())
def _timed_execute(self, name, func):
"""Time execution of a function"""
with Timer() as timer:
result = func()
self.timings[name].append(timer.elapsed)
return result
def step(self):
"""Step with timing"""
# Time mechanisms
self._timed_execute('mechanisms', self._execute_mechanisms)
# Time resolution
self._timed_execute('resolution', self._resolve_variables)
# Time governance
self._timed_execute('governance', lambda: self._check_invariants(InvariantPhase.POST))
self.step_counter += 1
def print_timing_report(self):
"""Print performance report"""
print("\n=== Performance Report ===")
for name, timings in self.timings.items():
if timings:
avg = np.mean(timings) * 1000 # ms
total = np.sum(timings)
print(f"{name:15s}: avg={avg:.2f}ms, total={total:.2f}s, calls={len(timings)}")
Error Handling and Recovery
Robust Executive with Recovery
import time
from procela import Executive
class RobustExecutive(Executive):
"""Executive with error recovery mechanisms"""
def __init__(self, mechanisms, max_retries=3):
super().__init__(mechanisms=mechanisms)
self.max_retries = max_retries
self.error_log = []
def step(self):
"""Execute step with retry logic"""
retries = 0
last_error = None
while retries < self.max_retries:
try:
# Save checkpoint before step
checkpoint = self.create_checkpoint()
# Execute step
super().step()
# Success
return
except Exception as e:
last_error = e
retries += 1
# Log error
self.error_log.append({
'step': self.step_index(),
'error': str(e),
'retry': retries
})
# Restore checkpoint
self.restore_checkpoint(checkpoint)
print(f"⚠️ Error at step {self.step_index()}, retry {retries}/{self.max_retries}: {e}")
# Exponential backoff
time.sleep(0.1 * (2 ** retries))
# Max retries exceeded
raise RuntimeError(
f"Failed to execute step {self.step_index()} after "
f"{self.max_retries} retries"
) from last_error
Practical Examples
Example 1: Multi-Step Training Simulation
from procela import Executive
class TrainingExecutive(Executive):
"""Executive for training simulations with phases"""
def __init__(self, mechanisms, phase_durations):
super().__init__(mechanisms=mechanisms)
self.phase_durations = phase_durations
self.current_phase = 0
self.phase_results = []
def run(self):
"""Run through training phases"""
for phase, duration in enumerate(self.phase_durations):
self.current_phase = phase
print(f"\n=== Phase {phase + 1}/{len(self.phase_durations)} ===")
# Configure for this phase
self._configure_phase(phase)
# Run phase
super().run(steps=duration)
# Record results
self.phase_results.append({
'phase': phase,
'final_state': {var.name: var.value for var in self.variables()},
'metrics': self._compute_metrics()
})
# Transition to next phase
self._transition_phase(phase)
return self.phase_results
def _configure_phase(self, phase):
"""Configure executive for specific phase"""
if phase == 0:
# Exploration phase
for mechanism in self.mechanisms():
mechanism.exploration_rate = 0.5
elif phase == 1:
# Exploitation phase
for mechanism in self.mechanisms():
mechanism.exploration_rate = 0.1
Example 2: Adaptive Executive
import numpy as np
from procela import Executive, HighestConfidencePolicy, WeightedVotingPolicy
class AdaptiveExecutive(Executive):
"""Executive that adapts its behavior based on signals"""
def __init__(self, mechanisms, performance_threshold=0.5):
super().__init__(mechanisms=mechanisms)
self.performance_threshold = performance_threshold
self.performance_history = []
def step(self):
"""Adaptive step execution"""
# Normal execution
super().step()
# Monitor performance
performance = self._compute_performance()
self.performance_history.append(performance)
# Adapt if performance drops
if len(self.performance_history) > 10:
recent_avg = np.mean(self.performance_history[-10:])
if recent_avg < self.performance_threshold:
self._adapt_execution()
def _adapt_execution(self):
"""Adapt execution strategy"""
print("⚠️ Performance below threshold, adapting...")
# Option 1: Increase governance sensitivity
for invariant in self._invariants:
if hasattr(invariant, 'threshold'):
invariant.threshold *= 0.9
# Option 2: Activate backup mechanisms
for mechanism in self.mechanisms():
if hasattr(mechanism, 'backup_mode'):
mechanism.backup_mode = True
# Option 3: Change resolution policies
for variable in self.variables():
if isinstance(variable.policy, WeightedVotingPolicy):
variable.policy = HighestConfidencePolicy()
Best Practices
1. Use Random Number Generator with seeds for Reproducibility
import random
import numpy as np
from procela import Executive
# Good
rng = np.random.default_rng(42)
# or
#rng = random.Random(42)
executive = Executive(mechanisms=mechanisms, rng=rng)
# Bad - non-deterministic
executive = Executive(mechanisms=mechanisms)
2. Monitor Performance
# Track execution time for large simulations
from procela import Executive, Timer
executive = Executive(...)
with Timer() as timer:
executive.run(steps=10000)
print(f"Execution time: {timer.elapsed:.2f}s")
Next Steps
- Learn about Variables that the Executive orchestrates
- Explore Mechanisms that the Executive executes
- Understand Governance that the Executive triggers
- See the API Reference for complete Executive documentation