Unified container for all workflow state.
This is the single source of truth for workflow state, consolidating
what was previously scattered across IncrementalLogReader, WorkflowProgress,
TUI, EventAccumulator, and TimeEstimator.
Attributes:
| Name |
Type |
Description |
paths |
WorkflowPaths
|
Centralized path resolution.
|
jobs |
JobRegistry
|
Registry of all job state.
|
rules |
RuleRegistry
|
Registry of all rule timing statistics.
|
clock |
Clock
|
|
config |
EstimationConfig
|
Estimation configuration.
|
status |
WorkflowStatus
|
|
total_jobs |
int | None
|
Total number of jobs in workflow (if known).
|
start_time |
float | None
|
Workflow start timestamp.
|
current_log |
Path | None
|
Path to current log file being monitored.
|
Source code in snakesee/state/workflow_state.py
| @dataclass
class WorkflowState:
"""Unified container for all workflow state.
This is the single source of truth for workflow state, consolidating
what was previously scattered across IncrementalLogReader, WorkflowProgress,
TUI, EventAccumulator, and TimeEstimator.
Attributes:
paths: Centralized path resolution.
jobs: Registry of all job state.
rules: Registry of all rule timing statistics.
clock: Injectable time source.
config: Estimation configuration.
status: Overall workflow status.
total_jobs: Total number of jobs in workflow (if known).
start_time: Workflow start timestamp.
current_log: Path to current log file being monitored.
"""
paths: WorkflowPaths
jobs: JobRegistry
rules: RuleRegistry
clock: Clock
config: EstimationConfig
status: WorkflowStatus = field(default_factory=_default_status)
total_jobs: int | None = None
start_time: float | None = None
current_log: Path | None = None
@property
def completed_count(self) -> int:
"""Number of completed jobs."""
return len(self.jobs.completed())
@property
def failed_count(self) -> int:
"""Number of failed jobs."""
return len(self.jobs.failed())
@property
def running_count(self) -> int:
"""Number of currently running jobs."""
return len(self.jobs.running())
@property
def pending_count(self) -> int:
"""Number of pending jobs."""
from snakesee.state.job_registry import JobStatus
return len([j for j in self.jobs.all_jobs() if j.status == JobStatus.PENDING])
@property
def progress_fraction(self) -> float:
"""Fraction of jobs completed (0.0 to 1.0)."""
if self.total_jobs is None or self.total_jobs == 0:
return 0.0
return self.completed_count / self.total_jobs
@property
def elapsed_seconds(self) -> float | None:
"""Elapsed time since workflow start."""
if self.start_time is None:
return None
return self.clock.now() - self.start_time
def update_status(self) -> None:
"""Update workflow status based on current state."""
from snakesee.models import WorkflowStatus
if self.total_jobs is None:
# Unknown total - check if we have failures anyway
if self.failed_count > 0:
self.status = WorkflowStatus.FAILED
elif self.running_count > 0:
self.status = WorkflowStatus.RUNNING
else:
self.status = WorkflowStatus.UNKNOWN
elif self.running_count > 0:
self.status = WorkflowStatus.RUNNING
elif self.failed_count > 0 and self.completed_count < self.total_jobs:
self.status = WorkflowStatus.FAILED
elif self.completed_count >= self.total_jobs:
self.status = WorkflowStatus.COMPLETED
elif self.completed_count == 0 and self.running_count == 0:
self.status = WorkflowStatus.NOT_STARTED
else:
# Check for stale workflow
if self.elapsed_seconds is not None:
if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
self.status = WorkflowStatus.STALE
else:
self.status = WorkflowStatus.RUNNING
else:
# No start time - assume incomplete
self.status = WorkflowStatus.INCOMPLETE
def record_job_completion(self, job_key: str) -> None:
"""Record a job completion and update rule statistics.
Args:
job_key: Key of the completed job.
"""
job = self.jobs.get(job_key)
if job is not None and job.duration is not None:
self.rules.record_job_completion(job)
def to_progress(self) -> WorkflowProgress:
"""Convert to WorkflowProgress for backward compatibility.
Returns:
WorkflowProgress snapshot of current state.
"""
from snakesee.models import WorkflowProgress
return WorkflowProgress(
workflow_dir=self.paths.workflow_dir,
status=self.status,
total_jobs=self.total_jobs or 0,
completed_jobs=self.completed_count,
running_jobs=[job.to_job_info() for job in self.jobs.running()],
failed_jobs=self.failed_count,
failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
start_time=self.start_time,
log_file=self.current_log,
)
def clear(self) -> None:
"""Clear all state."""
from snakesee.models import WorkflowStatus
self.jobs.clear()
self.rules.clear()
self.status = WorkflowStatus.UNKNOWN
self.total_jobs = None
self.start_time = None
self.current_log = None
@classmethod
def create(
cls,
workflow_dir: Path,
clock: Clock | None = None,
config: EstimationConfig | None = None,
) -> WorkflowState:
"""Create a new WorkflowState for a workflow directory.
Args:
workflow_dir: Path to the workflow directory.
clock: Optional clock for time operations.
config: Optional estimation configuration.
Returns:
New WorkflowState instance.
"""
from snakesee.state.clock import get_clock
from snakesee.state.config import DEFAULT_CONFIG
from snakesee.state.job_registry import JobRegistry
from snakesee.state.paths import WorkflowPaths
from snakesee.state.rule_registry import RuleRegistry
actual_clock = clock or get_clock()
actual_config = config or DEFAULT_CONFIG
return cls(
paths=WorkflowPaths(workflow_dir=workflow_dir),
jobs=JobRegistry(),
rules=RuleRegistry(config=actual_config),
clock=actual_clock,
config=actual_config,
)
|
Attributes
completed_count
property
Number of completed jobs.
elapsed_seconds
property
elapsed_seconds: float | None
Elapsed time since workflow start.
progress_fraction
property
Fraction of jobs completed (0.0 to 1.0).
running_count
property
Number of currently running jobs.
Functions
clear
Clear all state.
Source code in snakesee/state/workflow_state.py
| def clear(self) -> None:
"""Clear all state."""
from snakesee.models import WorkflowStatus
self.jobs.clear()
self.rules.clear()
self.status = WorkflowStatus.UNKNOWN
self.total_jobs = None
self.start_time = None
self.current_log = None
|
create
classmethod
Create a new WorkflowState for a workflow directory.
Parameters:
| Name |
Type |
Description |
Default |
workflow_dir
|
Path
|
Path to the workflow directory.
|
required
|
clock
|
Clock | None
|
Optional clock for time operations.
|
None
|
config
|
EstimationConfig | None
|
Optional estimation configuration.
|
None
|
Returns:
Source code in snakesee/state/workflow_state.py
| @classmethod
def create(
cls,
workflow_dir: Path,
clock: Clock | None = None,
config: EstimationConfig | None = None,
) -> WorkflowState:
"""Create a new WorkflowState for a workflow directory.
Args:
workflow_dir: Path to the workflow directory.
clock: Optional clock for time operations.
config: Optional estimation configuration.
Returns:
New WorkflowState instance.
"""
from snakesee.state.clock import get_clock
from snakesee.state.config import DEFAULT_CONFIG
from snakesee.state.job_registry import JobRegistry
from snakesee.state.paths import WorkflowPaths
from snakesee.state.rule_registry import RuleRegistry
actual_clock = clock or get_clock()
actual_config = config or DEFAULT_CONFIG
return cls(
paths=WorkflowPaths(workflow_dir=workflow_dir),
jobs=JobRegistry(),
rules=RuleRegistry(config=actual_config),
clock=actual_clock,
config=actual_config,
)
|
record_job_completion
record_job_completion(job_key: str) -> None
Record a job completion and update rule statistics.
Parameters:
| Name |
Type |
Description |
Default |
job_key
|
str
|
Key of the completed job.
|
required
|
Source code in snakesee/state/workflow_state.py
| def record_job_completion(self, job_key: str) -> None:
"""Record a job completion and update rule statistics.
Args:
job_key: Key of the completed job.
"""
job = self.jobs.get(job_key)
if job is not None and job.duration is not None:
self.rules.record_job_completion(job)
|
to_progress
Convert to WorkflowProgress for backward compatibility.
Returns:
Source code in snakesee/state/workflow_state.py
| def to_progress(self) -> WorkflowProgress:
"""Convert to WorkflowProgress for backward compatibility.
Returns:
WorkflowProgress snapshot of current state.
"""
from snakesee.models import WorkflowProgress
return WorkflowProgress(
workflow_dir=self.paths.workflow_dir,
status=self.status,
total_jobs=self.total_jobs or 0,
completed_jobs=self.completed_count,
running_jobs=[job.to_job_info() for job in self.jobs.running()],
failed_jobs=self.failed_count,
failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
start_time=self.start_time,
log_file=self.current_log,
)
|
update_status
Update workflow status based on current state.
Source code in snakesee/state/workflow_state.py
| def update_status(self) -> None:
"""Update workflow status based on current state."""
from snakesee.models import WorkflowStatus
if self.total_jobs is None:
# Unknown total - check if we have failures anyway
if self.failed_count > 0:
self.status = WorkflowStatus.FAILED
elif self.running_count > 0:
self.status = WorkflowStatus.RUNNING
else:
self.status = WorkflowStatus.UNKNOWN
elif self.running_count > 0:
self.status = WorkflowStatus.RUNNING
elif self.failed_count > 0 and self.completed_count < self.total_jobs:
self.status = WorkflowStatus.FAILED
elif self.completed_count >= self.total_jobs:
self.status = WorkflowStatus.COMPLETED
elif self.completed_count == 0 and self.running_count == 0:
self.status = WorkflowStatus.NOT_STARTED
else:
# Check for stale workflow
if self.elapsed_seconds is not None:
if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
self.status = WorkflowStatus.STALE
else:
self.status = WorkflowStatus.RUNNING
else:
# No start time - assume incomplete
self.status = WorkflowStatus.INCOMPLETE
|