
state

Centralized state management for snakesee.

This module provides a unified architecture for managing workflow state, replacing scattered state across multiple components with a single source of truth.

Key classes:

- Clock: Injectable time source for testability
- EstimationConfig: Centralized configuration for estimation
- WorkflowPaths: Centralized path resolution
- JobRegistry: Single source of truth for job state (Phase 6)
- RuleRegistry: Centralized rule statistics (Phase 7)
- WorkflowState: Top-level state container (Phase 8)

Classes

Clock

Bases: Protocol

Protocol for injectable time sources.

This enables deterministic testing by allowing tests to provide a controlled time source instead of using real wall-clock time.

Source code in snakesee/state/clock.py
class Clock(Protocol):
    """Protocol for injectable time sources.

    This enables deterministic testing by allowing tests to provide
    a controlled time source instead of using real wall-clock time.
    """

    def now(self) -> float:
        """Return current time as Unix timestamp (seconds since epoch)."""
        ...

    def monotonic(self) -> float:
        """Return monotonic clock value for measuring durations.

        This is not affected by system clock adjustments and is suitable
        for measuring elapsed time.
        """
        ...
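Because Clock is a typing.Protocol, implementations satisfy it structurally: any object with matching now() and monotonic() methods works, with no subclassing required. The sketch below shows a wall-clock implementation and a consumer that depends only on the protocol. SystemClock and measure are hypothetical names for illustration, not snakesee's API; this section does not show the module's own default clock.

```python
import time
from typing import Callable, Protocol


class Clock(Protocol):
    """Copy of the protocol above so the sketch is self-contained."""

    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class SystemClock:
    """Hypothetical real-time implementation backed by the time module."""

    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


def measure(fn: Callable[[], object], clock: Clock) -> float:
    """Hypothetical helper: run fn and return its duration in seconds.

    Uses monotonic() rather than now(), so system clock adjustments
    cannot produce negative or inflated durations.
    """
    start = clock.monotonic()
    fn()
    return clock.monotonic() - start


elapsed = measure(lambda: sum(range(1000)), SystemClock())
print(f"took {elapsed:.6f}s")
```

In tests, the same measure() call can receive a controlled clock instead of SystemClock, which is the point of making the time source injectable.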

Functions

monotonic
monotonic() -> float

Return monotonic clock value for measuring durations.

This is not affected by system clock adjustments and is suitable for measuring elapsed time.

now
now() -> float

Return current time as Unix timestamp (seconds since epoch).


ConfidenceThresholds dataclass

Thresholds for confidence-based decisions.

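Attributes:

| Name | Type | Description |
| --- | --- | --- |
| size_scaling_min | float | Minimum confidence required to use a size-scaled estimate. |
| high_confidence | float | Threshold for showing a simple estimate without a range. |
| medium_confidence | float | Threshold for showing an estimate with a range. |
| low_confidence | float | Threshold for adding a "rough" caveat to the estimate. |
| max_confidence | float | Upper cap on confidence, to avoid overconfidence. |
| bootstrap_confidence | float | Confidence used before any progress has been made. |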

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class ConfidenceThresholds:
    """Thresholds for confidence-based decisions.

    Attributes:
        size_scaling_min: Min confidence to use size-scaled estimate.
        high_confidence: Show simple estimate without range.
        medium_confidence: Show range.
        low_confidence: Show "rough" caveat.
        max_confidence: Cap to avoid overconfidence.
        bootstrap_confidence: Confidence when no progress yet.
    """

    size_scaling_min: float = 0.3
    high_confidence: float = 0.7
    medium_confidence: float = 0.4
    low_confidence: float = 0.1
    max_confidence: float = 0.9
    bootstrap_confidence: float = 0.05
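The attribute descriptions imply three uses: capping a raw score at max_confidence, gating size-scaled estimates on size_scaling_min, and choosing how an estimate is displayed. The sketch below is one plausible reading of those descriptions, not snakesee's code: cap, use_size_scaling, and display_mode are hypothetical helpers, and the ">=" comparisons are assumptions. Where low_confidence and bootstrap_confidence are applied is not shown in this section, so the sketch leaves them out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfidenceThresholds:
    """Copy of the dataclass above (same defaults) so the sketch runs standalone."""

    size_scaling_min: float = 0.3
    high_confidence: float = 0.7
    medium_confidence: float = 0.4
    low_confidence: float = 0.1
    max_confidence: float = 0.9
    bootstrap_confidence: float = 0.05


def cap(confidence: float, t: ConfidenceThresholds) -> float:
    """Clamp a raw score so it never exceeds max_confidence."""
    return min(confidence, t.max_confidence)


def use_size_scaling(confidence: float, t: ConfidenceThresholds) -> bool:
    """Assumed gate: trust a size-scaled estimate only at or above size_scaling_min."""
    return confidence >= t.size_scaling_min


def display_mode(confidence: float, t: ConfidenceThresholds) -> str:
    """Assumed display ladder (hypothetical; the operators are a guess)."""
    c = cap(confidence, t)
    if c >= t.high_confidence:
        return "simple"  # single value, no range
    if c >= t.medium_confidence:
        return "range"  # value with lower/upper bounds
    return "rough"  # range plus a "rough" caveat


t = ConfidenceThresholds()
for score in (0.95, 0.5, 0.2):
    print(score, cap(score, t), display_mode(score, t))
```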

ConfidenceWeights dataclass

Weights for computing overall confidence score.

Used in _estimate_weighted() to combine multiple factors. The four weights must sum to 1.0 (within a tolerance of 0.001); otherwise construction raises ValueError.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| sample_size | float | Weight for the historical sample count. |
| recency | float | Weight for data recency. |
| consistency | float | Weight for the consistency of recent runs. |
| data_coverage | float | Weight for the fraction of rules with historical data. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the weights do not sum to 1.0 (tolerance 0.001). |

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class ConfidenceWeights:
    """Weights for computing overall confidence score.

    Used in _estimate_weighted() to combine multiple factors.
    All weights should sum to 1.0.

    Attributes:
        sample_size: Weight for historical sample count.
        recency: Weight for data recency.
        consistency: Weight for recent run consistency.
        data_coverage: Weight for fraction of rules with data.

    Raises:
        ValueError: If weights do not sum to 1.0.
    """

    sample_size: float = 0.4
    recency: float = 0.3
    consistency: float = 0.2
    data_coverage: float = 0.1

    def __post_init__(self) -> None:
        """Validate that weights sum to 1.0."""
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Confidence weights must sum to 1.0, got {total}")
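Requiring the weights to sum to 1.0 is what keeps a weighted combination of per-factor scores in the same [0, 1] range as the scores themselves. The exact formula inside _estimate_weighted() is not shown here; the sketch below assumes a plain weighted sum, and combined_confidence is a hypothetical helper. It also exercises the __post_init__ validation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfidenceWeights:
    """Copy of the dataclass above, including its validation."""

    sample_size: float = 0.4
    recency: float = 0.3
    consistency: float = 0.2
    data_coverage: float = 0.1

    def __post_init__(self) -> None:
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Confidence weights must sum to 1.0, got {total}")


def combined_confidence(
    w: ConfidenceWeights,
    sample_size: float,
    recency: float,
    consistency: float,
    data_coverage: float,
) -> float:
    """Hypothetical weighted sum of per-factor scores, each assumed to be in [0, 1].

    Because the weights sum to 1.0, the result also stays in [0, 1].
    """
    return (
        w.sample_size * sample_size
        + w.recency * recency
        + w.consistency * consistency
        + w.data_coverage * data_coverage
    )


w = ConfidenceWeights()
score = combined_confidence(w, sample_size=1.0, recency=0.5, consistency=0.0, data_coverage=1.0)
print(score)  # 0.4*1.0 + 0.3*0.5 + 0.2*0.0 + 0.1*1.0, approximately 0.65

try:
    ConfidenceWeights(sample_size=0.5)  # 0.5 + 0.3 + 0.2 + 0.1 is about 1.1
except ValueError as exc:
    print(exc)
```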

Functions

__post_init__
__post_init__() -> None

Validate that weights sum to 1.0.


EstimationConfig dataclass

Central configuration for time estimation.

This class consolidates all estimation-related configuration that was previously scattered across multiple files as magic numbers.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| weighting_strategy | WeightingStrategy | How to weight historical data ("time" or "index"). |
| half_life_days | float | Half-life in days for time-based weighting. |
| half_life_logs | int | Half-life in run count for index-based weighting. |
| variance | VarianceMultipliers | Variance multiplier settings. |
| confidence_weights | ConfidenceWeights | Weights for confidence calculation. |
| confidence_thresholds | ConfidenceThresholds | Decision thresholds based on confidence. |
| time | TimeConstants | Time-related constants. |
| min_samples_for_conditioning | int | Minimum samples for wildcard conditioning. |
| min_pairs_for_size_scaling | int | Minimum pairs for size correlation. |
| high_variance_cv | float | Coefficient of variation threshold for "high variance". |
| default_global_mean | float | Fallback duration when no data is available. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If half_life_logs, half_life_days, or default_global_mean is not positive. |
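The half-life settings suggest exponential-decay weighting of historical runs: a sample's weight halves every half_life_days of age under the "time" strategy, or every half_life_logs runs under the "index" strategy. The exact formula snakesee uses is not shown in this section, so the following is a sketch under that assumption; time_weight, index_weight, and weighted_mean are hypothetical helpers.

```python
from __future__ import annotations


def time_weight(run_timestamp: float, now: float, half_life_days: float = 7.0) -> float:
    """Weight of a run recorded at run_timestamp, as seen at time now."""
    age_days = (now - run_timestamp) / 86_400.0  # seconds -> days
    return 0.5 ** (age_days / half_life_days)


def index_weight(runs_ago: int, half_life_logs: int = 10) -> float:
    """Weight of a run that is runs_ago runs old (0 = most recent)."""
    return 0.5 ** (runs_ago / half_life_logs)


def weighted_mean(values: list[float], weights: list[float]) -> float:
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


# Past durations (seconds) of one rule, newest first, weighted by run index.
durations = [120.0, 100.0, 80.0]
weights = [index_weight(i) for i in range(len(durations))]
estimate = weighted_mean(durations, weights)
print(round(estimate, 1))  # above the plain mean of 100.0, since newer runs count more

now = 1_700_000_000.0
print(time_weight(now - 7 * 86_400, now))  # 0.5: exactly one half-life old
```

With a 10-run half-life the decay is gentle, which is why the estimate only drifts slightly toward the newest duration.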

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class EstimationConfig:
    """Central configuration for time estimation.

    This class consolidates all estimation-related configuration that was
    previously scattered across multiple files as magic numbers.

    Attributes:
        weighting_strategy: How to weight historical data ("time" or "index").
        half_life_days: Half-life in days for time-based weighting.
        half_life_logs: Half-life in run count for index-based weighting.
        variance: Variance multiplier settings.
        confidence_weights: Weights for confidence calculation.
        confidence_thresholds: Decision thresholds based on confidence.
        time: Time-related constants.
        min_samples_for_conditioning: Minimum samples for wildcard conditioning.
        min_pairs_for_size_scaling: Minimum pairs for size correlation.
        high_variance_cv: Coefficient of variation threshold for "high variance".
        default_global_mean: Fallback duration when no data is available.

    Raises:
        ValueError: If half_life_logs, half_life_days, or default_global_mean is not positive.
    """

    weighting_strategy: WeightingStrategy = "index"
    half_life_days: float = 7.0
    half_life_logs: int = 10

    variance: VarianceMultipliers = field(default_factory=VarianceMultipliers)
    confidence_weights: ConfidenceWeights = field(default_factory=ConfidenceWeights)
    confidence_thresholds: ConfidenceThresholds = field(default_factory=ConfidenceThresholds)
    time: TimeConstants = field(default_factory=TimeConstants)

    # Sample size thresholds
    # Synced with constants.MIN_SAMPLES_FOR_CONDITIONING
    min_samples_for_conditioning: int = 3
    min_pairs_for_size_scaling: int = 3
    min_samples_for_confidence: int = 10

    # Variance thresholds
    high_variance_cv: float = 0.5
    variance_ratio_threshold: float = 0.1

    # Default values
    default_global_mean: float = 60.0
    default_recency_factor: float = 0.5

    # Size scaling bounds
    size_dampening_power: float = 0.5
    size_ratio_min: float = 0.5
    size_ratio_max: float = 2.0
    size_confidence_max: float = 0.8
    size_confidence_divisor: int = 10

    # Bootstrap estimation bounds (when no jobs completed)
    bootstrap_lower_multiplier: float = 0.2
    bootstrap_upper_multiplier: float = 3.0

    # Simple estimate bounds (used when no historical data)
    simple_estimate_confidence_cap: float = 0.7
    simple_estimate_jobs_divisor: int = 20

    # Fuzzy rule matching
    fuzzy_match_max_distance: int = 3

    def __post_init__(self) -> None:
        """Validate configuration parameters."""
        if self.half_life_logs <= 0:
            raise ValueError(f"half_life_logs must be > 0, got {self.half_life_logs}")
        if self.half_life_days <= 0:
            raise ValueError(f"half_life_days must be > 0, got {self.half_life_days}")
        if self.default_global_mean <= 0:
            raise ValueError(f"default_global_mean must be > 0, got {self.default_global_mean}")
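Because EstimationConfig is frozen, a tuned variant is derived with dataclasses.replace() rather than by assignment, and replace() constructs the new instance through __init__, so __post_init__ validation runs again. The sketch uses MiniEstimationConfig, a hypothetical three-field stand-in with the same checks, so it runs without the nested config classes that are not shown here.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class MiniEstimationConfig:
    """Hypothetical trimmed stand-in for EstimationConfig (three fields only)."""

    half_life_days: float = 7.0
    half_life_logs: int = 10
    default_global_mean: float = 60.0

    def __post_init__(self) -> None:
        # Same checks as EstimationConfig.__post_init__ above.
        if self.half_life_logs <= 0:
            raise ValueError(f"half_life_logs must be > 0, got {self.half_life_logs}")
        if self.half_life_days <= 0:
            raise ValueError(f"half_life_days must be > 0, got {self.half_life_days}")
        if self.default_global_mean <= 0:
            raise ValueError(f"default_global_mean must be > 0, got {self.default_global_mean}")


base = MiniEstimationConfig()
tuned = replace(base, half_life_logs=5)  # new instance; base is unchanged
print(tuned.half_life_logs, base.half_life_logs)  # 5 10

try:
    replace(base, half_life_days=0.0)  # validation runs on the new instance
except ValueError as exc:
    print(exc)  # half_life_days must be > 0, got 0.0

try:
    base.half_life_logs = 3  # type: ignore[misc]
except FrozenInstanceError:
    print("frozen: use replace() instead")
```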

Functions

__post_init__
__post_init__() -> None

Validate configuration parameters.


FrozenClock

Clock frozen at a specific time for testing.

Useful for testing time-dependent logic without flakiness.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| frozen_time | float | The frozen Unix timestamp. |
| frozen_monotonic | float | The frozen monotonic value. |

Example

clock = FrozenClock(1700000000.0)
assert clock.now() == 1700000000.0

clock.advance(60.0)  # Advance by 1 minute
assert clock.now() == 1700000060.0

Source code in snakesee/state/clock.py
class FrozenClock:
    """Clock frozen at a specific time for testing.

    Useful for testing time-dependent logic without flakiness.

    Attributes:
        frozen_time: The frozen Unix timestamp.
        frozen_monotonic: The frozen monotonic value.

    Example:
        clock = FrozenClock(1700000000.0)
        assert clock.now() == 1700000000.0

        clock.advance(60.0)  # Advance by 1 minute
        assert clock.now() == 1700000060.0
    """

    def __init__(
        self,
        frozen_time: float | None = None,
        frozen_monotonic: float | None = None,
    ) -> None:
        """Initialize with specific frozen times.

        Args:
            frozen_time: Unix timestamp to freeze at. Defaults to current time.
            frozen_monotonic: Monotonic value to freeze at. Defaults to 0.0.
        """
        self._time = frozen_time if frozen_time is not None else _time.time()
        self._monotonic = frozen_monotonic if frozen_monotonic is not None else 0.0

    def now(self) -> float:
        """Return the frozen time."""
        return self._time

    def monotonic(self) -> float:
        """Return the frozen monotonic value."""
        return self._monotonic

    def advance(self, seconds: float) -> None:
        """Advance the frozen time by the given number of seconds.

        Args:
            seconds: Number of seconds to advance (can be negative).
        """
        self._time += seconds
        self._monotonic += seconds

    def set_time(self, timestamp: float) -> None:
        """Set the frozen time to a specific timestamp.

        Args:
            timestamp: Unix timestamp to set.
        """
        self._time = timestamp

    def set_monotonic(self, value: float) -> None:
        """Set the frozen monotonic value.

        Args:
            value: Monotonic value to set.
        """
        self._monotonic = value
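A typical use is to hand a FrozenClock to a component under test and step time forward explicitly. The sketch below uses a hypothetical Deadline class that measures on the monotonic clock, and it highlights a behavior visible in the code above: advance() moves both clocks, while set_time() moves only now(). FrozenClock is trimmed to the methods used, so the block runs on its own.

```python
from __future__ import annotations

import time as _time


class FrozenClock:
    """Trimmed copy of FrozenClock above (only the methods used here)."""

    def __init__(self, frozen_time: float | None = None, frozen_monotonic: float | None = None) -> None:
        self._time = frozen_time if frozen_time is not None else _time.time()
        self._monotonic = frozen_monotonic if frozen_monotonic is not None else 0.0

    def now(self) -> float:
        return self._time

    def monotonic(self) -> float:
        return self._monotonic

    def advance(self, seconds: float) -> None:
        self._time += seconds
        self._monotonic += seconds

    def set_time(self, timestamp: float) -> None:
        self._time = timestamp  # wall clock only; monotonic value is untouched


class Deadline:
    """Hypothetical component under test; real code would annotate clock with Clock."""

    def __init__(self, seconds: float, clock: FrozenClock) -> None:
        self._clock = clock
        # Measured on the monotonic clock, so wall-clock jumps do not affect it.
        self._expires_at = clock.monotonic() + seconds

    def expired(self) -> bool:
        return self._clock.monotonic() >= self._expires_at


clock = FrozenClock(frozen_time=1_700_000_000.0)
deadline = Deadline(30.0, clock)
print(deadline.expired())  # False
clock.advance(30.0)
print(deadline.expired())  # True

# set_time() moves only now(), so a monotonic-based deadline does not fire.
clock2 = FrozenClock(frozen_time=1_700_000_000.0)
deadline2 = Deadline(30.0, clock2)
clock2.set_time(1_800_000_000.0)
print(deadline2.expired())  # False
```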

Functions

__init__
__init__(frozen_time: float | None = None, frozen_monotonic: float | None = None) -> None

Initialize with specific frozen times.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| frozen_time | float or None | Unix timestamp to freeze at. Defaults to current time. | None |
| frozen_monotonic | float or None | Monotonic value to freeze at. Defaults to 0.0. | None |

advance
advance(seconds: float) -> None

Advance the frozen time by the given number of seconds.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| seconds | float | Number of seconds to advance (can be negative). | required |

monotonic
monotonic() -> float

Return the frozen monotonic value.

now
now() -> float

Return the frozen time.

set_monotonic
set_monotonic(value: float) -> None

Set the frozen monotonic value.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | float | Monotonic value to set. | required |

set_time
set_time(timestamp: float) -> None

Set the frozen time to a specific timestamp.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timestamp | float | Unix timestamp to set. | required |


Job dataclass

Mutable job state container.

Unlike the frozen JobInfo dataclass, this class allows state updates as jobs progress through their lifecycle.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| key | str | Unique key for this job: the job_id when available, otherwise a rule-prefixed fallback such as `{rule}:{hash}` or `{rule}:{timestamp}`. |
| rule | str | The Snakemake rule name. |
| status | JobStatus | Current job status. |
| job_id | str or None | Snakemake job ID (may be None initially). |
| start_time | float or None | Unix timestamp when the job started. |
| end_time | float or None | Unix timestamp when the job completed. |
| wildcards | dict[str, str] | Dictionary of wildcard values. |
| threads | int or None | Number of threads allocated. |
| input_size | int or None | Total input file size in bytes. |
| log_file | Path or None | Path to the job's log file. |

Source code in snakesee/state/job_registry.py
@dataclass
class Job:
    """Mutable job state container.

    Unlike the frozen JobInfo dataclass, this class allows state updates
    as jobs progress through their lifecycle.

    Attributes:
        key: Unique key for this job (rule + output hash or job_id).
        rule: The Snakemake rule name.
        status: Current job status.
        job_id: Snakemake job ID (may be None initially).
        start_time: Unix timestamp when job started.
        end_time: Unix timestamp when job completed.
        wildcards: Dictionary of wildcard values.
        threads: Number of threads allocated.
        input_size: Total input file size in bytes.
        log_file: Path to job's log file.
    """

    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING
    job_id: str | None = None
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] = field(default_factory=dict)
    threads: int | None = None
    input_size: int | None = None
    log_file: Path | None = None
    stats_recorded: bool = False

    @property
    def elapsed(self) -> float | None:
        """Elapsed time in seconds since job started."""
        if self.start_time is None:
            return None
        if self.end_time is not None:
            return self.end_time - self.start_time
        from snakesee.state.clock import get_clock

        return get_clock().now() - self.start_time

    @property
    def duration(self) -> float | None:
        """Total duration for completed jobs."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_job_info(self) -> JobInfo:
        """Convert to immutable JobInfo for backward compatibility."""
        from snakesee.models import JobInfo

        return JobInfo(
            rule=self.rule,
            job_id=self.job_id,
            wildcards=self.wildcards if self.wildcards else None,
            start_time=self.start_time,
            end_time=self.end_time,
            input_size=self.input_size,
            threads=self.threads,
            log_file=self.log_file,
        )

    @classmethod
    def from_job_info(cls, job_info: JobInfo, key: str | None = None) -> Job:
        """Create a Job from a JobInfo instance."""
        # Determine status based on JobInfo fields
        if job_info.end_time is not None:
            status = JobStatus.COMPLETED
        elif job_info.start_time is not None:
            status = JobStatus.RUNNING
        else:
            status = JobStatus.PENDING

        # Generate key if not provided
        if key is None:
            # Use job_id if available, otherwise a hash-based key. Note that
            # hash() is only stable within one process (str hashing is salted
            # per run). This matches apply_job_info() to avoid duplicate entries.
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        return cls(
            key=key,
            rule=job_info.rule,
            status=status,
            job_id=job_info.job_id,
            start_time=job_info.start_time,
            end_time=job_info.end_time,
            wildcards=dict(job_info.wildcards) if job_info.wildcards else {},
            threads=job_info.threads,
            input_size=job_info.input_size,
            log_file=job_info.log_file,
        )
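The status that from_job_info() assigns depends only on which timestamps are set, and elapsed/duration follow the same timestamps. The sketch below restates that logic as free functions so the edge cases are easy to test; JobStatus here is a hypothetical three-member stand-in, and elapsed() takes the current time as a parameter instead of calling get_clock().

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    """Hypothetical stand-in with only the members this sketch needs."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


def infer_status(start_time: float | None, end_time: float | None) -> JobStatus:
    """Same precedence as Job.from_job_info(): an end_time wins over a start_time."""
    if end_time is not None:
        return JobStatus.COMPLETED
    if start_time is not None:
        return JobStatus.RUNNING
    return JobStatus.PENDING


def elapsed(start_time: float | None, end_time: float | None, now: float) -> float | None:
    """Mirror of Job.elapsed, with the current time passed in explicitly."""
    if start_time is None:
        return None
    if end_time is not None:
        return end_time - start_time
    return now - start_time


def duration(start_time: float | None, end_time: float | None) -> float | None:
    """Mirror of Job.duration: defined only when both timestamps are known."""
    if start_time is not None and end_time is not None:
        return end_time - start_time
    return None


print(infer_status(100.0, None), elapsed(100.0, None, now=130.0))  # JobStatus.RUNNING 30.0
print(infer_status(100.0, 160.0), duration(100.0, 160.0))  # JobStatus.COMPLETED 60.0
print(infer_status(None, 160.0), duration(None, 160.0))  # JobStatus.COMPLETED None
```

The last line shows an edge case that follows from the code above: a JobInfo with only an end_time is classified as COMPLETED, yet its duration is None.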

Attributes

duration property
duration: float | None

Total duration in seconds for completed jobs; None unless both start_time and end_time are set.

elapsed property
elapsed: float | None

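Elapsed time in seconds since the job started: end_time - start_time once the job has ended, otherwise measured against the current clock. None if the job has not started.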

Functions

from_job_info classmethod
from_job_info(job_info: JobInfo, key: str | None = None) -> Job

Create a Job from a JobInfo instance.

to_job_info
to_job_info() -> JobInfo

Convert to immutable JobInfo for backward compatibility.


JobRegistry

Central registry for all job state.

Provides O(1) lookup by job key or job_id, and efficient iteration over jobs by status.

Example

registry = JobRegistry()
job, created = registry.get_or_create("job_1", "align")
registry.set_status(job, JobStatus.RUNNING)
job.start_time = time.time()
running = registry.running()  # [job]
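JobRegistry keeps secondary indexes (_by_status maps each JobStatus to a set of job keys), so status queries avoid scanning every job. The catch is that the index only moves a job when it knows the previous status, which is why a status change should go through set_status(), or update_indexes() with old_status. The hypothetical MiniRegistry below, with stand-in Item and Status types, reproduces the pattern and shows what happens when the status field is mutated directly.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"


@dataclass
class Item:
    key: str
    status: Status = Status.PENDING


class MiniRegistry:
    """Hypothetical, trimmed version of JobRegistry's status-index pattern."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._items: dict[str, Item] = {}
        # Secondary index: status -> keys, so listing by status avoids a full scan.
        self._by_status: dict[Status, set[str]] = {s: set() for s in Status}

    def add(self, item: Item) -> None:
        with self._lock:
            self._items[item.key] = item
            self._by_status[item.status].add(item.key)

    def set_status(self, item: Item, status: Status) -> None:
        with self._lock:
            old = item.status  # captured before mutating, so the old bucket is known
            item.status = status
            if old != status:
                self._by_status[old].discard(item.key)
                self._by_status[status].add(item.key)

    def with_status(self, status: Status) -> list[Item]:
        with self._lock:
            return [self._items[k] for k in self._by_status[status]]


reg = MiniRegistry()
a, b = Item("a"), Item("b")
reg.add(a)
reg.add(b)

a.status = Status.RUNNING  # direct mutation: the index still files "a" under PENDING
reg.set_status(b, Status.RUNNING)  # index updated correctly

print(sorted(i.key for i in reg.with_status(Status.RUNNING)))  # ['b']
```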

Source code in snakesee/state/job_registry.py
class JobRegistry:
    """Central registry for all job state.

    Provides O(1) lookup by job key or job_id, and efficient iteration
    over jobs by status.

    Example:
        >>> registry = JobRegistry()
        >>> job, created = registry.get_or_create("job_1", "align")
        >>> registry.set_status(job, JobStatus.RUNNING)  # keeps the status index in sync
        >>> job.start_time = time.time()
        >>> running = registry.running()  # [job]
    """

    def __init__(self) -> None:
        """Initialize empty registry."""
        self._lock = threading.RLock()
        self._jobs: dict[str, Job] = {}
        self._by_job_id: dict[str, str] = {}  # job_id -> key
        self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
        self._by_rule: dict[str, set[str]] = {}

    def __len__(self) -> int:
        """Return number of jobs in registry."""
        with self._lock:
            return len(self._jobs)

    def __contains__(self, key: str) -> bool:
        """Check if job exists by key."""
        with self._lock:
            return key in self._jobs

    def get(self, key: str) -> Job | None:
        """Get job by key."""
        with self._lock:
            return self._jobs.get(key)

    def get_by_job_id(self, job_id: str) -> Job | None:
        """Get job by Snakemake job_id."""
        with self._lock:
            key = self._by_job_id.get(job_id)
            return self._jobs.get(key) if key else None

    def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
        """Get existing job or create a new one.

        Args:
            key: Unique job key.
            rule: Rule name for new job.

        Returns:
            Tuple of (job, created) where created is True if job was new.
        """
        with self._lock:
            if key in self._jobs:
                return self._jobs[key], False

            job = Job(key=key, rule=rule)
            self._add_unlocked(job)
            return job, True

    def add(self, job: Job) -> None:
        """Add a job to the registry."""
        with self._lock:
            self._add_unlocked(job)

    def _add_unlocked(self, job: Job) -> None:
        """Internal add with index updates (caller must hold lock)."""
        self._jobs[job.key] = job
        self._by_status[job.status].add(job.key)

        if job.job_id is not None:
            self._by_job_id[job.job_id] = job.key

        if job.rule not in self._by_rule:
            self._by_rule[job.rule] = set()
        self._by_rule[job.rule].add(job.key)

    def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes after job state change.

        Args:
            job: The job that was updated.
            old_status: Previous status if status changed, for index update.
        """
        with self._lock:
            self._update_indexes_unlocked(job, old_status)

    def _update_indexes_unlocked(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes without acquiring lock (caller must hold lock)."""
        # Update job_id index if needed
        if job.job_id is not None and job.job_id not in self._by_job_id:
            self._by_job_id[job.job_id] = job.key

        # Update status index if status changed
        if old_status is not None and old_status != job.status:
            self._by_status[old_status].discard(job.key)
            self._by_status[job.status].add(job.key)

    def set_status(self, job: Job, status: JobStatus) -> None:
        """Update job status and indexes."""
        with self._lock:
            old_status = job.status
            job.status = status
            self._update_indexes_unlocked(job, old_status)

    def running(self) -> list[Job]:
        """Get all running jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]

    def completed(self) -> list[Job]:
        """Get all completed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]

    def failed(self) -> list[Job]:
        """Get all failed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]

    def incomplete(self) -> list[Job]:
        """Get all incomplete jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]

    def submitted(self) -> list[Job]:
        """Get all submitted jobs that haven't started yet."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]

    def pending(self) -> list[Job]:
        """Get all pending jobs (not yet submitted)."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]

    def by_rule(self, rule: str) -> list[Job]:
        """Get all jobs for a specific rule."""
        with self._lock:
            keys = self._by_rule.get(rule, set())
            return [self._jobs[key] for key in keys]

    def all_jobs(self) -> list[Job]:
        """Get all jobs in the registry."""
        with self._lock:
            return list(self._jobs.values())

    def running_job_infos(self) -> list[JobInfo]:
        """Get running jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.running()]

    def completed_job_infos(self) -> list[JobInfo]:
        """Get completed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.completed()]

    def failed_job_infos(self) -> list[JobInfo]:
        """Get failed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.failed()]

    def submitted_job_infos(self) -> list[JobInfo]:
        """Get submitted jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.submitted()]

    def clear(self) -> None:
        """Clear all jobs from the registry."""
        with self._lock:
            self._jobs.clear()
            self._by_job_id.clear()
            for status_set in self._by_status.values():
                status_set.clear()
            self._by_rule.clear()

    def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
        """Add or update a job from a JobInfo.

        Args:
            job_info: JobInfo to apply.
            key: Optional key. If None, uses job_id or generates one.

        Returns:
            The created or updated Job.
        """
        with self._lock:
            # Determine key
            if key is None:
                key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

            # Look up existing job without additional lock acquisition
            existing = self._jobs.get(key)
            if existing is None and job_info.job_id:
                existing_key = self._by_job_id.get(job_info.job_id)
                if existing_key:
                    existing = self._jobs.get(existing_key)

            if existing:
                # Update existing job
                old_status = existing.status
                if job_info.start_time is not None:
                    existing.start_time = job_info.start_time
                if job_info.end_time is not None:
                    existing.end_time = job_info.end_time
                    existing.status = JobStatus.COMPLETED
                elif job_info.start_time is not None:
                    existing.status = JobStatus.RUNNING
                if job_info.threads is not None:
                    existing.threads = job_info.threads
                if job_info.wildcards:
                    existing.wildcards = dict(job_info.wildcards)
                if job_info.input_size is not None:
                    existing.input_size = job_info.input_size
                if job_info.log_file is not None:
                    existing.log_file = job_info.log_file
                if job_info.job_id is not None and existing.job_id is None:
                    existing.job_id = job_info.job_id
                self._update_indexes_unlocked(existing, old_status)
                return existing
            else:
                # Create new job
                job = Job.from_job_info(job_info, key)
                self._add_unlocked(job)
                return job

    def apply_event(self, event: SnakeseeEvent) -> Job | None:
        """Apply a snakesee event to update job state.

        Args:
            event: The event to apply.

        Returns:
            The updated Job, or None if event couldn't be applied.
        """
        from snakesee.events import EventType

        # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
        # These don't have job_id/rule_name and would create synthetic "unknown" jobs
        if event.event_type not in {
            EventType.JOB_SUBMITTED,
            EventType.JOB_STARTED,
            EventType.JOB_FINISHED,
            EventType.JOB_ERROR,
        }:
            return None

        with self._lock:
            # Get rule name from event
            rule_name = event.rule_name or "unknown"

            # Use job_id as key if available (convert int to str for key)
            job_id_str = str(event.job_id) if event.job_id is not None else None
            key = job_id_str or f"{rule_name}:{event.timestamp}"

            # Look up job without additional lock acquisition
            job = self._jobs.get(key)
            if job is None and job_id_str:
                existing_key = self._by_job_id.get(job_id_str)
                if existing_key:
                    job = self._jobs.get(existing_key)

            if job is None:
                # Create new job from event
                job = Job(key=key, rule=rule_name)
                self._add_unlocked(job)
                if job_id_str:
                    job.job_id = job_id_str
                    self._by_job_id[job_id_str] = key
                if event.wildcards:
                    job.wildcards = dict(event.wildcards)
                if event.threads:
                    job.threads = event.threads

            old_status = job.status

            # Update based on event type
            if event.event_type == EventType.JOB_SUBMITTED:
                job.status = JobStatus.SUBMITTED
            elif event.event_type == EventType.JOB_STARTED:
                job.status = JobStatus.RUNNING
                job.start_time = event.timestamp
                if event.threads:
                    job.threads = event.threads
            elif event.event_type == EventType.JOB_FINISHED:
                job.status = JobStatus.COMPLETED
                job.end_time = event.timestamp
            elif event.event_type == EventType.JOB_ERROR:
                job.status = JobStatus.FAILED
                job.end_time = event.timestamp

            self._update_indexes_unlocked(job, old_status)
            return job

    def store_threads(self, job_id: str, threads: int) -> None:
        """Store thread count for a job by job_id.

        This is used when thread info comes from events before the job
        is fully tracked.
        """
        with self._lock:
            key = self._by_job_id.get(job_id)
            if key:
                job = self._jobs.get(key)
                if job:
                    job.threads = threads

Functions

__contains__
__contains__(key: str) -> bool

Check if job exists by key.

Source code in snakesee/state/job_registry.py
def __contains__(self, key: str) -> bool:
    """Check if job exists by key."""
    with self._lock:
        return key in self._jobs
__init__
__init__() -> None

Initialize empty registry.

Source code in snakesee/state/job_registry.py
def __init__(self) -> None:
    """Initialize empty registry."""
    self._lock = threading.RLock()
    self._jobs: dict[str, Job] = {}
    self._by_job_id: dict[str, str] = {}  # job_id -> key
    self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
    self._by_rule: dict[str, set[str]] = {}
__len__
__len__() -> int

Return number of jobs in registry.

Source code in snakesee/state/job_registry.py
def __len__(self) -> int:
    """Return number of jobs in registry."""
    with self._lock:
        return len(self._jobs)
add
add(job: Job) -> None

Add a job to the registry.

Source code in snakesee/state/job_registry.py
def add(self, job: Job) -> None:
    """Add a job to the registry."""
    with self._lock:
        self._add_unlocked(job)
all_jobs
all_jobs() -> list[Job]

Get all jobs in the registry.

Source code in snakesee/state/job_registry.py
def all_jobs(self) -> list[Job]:
    """Get all jobs in the registry."""
    with self._lock:
        return list(self._jobs.values())
apply_event
apply_event(event: SnakeseeEvent) -> Job | None

Apply a snakesee event to update job state.

Parameters:

- event (SnakeseeEvent, required): The event to apply.

Returns:

- Job | None: The updated Job, or None if the event is not a job event (e.g. WORKFLOW_STARTED or PROGRESS) and was ignored.

Source code in snakesee/state/job_registry.py
def apply_event(self, event: SnakeseeEvent) -> Job | None:
    """Apply a snakesee event to update job state.

    Args:
        event: The event to apply.

    Returns:
        The updated Job, or None if event couldn't be applied.
    """
    from snakesee.events import EventType

    # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
    # These don't have job_id/rule_name and would create synthetic "unknown" jobs
    if event.event_type not in {
        EventType.JOB_SUBMITTED,
        EventType.JOB_STARTED,
        EventType.JOB_FINISHED,
        EventType.JOB_ERROR,
    }:
        return None

    with self._lock:
        # Get rule name from event
        rule_name = event.rule_name or "unknown"

        # Use job_id as key if available (convert int to str for key)
        job_id_str = str(event.job_id) if event.job_id is not None else None
        key = job_id_str or f"{rule_name}:{event.timestamp}"

        # Look up job without additional lock acquisition
        job = self._jobs.get(key)
        if job is None and job_id_str:
            existing_key = self._by_job_id.get(job_id_str)
            if existing_key:
                job = self._jobs.get(existing_key)

        if job is None:
            # Create new job from event
            job = Job(key=key, rule=rule_name)
            self._add_unlocked(job)
            if job_id_str:
                job.job_id = job_id_str
                self._by_job_id[job_id_str] = key
            if event.wildcards:
                job.wildcards = dict(event.wildcards)
            if event.threads:
                job.threads = event.threads

        old_status = job.status

        # Update based on event type
        if event.event_type == EventType.JOB_SUBMITTED:
            job.status = JobStatus.SUBMITTED
        elif event.event_type == EventType.JOB_STARTED:
            job.status = JobStatus.RUNNING
            job.start_time = event.timestamp
            if event.threads:
                job.threads = event.threads
        elif event.event_type == EventType.JOB_FINISHED:
            job.status = JobStatus.COMPLETED
            job.end_time = event.timestamp
        elif event.event_type == EventType.JOB_ERROR:
            job.status = JobStatus.FAILED
            job.end_time = event.timestamp

        self._update_indexes_unlocked(job, old_status)
        return job
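The status handling in apply_event boils down to a lookup from event type to new status, plus a key fallback for events without a job_id. The sketch below restates that logic in a self-contained way. EventType, JobStatus and MiniJob are simplified stand-ins defined here, not snakesee's classes, and thread and wildcard handling is omitted.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto


class EventType(Enum):
    """Stand-in for a subset of snakesee.events.EventType."""

    WORKFLOW_STARTED = auto()
    JOB_SUBMITTED = auto()
    JOB_STARTED = auto()
    JOB_FINISHED = auto()
    JOB_ERROR = auto()


class JobStatus(Enum):
    """Stand-in using the same values as snakesee's JobStatus (subset)."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Job event -> resulting status; any other event type is ignored.
TRANSITIONS = {
    EventType.JOB_SUBMITTED: JobStatus.SUBMITTED,
    EventType.JOB_STARTED: JobStatus.RUNNING,
    EventType.JOB_FINISHED: JobStatus.COMPLETED,
    EventType.JOB_ERROR: JobStatus.FAILED,
}


@dataclass
class MiniJob:
    """Hypothetical, simplified stand-in for snakesee's Job."""

    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING
    start_time: float | None = None
    end_time: float | None = None


def apply(jobs: dict[str, MiniJob], event_type: EventType, job_id: int | None,
          rule: str | None, timestamp: float) -> MiniJob | None:
    if event_type not in TRANSITIONS:
        return None  # non-job events would otherwise create "unknown" jobs
    rule_name = rule or "unknown"
    # Prefer the Snakemake job id; otherwise fall back to "rule:timestamp".
    key = str(job_id) if job_id is not None else f"{rule_name}:{timestamp}"
    job = jobs.setdefault(key, MiniJob(key=key, rule=rule_name))
    job.status = TRANSITIONS[event_type]
    if event_type is EventType.JOB_STARTED:
        job.start_time = timestamp
    elif event_type in (EventType.JOB_FINISHED, EventType.JOB_ERROR):
        job.end_time = timestamp
    return job
```

One consequence of the fallback key is that two job events without a job_id and with different timestamps always produce two separate jobs, since nothing links them.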
apply_job_info
apply_job_info(job_info: JobInfo, key: str | None = None) -> Job

Add or update a job from a JobInfo.

Parameters:

- job_info (JobInfo, required): JobInfo to apply.
- key (str | None, default None): Optional key. If None, uses job_info.job_id, falling back to a key built from the rule name and hash(job_info).

Returns:

- Job: The created or updated Job.

Source code in snakesee/state/job_registry.py
def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
    """Add or update a job from a JobInfo.

    Args:
        job_info: JobInfo to apply.
        key: Optional key. If None, uses job_id or generates one.

    Returns:
        The created or updated Job.
    """
    with self._lock:
        # Determine key
        if key is None:
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        # Look up existing job without additional lock acquisition
        existing = self._jobs.get(key)
        if existing is None and job_info.job_id:
            existing_key = self._by_job_id.get(job_info.job_id)
            if existing_key:
                existing = self._jobs.get(existing_key)

        if existing:
            # Update existing job
            old_status = existing.status
            if job_info.start_time is not None:
                existing.start_time = job_info.start_time
            if job_info.end_time is not None:
                existing.end_time = job_info.end_time
                existing.status = JobStatus.COMPLETED
            elif job_info.start_time is not None:
                existing.status = JobStatus.RUNNING
            if job_info.threads is not None:
                existing.threads = job_info.threads
            if job_info.wildcards:
                existing.wildcards = dict(job_info.wildcards)
            if job_info.input_size is not None:
                existing.input_size = job_info.input_size
            if job_info.log_file is not None:
                existing.log_file = job_info.log_file
            if job_info.job_id is not None and existing.job_id is None:
                existing.job_id = job_info.job_id
            self._update_indexes_unlocked(existing, old_status)
            return existing
        else:
            # Create new job
            job = Job.from_job_info(job_info, key)
            self._add_unlocked(job)
            return job
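When apply_job_info updates an existing job, it infers the new status purely from which timestamps the JobInfo carries. Below is a minimal restatement of just that rule, using a stand-in JobStatus subset; the function name infer_status is hypothetical.

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    """Stand-in subset of snakesee's JobStatus."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


def infer_status(current: JobStatus, start_time: float | None,
                 end_time: float | None) -> JobStatus:
    """Mirror the status part of apply_job_info's update branch."""
    if end_time is not None:
        return JobStatus.COMPLETED  # any end time marks the job completed
    if start_time is not None:
        return JobStatus.RUNNING
    return current  # no timing info: status is left unchanged
```

Going by the source shown, this branch never yields FAILED, and a JobInfo that carries only a start time moves an already completed job back to RUNNING. Failures therefore have to come from apply_event or set_status.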
by_rule
by_rule(rule: str) -> list[Job]

Get all jobs for a specific rule.

Source code in snakesee/state/job_registry.py
def by_rule(self, rule: str) -> list[Job]:
    """Get all jobs for a specific rule."""
    with self._lock:
        keys = self._by_rule.get(rule, set())
        return [self._jobs[key] for key in keys]
clear
clear() -> None

Clear all jobs from the registry.

Source code in snakesee/state/job_registry.py
def clear(self) -> None:
    """Clear all jobs from the registry."""
    with self._lock:
        self._jobs.clear()
        self._by_job_id.clear()
        for status_set in self._by_status.values():
            status_set.clear()
        self._by_rule.clear()
completed
completed() -> list[Job]

Get all completed jobs.

Source code in snakesee/state/job_registry.py
def completed(self) -> list[Job]:
    """Get all completed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]
completed_job_infos
completed_job_infos() -> list[JobInfo]

Get completed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def completed_job_infos(self) -> list[JobInfo]:
    """Get completed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.completed()]
failed
failed() -> list[Job]

Get all failed jobs.

Source code in snakesee/state/job_registry.py
def failed(self) -> list[Job]:
    """Get all failed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]
failed_job_infos
failed_job_infos() -> list[JobInfo]

Get failed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def failed_job_infos(self) -> list[JobInfo]:
    """Get failed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.failed()]
get
get(key: str) -> Job | None

Get job by key.

Source code in snakesee/state/job_registry.py
def get(self, key: str) -> Job | None:
    """Get job by key."""
    with self._lock:
        return self._jobs.get(key)
get_by_job_id
get_by_job_id(job_id: str) -> Job | None

Get job by Snakemake job_id.

Source code in snakesee/state/job_registry.py
def get_by_job_id(self, job_id: str) -> Job | None:
    """Get job by Snakemake job_id."""
    with self._lock:
        key = self._by_job_id.get(job_id)
        return self._jobs.get(key) if key else None
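get_by_job_id and store_threads both resolve a Snakemake job_id through two dictionaries: `_by_job_id` maps the id to a registry key, and `_jobs` maps that key to the Job. The sketch below models the pair with plain dicts; the TwoLevelIndex class and its dict-based job records are hypothetical simplifications.

```python
from __future__ import annotations


class TwoLevelIndex:
    """Hypothetical minimal model of JobRegistry's _jobs / _by_job_id pair."""

    def __init__(self) -> None:
        self.jobs: dict[str, dict] = {}      # registry key -> job record
        self.by_job_id: dict[str, str] = {}  # Snakemake job_id -> registry key

    def add(self, key: str, job_id: str | None = None) -> dict:
        record = {"key": key, "threads": None}
        self.jobs[key] = record
        if job_id is not None:
            self.by_job_id[job_id] = key
        return record

    def get_by_job_id(self, job_id: str) -> dict | None:
        key = self.by_job_id.get(job_id)
        return self.jobs.get(key) if key else None

    def store_threads(self, job_id: str, threads: int) -> None:
        job = self.get_by_job_id(job_id)
        if job is not None:  # unknown job ids are silently ignored
            job["threads"] = threads
```

The indirection lets a job keep a key that is not its job_id (e.g. a `rule:timestamp` fallback key) while still being found by id once one is known.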
get_or_create
get_or_create(key: str, rule: str) -> tuple[Job, bool]

Get existing job or create a new one.

Parameters:

- key (str, required): Unique job key.
- rule (str, required): Rule name, used only if a new job is created.

Returns:

- tuple[Job, bool]: Tuple of (job, created), where created is True if the job was new.

Source code in snakesee/state/job_registry.py
def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
    """Get existing job or create a new one.

    Args:
        key: Unique job key.
        rule: Rule name for new job.

    Returns:
        Tuple of (job, created) where created is True if job was new.
    """
    with self._lock:
        if key in self._jobs:
            return self._jobs[key], False

        job = Job(key=key, rule=rule)
        self._add_unlocked(job)
        return job, True
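The (job, created) return value lets a caller run one-time initialization only for new jobs. The sketch reproduces the same contract with hypothetical MiniJob and MiniRegistry stand-ins (no locking), and shows that the rule argument is ignored for keys that already exist.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class MiniJob:
    """Hypothetical stand-in for snakesee's Job."""

    key: str
    rule: str
    wildcards: dict[str, str] = field(default_factory=dict)


class MiniRegistry:
    def __init__(self) -> None:
        self._jobs: dict[str, MiniJob] = {}

    def get_or_create(self, key: str, rule: str) -> tuple[MiniJob, bool]:
        # Same contract as JobRegistry.get_or_create: rule is only used
        # when the key is new; an existing job keeps its original rule.
        if key in self._jobs:
            return self._jobs[key], False
        job = MiniJob(key=key, rule=rule)
        self._jobs[key] = job
        return job, True


reg = MiniRegistry()
job, created = reg.get_or_create("42", "align")
if created:
    job.wildcards = {"sample": "A"}  # one-time initialization for new jobs
```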
incomplete
incomplete() -> list[Job]

Get all incomplete jobs.

Source code in snakesee/state/job_registry.py
def incomplete(self) -> list[Job]:
    """Get all incomplete jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]
pending
pending() -> list[Job]

Get all pending jobs (not yet submitted).

Source code in snakesee/state/job_registry.py
def pending(self) -> list[Job]:
    """Get all pending jobs (not yet submitted)."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]
running
running() -> list[Job]

Get all running jobs.

Source code in snakesee/state/job_registry.py
def running(self) -> list[Job]:
    """Get all running jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]
running_job_infos
running_job_infos() -> list[JobInfo]

Get running jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def running_job_infos(self) -> list[JobInfo]:
    """Get running jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.running()]
set_status
set_status(job: Job, status: JobStatus) -> None

Update job status and indexes.

Source code in snakesee/state/job_registry.py
def set_status(self, job: Job, status: JobStatus) -> None:
    """Update job status and indexes."""
    with self._lock:
        old_status = job.status
        job.status = status
        self._update_indexes_unlocked(job, old_status)
store_threads
store_threads(job_id: str, threads: int) -> None

Store thread count for a job by job_id.

Used when thread info arrives from events before the job is fully tracked. If no job with this job_id is registered, the call has no effect.

Source code in snakesee/state/job_registry.py
def store_threads(self, job_id: str, threads: int) -> None:
    """Store thread count for a job by job_id.

    This is used when thread info comes from events before the job
    is fully tracked.
    """
    with self._lock:
        key = self._by_job_id.get(job_id)
        if key:
            job = self._jobs.get(key)
            if job:
                job.threads = threads
submitted
submitted() -> list[Job]

Get all submitted jobs that haven't started yet.

Source code in snakesee/state/job_registry.py
def submitted(self) -> list[Job]:
    """Get all submitted jobs that haven't started yet."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]
submitted_job_infos
submitted_job_infos() -> list[JobInfo]

Get submitted jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def submitted_job_infos(self) -> list[JobInfo]:
    """Get submitted jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.submitted()]
update_indexes
update_indexes(job: Job, old_status: JobStatus | None = None) -> None

Update indexes after job state change.

Parameters:

- job (Job, required): The job that was updated.
- old_status (JobStatus | None, default None): Previous status if the status changed, for index update.

Source code in snakesee/state/job_registry.py
def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
    """Update indexes after job state change.

    Args:
        job: The job that was updated.
        old_status: Previous status if status changed, for index update.
    """
    with self._lock:
        self._update_indexes_unlocked(job, old_status)
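The body of `_update_indexes_unlocked` is not shown on this page. The sketch below assumes the status index behaves like the per-status key sets built in `__init__`: on a status change, the job's key moves from the old bucket to the new one. move_status is a hypothetical helper, and the rule index is left out.

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    """Stand-in with the same members as snakesee's JobStatus."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"


# One key set per status, built the same way JobRegistry.__init__ does.
by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}


def move_status(key: str, old: JobStatus | None, new: JobStatus) -> None:
    """Assumed behaviour: drop the key from the old status bucket (if any)
    and add it to the new one, so each key sits in exactly one bucket."""
    if old is not None and old is not new:
        by_status[old].discard(key)
    by_status[new].add(key)
```

Because running(), completed() and the other status queries read from `_by_status` rather than scanning jobs, a status should be changed through set_status, or followed by update_indexes, rather than by assigning job.status directly.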

JobStatus

Bases: Enum

Status of a job in the workflow.

Source code in snakesee/state/job_registry.py
class JobStatus(Enum):
    """Status of a job in the workflow."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"
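Because JobStatus is a value-backed Enum, its string values round-trip: `JobStatus("running")` returns the RUNNING member, and unknown strings raise ValueError. The helper below is a hypothetical illustration of parsing stored status strings; the enum body is copied from the listing above.

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    """Copy of snakesee.state.job_registry.JobStatus."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"


def parse_status(text: str) -> JobStatus | None:
    """Hypothetical helper: map a stored string back to a JobStatus."""
    try:
        return JobStatus(text.strip().lower())
    except ValueError:
        return None  # unknown strings are rejected rather than guessed
```

Iterating the enum yields all six members in definition order, which is what `{status: set() for status in JobStatus}` in `JobRegistry.__init__` relies on to create one bucket per status.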

OffsetClock

Clock with a fixed offset from system time.

Useful for simulating time shifts without fully freezing time.

Attributes:

- offset: Seconds to add to system time.

Source code in snakesee/state/clock.py
class OffsetClock:
    """Clock with a fixed offset from system time.

    Useful for simulating time shifts without fully freezing time.

    Attributes:
        offset: Seconds to add to system time.
    """

    def __init__(self, offset: float = 0.0) -> None:
        """Initialize with an offset.

        Args:
            offset: Seconds to add to current time (negative for past).
        """
        self._offset = offset

    def now(self) -> float:
        """Return system time plus offset."""
        return _time.time() + self._offset

    def monotonic(self) -> float:
        """Return system monotonic (offset doesn't apply to durations)."""
        return _time.monotonic()

Functions

__init__
__init__(offset: float = 0.0) -> None

Initialize with an offset.

Parameters:

- offset (float, default 0.0): Seconds to add to current time (negative for past).

Source code in snakesee/state/clock.py
def __init__(self, offset: float = 0.0) -> None:
    """Initialize with an offset.

    Args:
        offset: Seconds to add to current time (negative for past).
    """
    self._offset = offset
monotonic
monotonic() -> float

Return system monotonic (offset doesn't apply to durations).

Source code in snakesee/state/clock.py
def monotonic(self) -> float:
    """Return system monotonic (offset doesn't apply to durations)."""
    return _time.monotonic()
now
now() -> float

Return system time plus offset.

Source code in snakesee/state/clock.py
def now(self) -> float:
    """Return system time plus offset."""
    return _time.time() + self._offset

RuleRegistry

Central registry for all rule timing statistics.

Consolidates what was previously scattered across TimeEstimator's rule_stats, thread_stats, and wildcard_stats dictionaries.

Example

>>> registry = RuleRegistry()
>>> stats = registry.get_or_create("align")
>>> stats.record_completion(duration=100.0, timestamp=time.time(), threads=4)
>>> mean, var = registry.get_estimate("align", threads=4)

Source code in snakesee/state/rule_registry.py
class RuleRegistry:
    """Central registry for all rule timing statistics.

    Consolidates what was previously scattered across TimeEstimator's
    rule_stats, thread_stats, and wildcard_stats dictionaries.

    Example:
        >>> registry = RuleRegistry()
        >>> stats = registry.get_or_create("align")
        >>> stats.record_completion(duration=100.0, timestamp=time.time(), threads=4)
        >>> mean, var = registry.get_estimate("align", threads=4)
    """

    def __init__(self, config: EstimationConfig | None = None) -> None:
        """Initialize empty registry.

        Args:
            config: Optional estimation configuration.
        """
        from snakesee.state.config import DEFAULT_CONFIG

        self.config = config or DEFAULT_CONFIG
        self._lock = threading.RLock()
        self._rules: dict[str, RuleStatistics] = {}
        self._global_mean_cache: float | None = None
        self._cache_valid: bool = False

    def __len__(self) -> int:
        """Return number of rules in registry."""
        with self._lock:
            return len(self._rules)

    def __contains__(self, rule: str) -> bool:
        """Check if rule exists in registry."""
        with self._lock:
            return rule in self._rules

    def get(self, rule: str) -> RuleStatistics | None:
        """Get statistics for a rule."""
        with self._lock:
            return self._rules.get(rule)

    def get_or_create(self, rule: str) -> RuleStatistics:
        """Get existing statistics or create new ones.

        Args:
            rule: Rule name.

        Returns:
            RuleStatistics for the rule.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
                self._cache_valid = False
            return self._rules[rule]

    def record_completion(
        self,
        rule: str,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            rule: Rule name.
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values.
            input_size: Input file size.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            stats = self._rules[rule]
            stats.record_completion(duration, timestamp, threads, wildcards, input_size)
            self._cache_valid = False

    def record_job_completion(self, job: Job) -> None:
        """Record completion from a Job object.

        Args:
            job: Completed job with timing data.
        """
        if job.duration is None:
            return

        # record_completion already acquires lock
        self.record_completion(
            rule=job.rule,
            duration=job.duration,
            timestamp=job.end_time or 0.0,
            threads=job.threads,
            wildcards=job.wildcards if job.wildcards else None,
            input_size=job.input_size,
        )

    def global_mean_duration(self) -> float:
        """Get global mean duration across all rules.

        Returns:
            Mean duration in seconds, or config default if no data.
        """
        with self._lock:
            if self._cache_valid and self._global_mean_cache is not None:
                return self._global_mean_cache

            all_durations: list[float] = []
            for stats in self._rules.values():
                all_durations.extend(stats.aggregate.durations)

            if all_durations:
                self._global_mean_cache = sum(all_durations) / len(all_durations)
            else:
                self._global_mean_cache = self.config.default_global_mean

            self._cache_valid = True
            return self._global_mean_cache

    def set_expected_counts(self, counts: dict[str, int]) -> None:
        """Set expected job counts for rules.

        Args:
            counts: Dictionary of rule name to expected count.
        """
        with self._lock:
            for rule, count in counts.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].expected_count = count

    def clear(self) -> None:
        """Clear all statistics."""
        with self._lock:
            self._rules.clear()
            self._cache_valid = False
            self._global_mean_cache = None

    # Backward compatibility methods

    def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
        """Convert to dict for backward compatibility with TimeEstimator."""
        with self._lock:
            return {name: stats.aggregate for name, stats in self._rules.items()}

    def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
        """Convert to thread stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, ThreadTimingStats] = {}
            for name, stats in self._rules.items():
                if stats.by_threads is not None:
                    result[name] = stats.by_threads
            return result

    def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
        """Convert to wildcard stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, dict[str, WildcardTimingStats]] = {}
            for name, stats in self._rules.items():
                if stats.by_wildcard:
                    result[name] = stats.by_wildcard
            return result

    def load_from_rule_stats(
        self,
        rule_stats: dict[str, RuleTimingStats],
        thread_stats: dict[str, ThreadTimingStats] | None = None,
        wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
    ) -> None:
        """Load from existing stats dictionaries.

        Args:
            rule_stats: Aggregate rule timing stats.
            thread_stats: Thread-specific stats.
            wildcard_stats: Wildcard-specific stats.
        """
        with self._lock:
            for rule, stats in rule_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].aggregate = stats

            if thread_stats:
                for rule, tstats in thread_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_threads = tstats

            if wildcard_stats:
                for rule, wstats in wildcard_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_wildcard = wstats

            self._cache_valid = False

    def all_rules(self) -> list[str]:
        """Get all rule names."""
        with self._lock:
            return list(self._rules.keys())

    def get_combination_stats(
        self,
        rule: str,
        wildcards: dict[str, str] | None,
        threads: int | None,
        min_samples: int | None = None,
    ) -> RuleTimingStats | None:
        """Get timing stats for a specific wildcard+threads combination.

        Args:
            rule: Rule name.
            wildcards: Wildcard values dict.
            threads: Thread count.
            min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                        Set to 1 to get stats even with a single historical run.

        Returns:
            RuleTimingStats for the combination, or None if not found or insufficient samples.
        """
        from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

        if min_samples is None:
            min_samples = MIN_SAMPLES_FOR_CONDITIONING

        combo_key = _make_combination_key(wildcards, threads)
        if combo_key is None:
            return None

        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return None

            combo_stats = stats.by_combination.get(combo_key)
            if combo_stats is None:
                return None

            # Require minimum samples for conditioning
            if combo_stats.count < min_samples:
                return None

            return combo_stats

    def typical_threads(self, rule: str) -> float:
        """Get the typical thread count for a rule.

        Args:
            rule: Rule name.

        Returns:
            Weighted average thread count, or 1.0 if no thread data.
        """
        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return 1.0
            return stats.typical_threads

    def total_sample_count(self) -> int:
        """Get total number of samples across all rules."""
        with self._lock:
            return sum(stats.aggregate.count for stats in self._rules.values())

    def rule_count(self) -> int:
        """Get number of rules in the registry."""
        with self._lock:
            return len(self._rules)

Functions

__contains__
__contains__(rule: str) -> bool

Check if rule exists in registry.

Source code in snakesee/state/rule_registry.py
def __contains__(self, rule: str) -> bool:
    """Check if rule exists in registry."""
    with self._lock:
        return rule in self._rules
__init__
__init__(config: EstimationConfig | None = None) -> None

Initialize empty registry.

Parameters:

- config (EstimationConfig | None, default None): Optional estimation configuration. If None, DEFAULT_CONFIG from snakesee.state.config is used.

Source code in snakesee/state/rule_registry.py
def __init__(self, config: EstimationConfig | None = None) -> None:
    """Initialize empty registry.

    Args:
        config: Optional estimation configuration.
    """
    from snakesee.state.config import DEFAULT_CONFIG

    self.config = config or DEFAULT_CONFIG
    self._lock = threading.RLock()
    self._rules: dict[str, RuleStatistics] = {}
    self._global_mean_cache: float | None = None
    self._cache_valid: bool = False
__len__
__len__() -> int

Return number of rules in registry.

Source code in snakesee/state/rule_registry.py
def __len__(self) -> int:
    """Return number of rules in registry."""
    with self._lock:
        return len(self._rules)
all_rules
all_rules() -> list[str]

Get all rule names.

Source code in snakesee/state/rule_registry.py
def all_rules(self) -> list[str]:
    """Get all rule names."""
    with self._lock:
        return list(self._rules.keys())
clear
clear() -> None

Clear all statistics.

Source code in snakesee/state/rule_registry.py
def clear(self) -> None:
    """Clear all statistics."""
    with self._lock:
        self._rules.clear()
        self._cache_valid = False
        self._global_mean_cache = None
get
get(rule: str) -> RuleStatistics | None

Get statistics for a rule.

Source code in snakesee/state/rule_registry.py
def get(self, rule: str) -> RuleStatistics | None:
    """Get statistics for a rule."""
    with self._lock:
        return self._rules.get(rule)
get_combination_stats
get_combination_stats(rule: str, wildcards: dict[str, str] | None, threads: int | None, min_samples: int | None = None) -> RuleTimingStats | None

Get timing stats for a specific wildcard+threads combination.

Parameters:

- rule (str, required): Rule name.
- wildcards (dict[str, str] | None, required): Wildcard values dict.
- threads (int | None, required): Thread count.
- min_samples (int | None, default None): Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING. Set to 1 to get stats even with a single historical run.

Returns:

- RuleTimingStats | None: RuleTimingStats for the combination, or None if not found or if there are insufficient samples.

Source code in snakesee/state/rule_registry.py
def get_combination_stats(
    self,
    rule: str,
    wildcards: dict[str, str] | None,
    threads: int | None,
    min_samples: int | None = None,
) -> RuleTimingStats | None:
    """Get timing stats for a specific wildcard+threads combination.

    Args:
        rule: Rule name.
        wildcards: Wildcard values dict.
        threads: Thread count.
        min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                    Set to 1 to get stats even with a single historical run.

    Returns:
        RuleTimingStats for the combination, or None if not found or insufficient samples.
    """
    from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

    if min_samples is None:
        min_samples = MIN_SAMPLES_FOR_CONDITIONING

    combo_key = _make_combination_key(wildcards, threads)
    if combo_key is None:
        return None

    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return None

        combo_stats = stats.by_combination.get(combo_key)
        if combo_stats is None:
            return None

        # Require minimum samples for conditioning
        if combo_stats.count < min_samples:
            return None

        return combo_stats
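get_combination_stats applies two gates: no key means no conditioning, and a known combination with too few samples is treated as missing. The private `_make_combination_key` is not shown here, so make_combination_key below is a hypothetical stand-in that assumes None is returned when there is nothing to condition on. The value 3 for min_samples is an arbitrary placeholder for MIN_SAMPLES_FOR_CONDITIONING, whose real value is not given on this page.

```python
from __future__ import annotations


def make_combination_key(wildcards: dict[str, str] | None,
                         threads: int | None) -> str | None:
    """Hypothetical stand-in for snakesee's private _make_combination_key.

    Assumption: None when there is nothing to condition on; otherwise a
    deterministic key, sorted so wildcard dict order does not matter.
    """
    if not wildcards and threads is None:
        return None
    parts = [f"{k}={v}" for k, v in sorted((wildcards or {}).items())]
    if threads is not None:
        parts.append(f"threads={threads}")
    return ",".join(parts)


def lookup(by_combination: dict[str, list[float]],
           wildcards: dict[str, str] | None, threads: int | None,
           min_samples: int = 3) -> list[float] | None:
    # min_samples=3 is a placeholder for MIN_SAMPLES_FOR_CONDITIONING.
    key = make_combination_key(wildcards, threads)
    if key is None:
        return None
    samples = by_combination.get(key)
    if samples is None or len(samples) < min_samples:
        return None  # too little history to trust this combination
    return samples
```

Returning None instead of a low-confidence estimate leaves the caller free to fall back to coarser statistics, such as the per-thread or aggregate stats held in the same RuleStatistics.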
get_or_create
get_or_create(rule: str) -> RuleStatistics

Get existing statistics or create new ones.

Parameters:

- rule (str, required): Rule name.

Returns:

- RuleStatistics: RuleStatistics for the rule.

Source code in snakesee/state/rule_registry.py
def get_or_create(self, rule: str) -> RuleStatistics:
    """Get existing statistics or create new ones.

    Args:
        rule: Rule name.

    Returns:
        RuleStatistics for the rule.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
            self._cache_valid = False
        return self._rules[rule]
global_mean_duration
global_mean_duration() -> float

Get global mean duration across all rules.

Returns:

- float: Mean duration in seconds over all recorded samples, or config.default_global_mean if no durations have been recorded.

Source code in snakesee/state/rule_registry.py
def global_mean_duration(self) -> float:
    """Get global mean duration across all rules.

    Returns:
        Mean duration in seconds, or config default if no data.
    """
    with self._lock:
        if self._cache_valid and self._global_mean_cache is not None:
            return self._global_mean_cache

        all_durations: list[float] = []
        for stats in self._rules.values():
            all_durations.extend(stats.aggregate.durations)

        if all_durations:
            self._global_mean_cache = sum(all_durations) / len(all_durations)
        else:
            self._global_mean_cache = self.config.default_global_mean

        self._cache_valid = True
        return self._global_mean_cache
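global_mean_duration pools every recorded duration before averaging and caches the result until something invalidates it. The MeanCache class below is a minimal, hypothetical model of that behaviour, not snakesee code.

```python
from __future__ import annotations


class MeanCache:
    """Minimal model of RuleRegistry's cached global mean."""

    def __init__(self, default: float) -> None:
        self.default = default
        self.durations: dict[str, list[float]] = {}
        self._cache: float | None = None
        self._valid = False

    def record(self, rule: str, duration: float) -> None:
        self.durations.setdefault(rule, []).append(duration)
        self._valid = False  # any new sample invalidates the cache

    def global_mean(self) -> float:
        if self._valid and self._cache is not None:
            return self._cache
        pooled = [d for ds in self.durations.values() for d in ds]
        # Pooled mean: rules with more samples weigh more than rarer rules.
        self._cache = sum(pooled) / len(pooled) if pooled else self.default
        self._valid = True
        return self._cache
```

With samples of 10.0 and 20.0 for one rule and 90.0 for another, the pooled mean is 40.0, not the 52.5 you would get by averaging per-rule means. Going by the source shown, RuleRegistry only clears `_cache_valid` in its own methods (get_or_create on a new rule, record_completion, load_from_rule_stats, clear). Samples added by calling record_completion directly on a RuleStatistics object, as the class docstring example does, may therefore be missing from a global mean that was already cached after the last invalidation.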
load_from_rule_stats
load_from_rule_stats(rule_stats: dict[str, RuleTimingStats], thread_stats: dict[str, ThreadTimingStats] | None = None, wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None) -> None

Load from existing stats dictionaries.

Parameters:

- rule_stats (dict[str, RuleTimingStats], required): Aggregate rule timing stats.
- thread_stats (dict[str, ThreadTimingStats] | None, default None): Thread-specific stats.
- wildcard_stats (dict[str, dict[str, WildcardTimingStats]] | None, default None): Wildcard-specific stats.

Source code in snakesee/state/rule_registry.py
def load_from_rule_stats(
    self,
    rule_stats: dict[str, RuleTimingStats],
    thread_stats: dict[str, ThreadTimingStats] | None = None,
    wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
) -> None:
    """Load from existing stats dictionaries.

    Args:
        rule_stats: Aggregate rule timing stats.
        thread_stats: Thread-specific stats.
        wildcard_stats: Wildcard-specific stats.
    """
    with self._lock:
        for rule, stats in rule_stats.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].aggregate = stats

        if thread_stats:
            for rule, tstats in thread_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_threads = tstats

        if wildcard_stats:
            for rule, wstats in wildcard_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_wildcard = wstats

        self._cache_valid = False
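Reading the code above, `load_from_rule_stats` assigns the passed objects directly (`aggregate = stats`), while `record_completion` appends to whatever object a rule currently holds. Two consequences follow: loading after recording replaces the recorded samples for that rule, and later recordings mutate the caller's loaded objects because no copy is made. A minimal sketch of that behavior, using a hypothetical `MiniRegistry` and a simplified `Stats` record in place of `RuleTimingStats`:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Stats:
    """Simplified stand-in for RuleTimingStats: only a list of durations."""

    durations: list[float] = field(default_factory=list)


class MiniRegistry:
    """Hypothetical reduction of RuleRegistry's load/record behavior."""

    def __init__(self) -> None:
        self.aggregate: dict[str, Stats] = {}

    def record_completion(self, rule: str, duration: float) -> None:
        # Appends to whatever Stats object the rule currently holds.
        self.aggregate.setdefault(rule, Stats()).durations.append(duration)

    def load_from_rule_stats(self, rule_stats: dict[str, Stats]) -> None:
        # Stores the caller's objects as-is, mirroring `aggregate = stats`.
        for rule, stats in rule_stats.items():
            self.aggregate[rule] = stats


registry = MiniRegistry()
registry.record_completion("align", 5.0)
loaded = Stats([10.0, 12.0])
registry.load_from_rule_stats({"align": loaded})  # the 5.0 sample is gone
registry.record_completion("align", 14.0)  # also appends to `loaded`
print(registry.aggregate["align"].durations, loaded.durations)
```

If a caller needs its dictionaries left untouched, passing copies (for example via `copy.deepcopy`) avoids the aliasing; whether that matters depends on how the loaded stats are used elsewhere.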
record_completion
record_completion(rule: str, duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

- rule (str, required): Rule name.
- duration (float, required): Job duration in seconds.
- timestamp (float, required): Completion timestamp.
- threads (int | None, default None): Number of threads used.
- wildcards (dict[str, str] | None, default None): Wildcard values.
- input_size (int | None, default None): Input file size in bytes.
Source code in snakesee/state/rule_registry.py
def record_completion(
    self,
    rule: str,
    duration: float,
    timestamp: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
) -> None:
    """Record a job completion.

    Args:
        rule: Rule name.
        duration: Job duration in seconds.
        timestamp: Completion timestamp.
        threads: Number of threads used.
        wildcards: Wildcard values.
        input_size: Input file size.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
        stats = self._rules[rule]
        stats.record_completion(duration, timestamp, threads, wildcards, input_size)
        self._cache_valid = False
record_job_completion
record_job_completion(job: Job) -> None

Record completion from a Job object. Jobs whose duration is None are skipped, and a missing end_time is recorded as a timestamp of 0.0.

Parameters:

- job (Job, required): Completed job with timing data.
Source code in snakesee/state/rule_registry.py
def record_job_completion(self, job: Job) -> None:
    """Record completion from a Job object.

    Args:
        job: Completed job with timing data.
    """
    if job.duration is None:
        return

    # record_completion already acquires lock
    self.record_completion(
        rule=job.rule,
        duration=job.duration,
        timestamp=job.end_time or 0.0,
        threads=job.threads,
        wildcards=job.wildcards if job.wildcards else None,
        input_size=job.input_size,
    )
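`record_job_completion` deliberately calls `record_completion` without taking the lock first. If `_lock` is a non-reentrant `threading.Lock` (an assumption; its construction is not shown in this section), acquiring it a second time from the same thread would block forever. The sketch shows the same delegation with hypothetical `Job` and `Registry` stand-ins, plus a timed re-acquire that demonstrates the hazard.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass


@dataclass
class Job:
    """Hypothetical minimal job record; not snakesee's Job model."""

    rule: str
    duration: float | None
    end_time: float | None = None


class Registry:
    """Sketch of a registry whose writes are guarded by a plain threading.Lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # assumption: non-reentrant lock
        self.samples: dict[str, list[tuple[float, float]]] = {}

    def record_completion(self, rule: str, duration: float, timestamp: float) -> None:
        with self._lock:
            self.samples.setdefault(rule, []).append((duration, timestamp))

    def record_job_completion(self, job: Job) -> None:
        if job.duration is None:
            return  # nothing to learn from a job without a measured duration
        # No lock here: record_completion takes it, and a plain Lock cannot be
        # re-acquired by the thread that already holds it.
        self.record_completion(job.rule, job.duration, job.end_time or 0.0)


def reacquire_succeeds() -> bool:
    """Demonstrate the hazard: a second acquire by the holder times out."""
    lock = threading.Lock()
    with lock:
        return lock.acquire(timeout=0.05)
```

Using `threading.RLock` instead would make nested acquisition by the same thread safe.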
rule_count
rule_count() -> int

Get number of rules in the registry.

Source code in snakesee/state/rule_registry.py
def rule_count(self) -> int:
    """Get number of rules in the registry."""
    with self._lock:
        return len(self._rules)
set_expected_counts
set_expected_counts(counts: dict[str, int]) -> None

Set expected job counts for rules.

Parameters:

- counts (dict[str, int], required): Dictionary of rule name to expected count.
Source code in snakesee/state/rule_registry.py
def set_expected_counts(self, counts: dict[str, int]) -> None:
    """Set expected job counts for rules.

    Args:
        counts: Dictionary of rule name to expected count.
    """
    with self._lock:
        for rule, count in counts.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].expected_count = count
to_rule_stats_dict
to_rule_stats_dict() -> dict[str, RuleTimingStats]

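Return a mapping of rule name to aggregate RuleTimingStats, for backward compatibility with TimeEstimator. The values are the registry's live objects, not copies.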

Source code in snakesee/state/rule_registry.py
def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
    """Convert to dict for backward compatibility with TimeEstimator."""
    with self._lock:
        return {name: stats.aggregate for name, stats in self._rules.items()}
to_thread_stats_dict
to_thread_stats_dict() -> dict[str, ThreadTimingStats]

Return a mapping of rule name to ThreadTimingStats, for backward compatibility. Rules without thread data are omitted.

Source code in snakesee/state/rule_registry.py
def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
    """Convert to thread stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, ThreadTimingStats] = {}
        for name, stats in self._rules.items():
            if stats.by_threads is not None:
                result[name] = stats.by_threads
        return result
to_wildcard_stats_dict
to_wildcard_stats_dict() -> dict[str, dict[str, WildcardTimingStats]]

Return a mapping of rule name to per-wildcard-key WildcardTimingStats, for backward compatibility. Rules without wildcard data are omitted.

Source code in snakesee/state/rule_registry.py
def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
    """Convert to wildcard stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, dict[str, WildcardTimingStats]] = {}
        for name, stats in self._rules.items():
            if stats.by_wildcard:
                result[name] = stats.by_wildcard
        return result
total_sample_count
total_sample_count() -> int

Get total number of samples across all rules.

Source code in snakesee/state/rule_registry.py
def total_sample_count(self) -> int:
    """Get total number of samples across all rules."""
    with self._lock:
        return sum(stats.aggregate.count for stats in self._rules.values())
typical_threads
typical_threads(rule: str) -> float

Get the typical thread count for a rule.

Parameters:

- rule (str, required): Rule name.

Returns:

- float: Weighted average thread count, or 1.0 if the rule is unknown or has no thread data.

Source code in snakesee/state/rule_registry.py
def typical_threads(self, rule: str) -> float:
    """Get the typical thread count for a rule.

    Args:
        rule: Rule name.

    Returns:
        Weighted average thread count, or 1.0 if no thread data.
    """
    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return 1.0
        return stats.typical_threads

RuleStatistics dataclass

Complete timing statistics for a single rule.

Combines aggregate, thread-level, and wildcard-level statistics in a single container.

Attributes:

- rule (str): The rule name.
- aggregate (RuleTimingStats): Overall timing statistics for the rule.
- by_threads (ThreadTimingStats | None): Thread-specific timing statistics.
- by_wildcard (dict[str, WildcardTimingStats]): Wildcard-specific timing statistics.
- by_combination (dict[str, RuleTimingStats]): Full wildcard+threads combination statistics.
- expected_count (int | None): Expected number of jobs for this rule (if known).
Source code in snakesee/state/rule_registry.py
@dataclass
class RuleStatistics:
    """Complete timing statistics for a single rule.

    Combines aggregate, thread-level, and wildcard-level statistics
    in a single container.

    Attributes:
        rule: The rule name.
        aggregate: Overall timing statistics for the rule.
        by_threads: Thread-specific timing statistics.
        by_wildcard: Wildcard-specific timing statistics.
        by_combination: Full wildcard+threads combination statistics.
        expected_count: Expected number of jobs for this rule (if known).
    """

    rule: str
    aggregate: RuleTimingStats = field(default_factory=_make_empty_rule_stats)
    by_threads: ThreadTimingStats | None = None
    by_wildcard: dict[str, WildcardTimingStats] = field(default_factory=dict)
    by_combination: dict[str, RuleTimingStats] = field(default_factory=dict)
    expected_count: int | None = None

    def __post_init__(self) -> None:
        """Ensure aggregate has correct rule name."""
        from snakesee.models import RuleTimingStats

        if self.aggregate.rule != self.rule:
            # Create new RuleTimingStats with correct rule name
            self.aggregate = RuleTimingStats(
                rule=self.rule,
                durations=list(self.aggregate.durations),
                timestamps=list(self.aggregate.timestamps),
                input_sizes=list(self.aggregate.input_sizes),
            )

    @property
    def typical_threads(self) -> float:
        """Weighted average of thread counts from historical data.

        Uses the number of observations at each thread count as weights.
        Defaults to 1.0 if no thread data is available.
        """
        if self.by_threads is None or not self.by_threads.stats_by_threads:
            return 1.0

        total_weight = 0
        weighted_sum = 0.0
        for thread_count, stats in self.by_threads.stats_by_threads.items():
            n = stats.count
            weighted_sum += thread_count * n
            total_weight += n

        return weighted_sum / total_weight if total_weight > 0 else 1.0

    def record_completion(
        self,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values for the job.
            input_size: Input file size in bytes.
        """
        from snakesee.models import RuleTimingStats
        from snakesee.models import ThreadTimingStats
        from snakesee.models import WildcardTimingStats

        # Update aggregate stats
        self.aggregate.durations.append(duration)
        self.aggregate.timestamps.append(timestamp)
        self.aggregate.input_sizes.append(input_size)

        # Update thread-specific stats
        if threads is not None:
            if self.by_threads is None:
                self.by_threads = ThreadTimingStats(rule=self.rule, stats_by_threads={})
            # Get or create stats for this thread count
            if threads not in self.by_threads.stats_by_threads:
                self.by_threads.stats_by_threads[threads] = RuleTimingStats(
                    rule=f"{self.rule}@{threads}t"
                )
            thread_stats = self.by_threads.stats_by_threads[threads]
            thread_stats.durations.append(duration)
            thread_stats.timestamps.append(timestamp)

        # Update wildcard-specific stats (for each wildcard key)
        if wildcards:
            for wc_key, wc_value in wildcards.items():
                if wc_key not in self.by_wildcard:
                    self.by_wildcard[wc_key] = WildcardTimingStats(
                        rule=self.rule,
                        wildcard_key=wc_key,
                        stats_by_value={},
                    )
                wts = self.by_wildcard[wc_key]
                # Access stats_by_value directly (get_stats_for_value has min sample check)
                if wc_value not in wts.stats_by_value:
                    wts.stats_by_value[wc_value] = RuleTimingStats(
                        rule=f"{self.rule}:{wc_key}={wc_value}"
                    )
                value_stats = wts.stats_by_value[wc_value]
                value_stats.durations.append(duration)
                value_stats.timestamps.append(timestamp)

        # Update combination stats (full wildcard+threads key)
        combo_key = _make_combination_key(wildcards, threads)
        if combo_key:
            if combo_key not in self.by_combination:
                self.by_combination[combo_key] = RuleTimingStats(rule=f"{self.rule}@{combo_key}")
            combo_stats = self.by_combination[combo_key]
            combo_stats.durations.append(duration)
            combo_stats.timestamps.append(timestamp)

Attributes

typical_threads property
typical_threads: float

Weighted average of thread counts from historical data.

Uses the number of observations at each thread count as weights. Defaults to 1.0 if no thread data is available.
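The weighting is easiest to see with numbers: three jobs at 1 thread and one job at 4 threads give (1*3 + 4*1) / 4 = 1.75. The function below is a hypothetical reduction of the property to a plain `{thread_count: observations}` mapping, standing in for `stats_by_threads`.

```python
def typical_threads(counts_by_threads: dict[int, int]) -> float:
    """Observation-weighted mean thread count; 1.0 when there is no data.

    counts_by_threads maps a thread count to the number of completed jobs
    observed at that count (a simplified stand-in for stats_by_threads).
    """
    total = sum(counts_by_threads.values())
    if total == 0:
        return 1.0  # same fallback as the property
    return sum(t * n for t, n in counts_by_threads.items()) / total


print(typical_threads({1: 3, 4: 1}))
```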

Functions

__post_init__
__post_init__() -> None

Ensure aggregate has correct rule name.

record_completion
record_completion(duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

- duration (float, required): Job duration in seconds.
- timestamp (float, required): Completion timestamp.
- threads (int | None, default None): Number of threads used.
- wildcards (dict[str, str] | None, default None): Wildcard values for the job.
- input_size (int | None, default None): Input file size in bytes.
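One call fans out into several buckets. The sketch lists the bucket labels a single completion would touch, using the label formats visible in the code (`f"{rule}@{threads}t"` for thread buckets and `f"{rule}:{key}={value}"` for wildcard values). The combination label here is a made-up format, since `_make_combination_key` is not shown in this section, and `fan_out` itself is hypothetical. Note also that, in the code above, only the aggregate bucket records `input_size`.

```python
from __future__ import annotations

from collections import defaultdict


def fan_out(
    rule: str,
    duration: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
) -> dict[str, list[float]]:
    """Return the buckets (by label) that one completion would append to."""
    buckets: dict[str, list[float]] = defaultdict(list)
    buckets[rule].append(duration)  # aggregate: always updated
    if threads is not None:
        buckets[f"{rule}@{threads}t"].append(duration)
    for key, value in (wildcards or {}).items():
        # One bucket per wildcard key/value pair.
        buckets[f"{rule}:{key}={value}"].append(duration)
    # Hypothetical combination label; the real key format is not shown here.
    parts = [f"{k}={v}" for k, v in sorted((wildcards or {}).items())]
    if threads is not None:
        parts.append(f"threads={threads}")
    if parts:
        buckets[f"{rule}@{','.join(parts)}"].append(duration)
    return dict(buckets)


for label in fan_out("align", 42.0, threads=4, wildcards={"sample": "A", "lane": "1"}):
    print(label)
```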

SystemClock

Default clock implementation using system time.

This is the production implementation that delegates to the standard library's time module.
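Because callers depend only on the `Clock` protocol, tests can swap `SystemClock` for a clock they advance by hand. `FakeClock` and `elapsed_since` below are hypothetical test helpers, not part of snakesee; the protocol is restated so the sketch runs on its own.

```python
from typing import Protocol


class Clock(Protocol):
    """Restatement of the protocol documented above."""

    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class FakeClock:
    """Hypothetical test clock that only moves when told to."""

    def __init__(self, start: float = 1_700_000_000.0) -> None:
        self._wall = start  # Unix timestamp returned by now()
        self._mono = 0.0  # arbitrary origin, as with time.monotonic()

    def now(self) -> float:
        return self._wall

    def monotonic(self) -> float:
        return self._mono

    def advance(self, seconds: float) -> None:
        # Move both readings together, as real time would.
        self._wall += seconds
        self._mono += seconds


def elapsed_since(clock: Clock, started: float) -> float:
    """Code under test sees only the Clock protocol."""
    return clock.monotonic() - started


clock = FakeClock()
t0 = clock.monotonic()
clock.advance(90.0)
print(elapsed_since(clock, t0), clock.now())
```

`FakeClock` satisfies the protocol structurally, without inheriting from it, which is what lets production code keep its `Clock` annotation unchanged.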

Source code in snakesee/state/clock.py
class SystemClock:
    """Default clock implementation using system time.

    This is the production implementation that delegates to the
    standard library's time module.
    """

    def now(self) -> float:
        """Return current time as Unix timestamp."""
        return _time.time()

    def monotonic(self) -> float:
        """Return monotonic clock value."""
        return _time.monotonic()

Functions

monotonic
monotonic() -> float

Return monotonic clock value.

now
now() -> float

Return current time as Unix timestamp.


TimeConstants dataclass

Time-related constants used throughout the codebase.

Attributes:

- seconds_per_day (int, default 86400): Number of seconds in a day.
- seconds_per_hour (int, default 3600): Number of seconds in an hour.
- seconds_per_minute (int, default 60): Number of seconds in a minute.
- stale_workflow_threshold (float, default STALE_WORKFLOW_THRESHOLD_SECONDS): Seconds before a workflow is considered stale.
- timing_mismatch_tolerance (float, default 5.0): Tolerance, in seconds, for validation timing comparisons.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class TimeConstants:
    """Time-related constants used throughout the codebase.

    Attributes:
        seconds_per_day: Number of seconds in a day.
        seconds_per_hour: Number of seconds in an hour.
        seconds_per_minute: Number of seconds in a minute.
        stale_workflow_threshold: Seconds before workflow is considered stale.
        timing_mismatch_tolerance: Tolerance for validation timing comparisons.
    """

    seconds_per_day: int = 86400
    seconds_per_hour: int = 3600
    seconds_per_minute: int = 60
    stale_workflow_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS
    timing_mismatch_tolerance: float = 5.0  # seconds
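Because the dataclass is frozen, one shared instance can safely serve as a default argument. The helpers below are hypothetical (snakesee's own formatting and validation code is not shown here) and only illustrate how the fields might be consumed; `stale_workflow_threshold` is left out because `STALE_WORKFLOW_THRESHOLD_SECONDS` is defined elsewhere.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeConstants:
    """Subset of the fields above (stale threshold omitted)."""

    seconds_per_day: int = 86400
    seconds_per_hour: int = 3600
    seconds_per_minute: int = 60
    timing_mismatch_tolerance: float = 5.0  # seconds


def format_duration(seconds: float, tc: TimeConstants = TimeConstants()) -> str:
    """Hypothetical helper: render whole minutes as e.g. '1d 2h 3m'."""
    days, rem = divmod(int(seconds), tc.seconds_per_day)
    hours, rem = divmod(rem, tc.seconds_per_hour)
    minutes = rem // tc.seconds_per_minute  # leftover seconds are dropped
    parts = [f"{days}d"] if days else []
    if days or hours:
        parts.append(f"{hours}h")
    parts.append(f"{minutes}m")
    return " ".join(parts)


def timings_match(a: float, b: float, tc: TimeConstants = TimeConstants()) -> bool:
    """Hypothetical check: equal within timing_mismatch_tolerance seconds."""
    return abs(a - b) <= tc.timing_mismatch_tolerance


print(format_duration(93784), timings_match(100.0, 104.0))
```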

VarianceMultipliers dataclass

Variance multipliers for different estimation scenarios.

When only one observation exists, the variance is computed as (mean * multiplier)^2, so the standard deviation is mean * multiplier. Lower multipliers give tighter confidence bounds.

Attributes:

- exact_thread_match (float, default 0.2): High confidence (exact thread count match).
- exact_wildcard_match (float, default 0.2): High confidence (exact wildcard match).
- size_scaled (float, default 0.25): Medium confidence (input-size scaling).
- rule_fallback (float, default 0.3): Standard (rule-level fallback).
- aggregate_fallback (float, default 0.3): Standard (aggregate across thread counts).
- fuzzy_match (float, default 0.4): Lower confidence (fuzzy rule matching).
- global_fallback (float, default 0.5): Lowest confidence (no rule data).
- bootstrap (float, default 0.5): No progress yet (initial estimate).

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class VarianceMultipliers:
    """Variance multipliers for different estimation scenarios.

    When only one observation exists, variance is computed as (mean * multiplier)^2.
    Lower multipliers = tighter confidence bounds.

    Attributes:
        exact_thread_match: High confidence - exact thread count match.
        exact_wildcard_match: High confidence - exact wildcard match.
        size_scaled: Medium confidence - input-size scaling.
        rule_fallback: Standard - rule-level fallback.
        aggregate_fallback: Standard - aggregate across thread counts.
        fuzzy_match: Lower confidence - fuzzy rule matching.
        global_fallback: Lowest confidence - no rule data.
        bootstrap: No progress yet - initial estimate.
    """

    exact_thread_match: float = 0.2
    exact_wildcard_match: float = 0.2
    size_scaled: float = 0.25
    rule_fallback: float = 0.3
    aggregate_fallback: float = 0.3
    fuzzy_match: float = 0.4
    global_fallback: float = 0.5
    bootstrap: float = 0.5
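Since the single-observation variance is (mean * multiplier)^2, each multiplier acts as an assumed coefficient of variation: for a 600 s mean, `exact_thread_match` (0.2) implies a 120 s standard deviation, while `global_fallback` (0.5) implies 300 s. A small sketch of that arithmetic, with a subset of the defaults copied from above; the function name is illustrative, not snakesee's.

```python
import math

# A subset of the VarianceMultipliers defaults shown above.
MULTIPLIERS = {
    "exact_thread_match": 0.2,
    "size_scaled": 0.25,
    "rule_fallback": 0.3,
    "global_fallback": 0.5,
}


def single_observation_variance(mean: float, multiplier: float) -> float:
    """Variance assumed when only one observation exists."""
    return (mean * multiplier) ** 2


for name, multiplier in MULTIPLIERS.items():
    variance = single_observation_variance(600.0, multiplier)
    # sqrt((mean * m) ** 2) == mean * m, so std / mean == m
    print(f"{name:>18}: std = {math.sqrt(variance):6.1f} s, variance = {variance:9.1f} s^2")
```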

WorkflowPaths dataclass

Centralized path management for Snakemake workflow directories.

This frozen dataclass provides computed paths for all standard Snakemake directory locations, eliminating ad-hoc path construction.

Attributes:

- workflow_dir (Path): Root directory of the workflow (contains .snakemake/).

Example:

    paths = WorkflowPaths(Path("/my/workflow"))

    # Access computed paths
    if paths.metadata_dir.exists():
        for f in paths.get_metadata_files():
            process(f)

    # Find logs
    latest = paths.find_latest_log()
    all_logs = paths.find_all_logs()
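`find_latest_log` globs the log directory and sorts by modification time, tolerating files that disappear mid-scan. The sketch reproduces that logic against a throwaway directory. The glob pattern `*.snakemake.log` and the `safe_mtime` body are assumptions standing in for `LOG_GLOB_PATTERN` and `safe_mtime`, which are defined outside this section.

```python
from __future__ import annotations

import os
import tempfile
import time
from pathlib import Path

ASSUMED_LOG_GLOB = "*.snakemake.log"  # assumption, not the real constant


def safe_mtime(path: Path) -> float:
    """Hypothetical stand-in: mtime, or 0.0 if the file vanished."""
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def find_latest_log(workflow_dir: Path) -> Path | None:
    log_dir = workflow_dir / ".snakemake" / "log"
    if not log_dir.exists():
        return None
    logs = sorted(log_dir.glob(ASSUMED_LOG_GLOB), key=safe_mtime)
    return logs[-1] if logs else None  # newest is last after an ascending sort


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    log_dir = root / ".snakemake" / "log"
    log_dir.mkdir(parents=True)
    old = log_dir / "2024-01-01T100000.000000.snakemake.log"
    new = log_dir / "2024-01-02T100000.000000.snakemake.log"
    old.write_text("old run\n")
    new.write_text("new run\n")
    now = time.time()
    os.utime(old, (now - 60, now - 60))  # make "old" clearly older
    os.utime(new, (now, now))
    latest = find_latest_log(root)
    print(latest.name if latest else None)
```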

Source code in snakesee/state/paths.py
@dataclass(frozen=True)
class WorkflowPaths:
    """Centralized path management for Snakemake workflow directories.

    This frozen dataclass provides computed paths for all standard
    Snakemake directory locations, eliminating ad-hoc path construction.

    Attributes:
        workflow_dir: Root directory of the workflow (contains .snakemake/).

    Example:
        paths = WorkflowPaths(Path("/my/workflow"))

        # Access computed paths
        if paths.metadata_dir.exists():
            for f in paths.get_metadata_files():
                process(f)

        # Find logs
        latest = paths.find_latest_log()
        all_logs = paths.find_all_logs()
    """

    workflow_dir: Path

    # =========================================================================
    # Core directory properties
    # =========================================================================

    @property
    def snakemake_dir(self) -> Path:
        """Path to .snakemake/ directory."""
        return self.workflow_dir / SNAKEMAKE_DIR

    @property
    def metadata_dir(self) -> Path:
        """Path to .snakemake/metadata/ directory."""
        return self.snakemake_dir / METADATA_DIR

    @property
    def log_dir(self) -> Path:
        """Path to .snakemake/log/ directory."""
        return self.snakemake_dir / LOG_DIR

    @property
    def incomplete_dir(self) -> Path:
        """Path to .snakemake/incomplete/ directory."""
        return self.snakemake_dir / INCOMPLETE_DIR

    @property
    def locks_dir(self) -> Path:
        """Path to .snakemake/locks/ directory."""
        return self.snakemake_dir / LOCKS_DIR

    @property
    def metadata_db(self) -> Path:
        """Path to Snakemake's SQLite metadata database."""
        return self.snakemake_dir / METADATA_DB_NAME

    # =========================================================================
    # Event and validation file paths
    # =========================================================================

    @property
    def events_file(self) -> Path:
        """Path to snakesee events file (.snakesee_events.jsonl)."""
        return self.workflow_dir / EVENT_FILE_NAME

    @property
    def validation_log(self) -> Path:
        """Path to validation log file (.snakesee_validation.log)."""
        return self.workflow_dir / VALIDATION_LOG_NAME

    @property
    def default_profile(self) -> Path:
        """Path to default profile file (.snakesee-profile.json)."""
        return self.workflow_dir / DEFAULT_PROFILE_NAME

    # =========================================================================
    # Existence checks
    # =========================================================================

    @property
    def exists(self) -> bool:
        """Check if this is a valid workflow directory."""
        return _cached_exists(self.snakemake_dir)

    @property
    def has_metadata(self) -> bool:
        """Check if metadata directory exists."""
        return _cached_exists(self.metadata_dir)

    @property
    def has_metadata_db(self) -> bool:
        """Whether a SQLite metadata database exists."""
        return _cached_exists(self.metadata_db)

    @property
    def has_logs(self) -> bool:
        """Check if log directory exists and contains logs."""
        if not _cached_exists(self.log_dir):
            return False
        return any(self.log_dir.glob(LOG_GLOB_PATTERN))

    @property
    def has_events(self) -> bool:
        """Check if events file exists and has content."""
        try:
            return self.events_file.stat().st_size > 0
        except OSError:
            return False

    @property
    def has_locks(self) -> bool:
        """Check if locks directory exists and contains files."""
        if not _cached_exists(self.locks_dir):
            return False
        try:
            return any(self.locks_dir.iterdir())
        except OSError:
            return False

    @property
    def has_incomplete(self) -> bool:
        """Check if incomplete directory exists and contains markers."""
        if not _cached_exists(self.incomplete_dir):
            return False
        try:
            return any(self.incomplete_dir.iterdir())
        except OSError:
            return False

    # =========================================================================
    # Log file discovery
    # =========================================================================

    def find_latest_log(self) -> Path | None:
        """Find the most recent snakemake log file.

        Returns:
            Path to the most recent log file, or None if no logs exist.
        """
        if not _cached_exists(self.log_dir):
            return None
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        if not logs:
            return None
        logs.sort(key=safe_mtime)
        return logs[-1]

    def find_all_logs(self) -> list[Path]:
        """Find all snakemake log files, sorted by modification time.

        Returns:
            List of paths sorted oldest to newest.
        """
        if not _cached_exists(self.log_dir):
            return []
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        logs.sort(key=safe_mtime)
        return logs

    def find_logs_sorted_newest_first(self) -> list[Path]:
        """Find all snakemake log files, sorted newest first.

        Returns:
            List of paths sorted newest to oldest.
        """
        logs = self.find_all_logs()
        logs.reverse()
        return logs

    # =========================================================================
    # Metadata file discovery
    # =========================================================================

    def get_metadata_files(self) -> Iterator[Path]:
        """Iterate over all metadata files.

        Yields:
            Path to each metadata file.
        """
        if not _cached_exists(self.metadata_dir):
            return
        for f in self.metadata_dir.rglob("*"):
            if f.is_file():
                yield f

    def count_metadata_files(self) -> int:
        """Count the number of metadata files.

        Returns:
            Number of metadata files.
        """
        if not _cached_exists(self.metadata_dir):
            return 0
        return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())

    # =========================================================================
    # Incomplete marker handling
    # =========================================================================

    def get_incomplete_markers(self) -> Iterator[Path]:
        """Iterate over incomplete job markers.

        Yields:
            Path to each incomplete marker file.
        """
        if not _cached_exists(self.incomplete_dir):
            return
        for marker in self.incomplete_dir.rglob("*"):
            if marker.is_file() and marker.name != "migration_underway":
                yield marker

    def decode_incomplete_marker(self, marker: Path) -> Path | None:
        """Decode an incomplete marker filename to get the output path.

        Args:
            marker: Path to the marker file.

        Returns:
            Decoded output file path, or None if decoding fails.
        """
        try:
            decoded = base64.b64decode(marker.name).decode("utf-8")
            return Path(decoded)
        except (ValueError, UnicodeDecodeError):
            return None

    # =========================================================================
    # Job log discovery
    # =========================================================================

    def get_job_log(
        self,
        rule: str,
        wildcards: dict[str, str] | None = None,
        job_id: int | str | None = None,
    ) -> Path | None:
        """Find the log file for a specific job.

        Searches common log locations for a file matching the rule
        and optional wildcards/job_id.

        Args:
            rule: Name of the rule.
            wildcards: Optional wildcard values.
            job_id: Optional job ID.

        Returns:
            Path to the log file if found, None otherwise.
        """
        search_paths: list[Path] = []

        # .snakemake/log/ directory
        if _cached_exists(self.log_dir):
            search_paths.extend(self.log_dir.glob(f"*{rule}*"))
            if job_id is not None:
                search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))

        # logs/ directory (common convention)
        logs_dir = self.workflow_dir / "logs"
        search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))

        # log/ directory (another common convention)
        log_dir = self.workflow_dir / "log"
        search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))

        # Sort by modification time (newest first) and return first match
        # is_file() already confirms existence, no need for additional exists check
        existing_logs = [p for p in search_paths if p.is_file()]
        if existing_logs:
            existing_logs.sort(key=safe_mtime, reverse=True)
            return existing_logs[0]

        return None

    def _search_log_dir(
        self,
        log_dir: Path,
        rule: str,
        wildcards: dict[str, str] | None,
    ) -> list[Path]:
        """Search a log directory for matching logs."""
        paths: list[Path] = []
        if not _cached_exists(log_dir):
            return paths

        paths.extend(log_dir.glob(f"**/{rule}*"))

        rule_log_dir = log_dir / rule
        if _cached_exists(rule_log_dir):
            paths.extend(rule_log_dir.glob("*"))

        if wildcards:
            for wc_value in wildcards.values():
                if wc_value:
                    paths.extend(log_dir.glob(f"**/*{wc_value}*"))

        return paths

    # =========================================================================
    # Profile discovery
    # =========================================================================

    def find_profile(self, max_levels: int = 6) -> Path | None:
        """Search for a profile file in workflow and parent directories.

        Args:
            max_levels: Maximum parent levels to search (including current).

        Returns:
            Path to the found profile, or None if not found.
        """
        current = self.workflow_dir.resolve()
        for _ in range(max_levels):
            profile_path = current / DEFAULT_PROFILE_NAME
            if _cached_exists(profile_path):
                return profile_path
            if current.parent == current:
                break
            current = current.parent
        return None

    # =========================================================================
    # Validation
    # =========================================================================

    def validate(self) -> None:
        """Validate that this is a valid workflow directory.

        Raises:
            ValueError: If .snakemake directory doesn't exist.
        """
        if not _cached_exists(self.snakemake_dir):
            raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")

Attributes

default_profile property
default_profile: Path

Path to default profile file (.snakesee-profile.json).

events_file property
events_file: Path

Path to snakesee events file (.snakesee_events.jsonl).

exists property
exists: bool

Check if this is a valid workflow directory.

has_events property
has_events: bool

Check if events file exists and has content.

has_incomplete property
has_incomplete: bool

Check if incomplete directory exists and contains markers.

has_locks property
has_locks: bool

Check if locks directory exists and contains files.

has_logs property
has_logs: bool

Check if log directory exists and contains logs.

has_metadata property
has_metadata: bool

Check if metadata directory exists.

has_metadata_db property
has_metadata_db: bool

Whether a SQLite metadata database exists.

incomplete_dir property
incomplete_dir: Path

Path to .snakemake/incomplete/ directory.

locks_dir property
locks_dir: Path

Path to .snakemake/locks/ directory.

log_dir property
log_dir: Path

Path to .snakemake/log/ directory.

metadata_db property
metadata_db: Path

Path to Snakemake's SQLite metadata database.

metadata_dir property
metadata_dir: Path

Path to .snakemake/metadata/ directory.

snakemake_dir property
snakemake_dir: Path

Path to .snakemake/ directory.

validation_log property
validation_log: Path

Path to validation log file (.snakesee_validation.log).

Functions

count_metadata_files
count_metadata_files() -> int

Count the files under .snakemake/metadata/, including those in subdirectories. Returns 0 if the directory does not exist.

Returns:

int: Number of metadata files.

Source code in snakesee/state/paths.py
def count_metadata_files(self) -> int:
    """Count the number of metadata files.

    Returns:
        Number of metadata files.
    """
    if not _cached_exists(self.metadata_dir):
        return 0
    return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())
decode_incomplete_marker
decode_incomplete_marker(marker: Path) -> Path | None

Decode a base64-encoded incomplete-marker filename back into the output path it stands for.

Parameters:

marker (Path, required): Path to the marker file.

Returns:

Path | None: Decoded output file path, or None if decoding fails.

Source code in snakesee/state/paths.py
def decode_incomplete_marker(self, marker: Path) -> Path | None:
    """Decode an incomplete marker filename to get the output path.

    Args:
        marker: Path to the marker file.

    Returns:
        Decoded output file path, or None if decoding fails.
    """
    try:
        decoded = base64.b64decode(marker.name).decode("utf-8")
        return Path(decoded)
    except (ValueError, UnicodeDecodeError):
        return None
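A round trip through the standard base64 alphabet shows what decode_incomplete_marker expects from a marker's filename and which failures turn into None. This is a minimal sketch: `encode_output_path` and `decode_marker_name` are hypothetical helpers that work on filename strings rather than Path objects. Both `binascii.Error` (bad padding or characters) and `UnicodeDecodeError` are subclasses of `ValueError`, so the method's except clause catches either.

```python
from __future__ import annotations

import base64
from pathlib import Path


def encode_output_path(output: str) -> str:
    """Hypothetical inverse: base64-encode an output path as a marker name."""
    return base64.b64encode(output.encode("utf-8")).decode("ascii")


def decode_marker_name(name: str) -> Path | None:
    """Decode a marker filename the same way decode_incomplete_marker does."""
    try:
        return Path(base64.b64decode(name).decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        # binascii.Error (malformed base64) is a ValueError subclass
        return None
```

For example, a name that is valid base64 but decodes to non-UTF-8 bytes, such as the encoding of b"\xff", yields None rather than raising.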
find_all_logs
find_all_logs() -> list[Path]

Find all Snakemake log files in .snakemake/log/, sorted by modification time (oldest first).

Returns:

list[Path]: Paths sorted oldest to newest; empty if the log directory does not exist.

Source code in snakesee/state/paths.py
def find_all_logs(self) -> list[Path]:
    """Find all snakemake log files, sorted by modification time.

    Returns:
        List of paths sorted oldest to newest.
    """
    if not _cached_exists(self.log_dir):
        return []
    # Files from glob already exist at time of iteration; no need to re-check
    logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
    logs.sort(key=safe_mtime)
    return logs
find_latest_log
find_latest_log() -> Path | None

Find the most recently modified Snakemake log file in .snakemake/log/.

Returns:

Path | None: Path to the most recent log file, or None if no logs exist.

Source code in snakesee/state/paths.py
def find_latest_log(self) -> Path | None:
    """Find the most recent snakemake log file.

    Returns:
        Path to the most recent log file, or None if no logs exist.
    """
    if not _cached_exists(self.log_dir):
        return None
    # Files from glob already exist at time of iteration; no need to re-check
    logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
    if not logs:
        return None
    logs.sort(key=safe_mtime)
    return logs[-1]
find_logs_sorted_newest_first
find_logs_sorted_newest_first() -> list[Path]

Find all Snakemake log files, sorted newest first (the reverse of find_all_logs).

Returns:

list[Path]: Paths sorted newest to oldest.

Source code in snakesee/state/paths.py
def find_logs_sorted_newest_first(self) -> list[Path]:
    """Find all snakemake log files, sorted newest first.

    Returns:
        List of paths sorted newest to oldest.
    """
    logs = self.find_all_logs()
    logs.reverse()
    return logs
find_profile
find_profile(max_levels: int = 6) -> Path | None

Search the workflow directory and its parent directories for a profile file.

Parameters:

max_levels (int, default 6): Maximum number of directories to check, counting the workflow directory itself. The search also stops at the filesystem root.

Returns:

Path | None: Path to the profile found, or None if no profile was found.

Source code in snakesee/state/paths.py
def find_profile(self, max_levels: int = 6) -> Path | None:
    """Search for a profile file in workflow and parent directories.

    Args:
        max_levels: Maximum parent levels to search (including current).

    Returns:
        Path to the found profile, or None if not found.
    """
    current = self.workflow_dir.resolve()
    for _ in range(max_levels):
        profile_path = current / DEFAULT_PROFILE_NAME
        if _cached_exists(profile_path):
            return profile_path
        if current.parent == current:
            break
        current = current.parent
    return None
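The upward search in find_profile can be reproduced with plain pathlib to see how max_levels is counted. In this sketch, `find_upwards` is a hypothetical name, and `Path.exists()` stands in for the module's `_cached_exists`, whose caching behavior is not shown on this page.

```python
from __future__ import annotations

from pathlib import Path


def find_upwards(start: Path, filename: str, max_levels: int = 6) -> Path | None:
    """Look for filename in start and up to max_levels - 1 of its ancestors."""
    current = start.resolve()
    for _ in range(max_levels):
        candidate = current / filename
        if candidate.exists():
            return candidate
        if current.parent == current:
            # Reached the filesystem root; nothing further to search
            break
        current = current.parent
    return None
```

With a profile two directories above the start, max_levels=3 finds it (start, parent, grandparent), while max_levels=2 does not.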
get_incomplete_markers
get_incomplete_markers() -> Iterator[Path]

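Iterate over incomplete-job marker files in .snakemake/incomplete/, including subdirectories, skipping the migration_underway file. Yields nothing if the directory does not exist.

Yields:

Path: Path to each incomplete marker file.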

Source code in snakesee/state/paths.py
def get_incomplete_markers(self) -> Iterator[Path]:
    """Iterate over incomplete job markers.

    Yields:
        Path to each incomplete marker file.
    """
    if not _cached_exists(self.incomplete_dir):
        return
    for marker in self.incomplete_dir.rglob("*"):
        if marker.is_file() and marker.name != "migration_underway":
            yield marker
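Because get_incomplete_markers is a generator, a missing directory simply produces no items instead of raising, and rglob lets it find markers nested in subdirectories. The following standalone sketch mirrors that behavior; `iter_markers` is a hypothetical name, and `Path.exists()` replaces the module's `_cached_exists`.

```python
from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path

# File name that get_incomplete_markers skips
MIGRATION_FILE = "migration_underway"


def iter_markers(incomplete_dir: Path) -> Iterator[Path]:
    """Yield marker files under incomplete_dir, recursing into subdirectories."""
    if not incomplete_dir.exists():
        return
    for marker in incomplete_dir.rglob("*"):
        # rglob also yields directories, so keep regular files only
        if marker.is_file() and marker.name != MIGRATION_FILE:
            yield marker
```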
get_job_log
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None

Find the log file for a specific job.

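Searches .snakemake/log/ for file names containing the rule name (or job<job_id> when a job ID is given), then the conventional logs/ and log/ directories under the workflow directory for names starting with the rule name (at any depth), files directly inside a <rule>/ subdirectory, and names containing any non-empty wildcard value (at any depth). Among all matching files, the most recently modified one is returned. Matching is by file name only, so the result is a best-effort guess that may belong to a different job with a similar name.

Parameters:

rule (str, required): Name of the rule.
wildcards (dict[str, str] | None, default None): Optional wildcard values.
job_id (int | str | None, default None): Optional job ID.

Returns:

Path | None: Path to the log file if found, None otherwise.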

Source code in snakesee/state/paths.py
def get_job_log(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    job_id: int | str | None = None,
) -> Path | None:
    """Find the log file for a specific job.

    Searches common log locations for a file matching the rule
    and optional wildcards/job_id.

    Args:
        rule: Name of the rule.
        wildcards: Optional wildcard values.
        job_id: Optional job ID.

    Returns:
        Path to the log file if found, None otherwise.
    """
    search_paths: list[Path] = []

    # .snakemake/log/ directory
    if _cached_exists(self.log_dir):
        search_paths.extend(self.log_dir.glob(f"*{rule}*"))
        if job_id is not None:
            search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))

    # logs/ directory (common convention)
    logs_dir = self.workflow_dir / "logs"
    search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))

    # log/ directory (another common convention)
    log_dir = self.workflow_dir / "log"
    search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))

    # Sort by modification time (newest first) and return first match
    # is_file() already confirms existence, no need for additional exists check
    existing_logs = [p for p in search_paths if p.is_file()]
    if existing_logs:
        existing_logs.sort(key=safe_mtime, reverse=True)
        return existing_logs[0]

    return None
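The final selection step of get_job_log can be tried in isolation. In this minimal sketch, `safe_mtime` is an assumed stand-in for the helper the module imports (its definition is not shown on this page): it returns 0.0 when a file cannot be stat'ed, so a log deleted between globbing and sorting sinks to the end instead of raising. `pick_newest_log` is a hypothetical name.

```python
from __future__ import annotations

import os
from pathlib import Path


def safe_mtime(path: Path) -> float:
    """Assumed stand-in: modification time, or 0.0 if the file is gone."""
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def pick_newest_log(candidates: list[Path]) -> Path | None:
    """Return the most recently modified regular file among candidates."""
    # Directories and missing paths are dropped, mirroring the is_file() filter
    existing = [p for p in candidates if p.is_file()]
    if not existing:
        return None
    # Newest first, as in get_job_log
    existing.sort(key=safe_mtime, reverse=True)
    return existing[0]
```

Since only the first element is used, `max(existing, key=safe_mtime)` would give the same result without a full sort; the sketch keeps the sort to match the source as written.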
get_metadata_files
get_metadata_files() -> Iterator[Path]

Iterate over all files under .snakemake/metadata/, including subdirectories. Yields nothing if the directory does not exist.

Yields:

Path: Path to each metadata file.

Source code in snakesee/state/paths.py
def get_metadata_files(self) -> Iterator[Path]:
    """Iterate over all metadata files.

    Yields:
        Path to each metadata file.
    """
    if not _cached_exists(self.metadata_dir):
        return
    for f in self.metadata_dir.rglob("*"):
        if f.is_file():
            yield f
validate
validate() -> None

Check that this is a valid workflow directory, i.e. one containing a .snakemake directory.

Raises:

ValueError: If the .snakemake directory doesn't exist.

Source code in snakesee/state/paths.py
def validate(self) -> None:
    """Validate that this is a valid workflow directory.

    Raises:
        ValueError: If .snakemake directory doesn't exist.
    """
    if not _cached_exists(self.snakemake_dir):
        raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")

WorkflowState dataclass

Unified container for all workflow state.

This is the single source of truth for workflow state, consolidating what was previously scattered across IncrementalLogReader, WorkflowProgress, TUI, EventAccumulator, and TimeEstimator.

Attributes:

paths (WorkflowPaths): Centralized path resolution.
jobs (JobRegistry): Registry of all job state.
rules (RuleRegistry): Registry of all rule timing statistics.
clock (Clock): Injectable time source.
config (EstimationConfig): Estimation configuration.
status (WorkflowStatus): Overall workflow status.
total_jobs (int | None): Total number of jobs in the workflow, if known.
start_time (float | None): Workflow start time as a Unix timestamp.
current_log (Path | None): Path to the log file currently being monitored.

Source code in snakesee/state/workflow_state.py
@dataclass
class WorkflowState:
    """Unified container for all workflow state.

    This is the single source of truth for workflow state, consolidating
    what was previously scattered across IncrementalLogReader, WorkflowProgress,
    TUI, EventAccumulator, and TimeEstimator.

    Attributes:
        paths: Centralized path resolution.
        jobs: Registry of all job state.
        rules: Registry of all rule timing statistics.
        clock: Injectable time source.
        config: Estimation configuration.
        status: Overall workflow status.
        total_jobs: Total number of jobs in workflow (if known).
        start_time: Workflow start timestamp.
        current_log: Path to current log file being monitored.
    """

    paths: WorkflowPaths
    jobs: JobRegistry
    rules: RuleRegistry
    clock: Clock
    config: EstimationConfig
    status: WorkflowStatus = field(default_factory=_default_status)
    total_jobs: int | None = None
    start_time: float | None = None
    current_log: Path | None = None

    @property
    def completed_count(self) -> int:
        """Number of completed jobs."""
        return len(self.jobs.completed())

    @property
    def failed_count(self) -> int:
        """Number of failed jobs."""
        return len(self.jobs.failed())

    @property
    def running_count(self) -> int:
        """Number of currently running jobs."""
        return len(self.jobs.running())

    @property
    def pending_count(self) -> int:
        """Number of pending jobs."""
        from snakesee.state.job_registry import JobStatus

        return len([j for j in self.jobs.all_jobs() if j.status == JobStatus.PENDING])

    @property
    def progress_fraction(self) -> float:
        """Fraction of jobs completed (0.0 to 1.0)."""
        if self.total_jobs is None or self.total_jobs == 0:
            return 0.0
        return self.completed_count / self.total_jobs

    @property
    def elapsed_seconds(self) -> float | None:
        """Elapsed time since workflow start."""
        if self.start_time is None:
            return None
        return self.clock.now() - self.start_time

    def update_status(self) -> None:
        """Update workflow status based on current state."""
        from snakesee.models import WorkflowStatus

        if self.total_jobs is None:
            # Unknown total - check if we have failures anyway
            if self.failed_count > 0:
                self.status = WorkflowStatus.FAILED
            elif self.running_count > 0:
                self.status = WorkflowStatus.RUNNING
            else:
                self.status = WorkflowStatus.UNKNOWN
        elif self.running_count > 0:
            self.status = WorkflowStatus.RUNNING
        elif self.failed_count > 0 and self.completed_count < self.total_jobs:
            self.status = WorkflowStatus.FAILED
        elif self.completed_count >= self.total_jobs:
            self.status = WorkflowStatus.COMPLETED
        elif self.completed_count == 0 and self.running_count == 0:
            self.status = WorkflowStatus.NOT_STARTED
        else:
            # Check for stale workflow
            if self.elapsed_seconds is not None:
                if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
                    self.status = WorkflowStatus.STALE
                else:
                    self.status = WorkflowStatus.RUNNING
            else:
                # No start time - assume incomplete
                self.status = WorkflowStatus.INCOMPLETE

    def record_job_completion(self, job_key: str) -> None:
        """Record a job completion and update rule statistics.

        Args:
            job_key: Key of the completed job.
        """
        job = self.jobs.get(job_key)
        if job is not None and job.duration is not None:
            self.rules.record_job_completion(job)

    def to_progress(self) -> WorkflowProgress:
        """Convert to WorkflowProgress for backward compatibility.

        Returns:
            WorkflowProgress snapshot of current state.
        """
        from snakesee.models import WorkflowProgress

        return WorkflowProgress(
            workflow_dir=self.paths.workflow_dir,
            status=self.status,
            total_jobs=self.total_jobs or 0,
            completed_jobs=self.completed_count,
            running_jobs=[job.to_job_info() for job in self.jobs.running()],
            failed_jobs=self.failed_count,
            failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
            pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
            start_time=self.start_time,
            log_file=self.current_log,
        )

    def clear(self) -> None:
        """Clear all state."""
        from snakesee.models import WorkflowStatus

        self.jobs.clear()
        self.rules.clear()
        self.status = WorkflowStatus.UNKNOWN
        self.total_jobs = None
        self.start_time = None
        self.current_log = None

    @classmethod
    def create(
        cls,
        workflow_dir: Path,
        clock: Clock | None = None,
        config: EstimationConfig | None = None,
    ) -> WorkflowState:
        """Create a new WorkflowState for a workflow directory.

        Args:
            workflow_dir: Path to the workflow directory.
            clock: Optional clock for time operations.
            config: Optional estimation configuration.

        Returns:
            New WorkflowState instance.
        """
        from snakesee.state.clock import get_clock
        from snakesee.state.config import DEFAULT_CONFIG
        from snakesee.state.job_registry import JobRegistry
        from snakesee.state.paths import WorkflowPaths
        from snakesee.state.rule_registry import RuleRegistry

        actual_clock = clock or get_clock()
        actual_config = config or DEFAULT_CONFIG

        return cls(
            paths=WorkflowPaths(workflow_dir=workflow_dir),
            jobs=JobRegistry(),
            rules=RuleRegistry(config=actual_config),
            clock=actual_clock,
            config=actual_config,
        )

Attributes

completed_count property
completed_count: int

Number of completed jobs.

elapsed_seconds property
elapsed_seconds: float | None

Elapsed time in seconds since start_time, measured with the injected clock, or None if start_time is not set.

failed_count property
failed_count: int

Number of failed jobs.

pending_count property
pending_count: int

Number of jobs whose status is JobStatus.PENDING.

progress_fraction property
progress_fraction: float

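Fraction of jobs completed, completed_count / total_jobs. Returns 0.0 when total_jobs is unknown or zero; the value is not clamped, so it can exceed 1.0 if more completions are recorded than total_jobs.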

running_count property
running_count: int

Number of currently running jobs.

Functions

clear
clear() -> None

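Clear all job and rule state, reset status to WorkflowStatus.UNKNOWN, and set total_jobs, start_time and current_log to None.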

Source code in snakesee/state/workflow_state.py
def clear(self) -> None:
    """Clear all state."""
    from snakesee.models import WorkflowStatus

    self.jobs.clear()
    self.rules.clear()
    self.status = WorkflowStatus.UNKNOWN
    self.total_jobs = None
    self.start_time = None
    self.current_log = None
create classmethod
create(workflow_dir: Path, clock: Clock | None = None, config: EstimationConfig | None = None) -> WorkflowState

Create a new WorkflowState for a workflow directory.

Parameters:

workflow_dir (Path, required): Path to the workflow directory.
clock (Clock | None, default None): Optional clock for time operations; falls back to get_clock() when omitted.
config (EstimationConfig | None, default None): Optional estimation configuration; falls back to DEFAULT_CONFIG when omitted.

Returns:

WorkflowState: New WorkflowState instance.

Source code in snakesee/state/workflow_state.py
@classmethod
def create(
    cls,
    workflow_dir: Path,
    clock: Clock | None = None,
    config: EstimationConfig | None = None,
) -> WorkflowState:
    """Create a new WorkflowState for a workflow directory.

    Args:
        workflow_dir: Path to the workflow directory.
        clock: Optional clock for time operations.
        config: Optional estimation configuration.

    Returns:
        New WorkflowState instance.
    """
    from snakesee.state.clock import get_clock
    from snakesee.state.config import DEFAULT_CONFIG
    from snakesee.state.job_registry import JobRegistry
    from snakesee.state.paths import WorkflowPaths
    from snakesee.state.rule_registry import RuleRegistry

    actual_clock = clock or get_clock()
    actual_config = config or DEFAULT_CONFIG

    return cls(
        paths=WorkflowPaths(workflow_dir=workflow_dir),
        jobs=JobRegistry(),
        rules=RuleRegistry(config=actual_config),
        clock=actual_clock,
        config=actual_config,
    )
record_job_completion
record_job_completion(job_key: str) -> None

Update rule timing statistics for a completed job. Does nothing if the job is not in the registry or has no recorded duration.

Parameters:

job_key (str, required): Key of the completed job.
Source code in snakesee/state/workflow_state.py
def record_job_completion(self, job_key: str) -> None:
    """Record a job completion and update rule statistics.

    Args:
        job_key: Key of the completed job.
    """
    job = self.jobs.get(job_key)
    if job is not None and job.duration is not None:
        self.rules.record_job_completion(job)
to_progress
to_progress() -> WorkflowProgress

Convert to a WorkflowProgress snapshot for backward compatibility. An unknown total_jobs is reported as 0, and pending_jobs_list is built from the registry's submitted jobs.

Returns:

WorkflowProgress: Snapshot of the current state.

Source code in snakesee/state/workflow_state.py
def to_progress(self) -> WorkflowProgress:
    """Convert to WorkflowProgress for backward compatibility.

    Returns:
        WorkflowProgress snapshot of current state.
    """
    from snakesee.models import WorkflowProgress

    return WorkflowProgress(
        workflow_dir=self.paths.workflow_dir,
        status=self.status,
        total_jobs=self.total_jobs or 0,
        completed_jobs=self.completed_count,
        running_jobs=[job.to_job_info() for job in self.jobs.running()],
        failed_jobs=self.failed_count,
        failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
        pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
        start_time=self.start_time,
        log_file=self.current_log,
    )
update_status
update_status() -> None

Update workflow status based on current state.

Source code in snakesee/state/workflow_state.py
def update_status(self) -> None:
    """Update workflow status based on current state."""
    from snakesee.models import WorkflowStatus

    if self.total_jobs is None:
        # Unknown total - check if we have failures anyway
        if self.failed_count > 0:
            self.status = WorkflowStatus.FAILED
        elif self.running_count > 0:
            self.status = WorkflowStatus.RUNNING
        else:
            self.status = WorkflowStatus.UNKNOWN
    elif self.running_count > 0:
        self.status = WorkflowStatus.RUNNING
    elif self.failed_count > 0 and self.completed_count < self.total_jobs:
        self.status = WorkflowStatus.FAILED
    elif self.completed_count >= self.total_jobs:
        self.status = WorkflowStatus.COMPLETED
    elif self.completed_count == 0 and self.running_count == 0:
        self.status = WorkflowStatus.NOT_STARTED
    else:
        # Check for stale workflow
        if self.elapsed_seconds is not None:
            if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
                self.status = WorkflowStatus.STALE
            else:
                self.status = WorkflowStatus.RUNNING
        else:
            # No start time - assume incomplete
            self.status = WorkflowStatus.INCOMPLETE
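The branch order in update_status is easier to check as a pure function of the counts. The sketch below mirrors it; `Status` is a hypothetical stand-in for snakesee.models.WorkflowStatus (the real enum's values are not shown here), and `derive_status` is an illustrative name. Note that when total_jobs is known, running jobs take precedence over failures, whereas with an unknown total a single failure already yields FAILED. STALE is judged by time since the workflow started, not since the last job finished.

```python
from __future__ import annotations

from enum import Enum, auto


class Status(Enum):
    """Hypothetical stand-in for snakesee.models.WorkflowStatus."""

    UNKNOWN = auto()
    NOT_STARTED = auto()
    RUNNING = auto()
    STALE = auto()
    FAILED = auto()
    COMPLETED = auto()
    INCOMPLETE = auto()


def derive_status(
    total: int | None,
    completed: int,
    running: int,
    failed: int,
    elapsed: float | None,
    stale_threshold: float,
) -> Status:
    """Mirror the branch order of WorkflowState.update_status."""
    if total is None:
        # Without a total, only failures and running jobs are informative
        if failed > 0:
            return Status.FAILED
        if running > 0:
            return Status.RUNNING
        return Status.UNKNOWN
    if running > 0:
        return Status.RUNNING
    if failed > 0 and completed < total:
        return Status.FAILED
    if completed >= total:
        return Status.COMPLETED
    if completed == 0:
        # running == 0 is already guaranteed by the earlier branch
        return Status.NOT_STARTED
    if elapsed is None:
        return Status.INCOMPLETE
    return Status.STALE if elapsed > stale_threshold else Status.RUNNING
```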

Functions

get_clock

get_clock() -> Clock

Get the current default clock.

Returns:

Clock: The currently configured clock instance.

Source code in snakesee/state/clock.py
def get_clock() -> Clock:
    """Get the current default clock.

    Returns:
        The currently configured clock instance.
    """
    with _clock_lock:
        return _default_clock

reset_clock

reset_clock() -> None

Reset the default clock to SystemClock.

Source code in snakesee/state/clock.py
def reset_clock() -> None:
    """Reset the default clock to SystemClock."""
    global _default_clock
    with _clock_lock:
        _default_clock = SystemClock()

set_clock

set_clock(clock: Clock) -> None

Set the default clock (primarily for testing).

Parameters:

clock (Clock, required): Clock instance to use as default.
Source code in snakesee/state/clock.py
def set_clock(clock: Clock) -> None:
    """Set the default clock (primarily for testing).

    Args:
        clock: Clock instance to use as default.
    """
    global _default_clock
    with _clock_lock:
        _default_clock = clock
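get_clock, set_clock and reset_clock share one module-level default guarded by a lock. The sketch below reproduces that pattern in isolation; `SystemClockSketch` is an assumed equivalent of the SystemClock that reset_clock restores (its source is not shown on this page), built on `time.time()` and `time.monotonic()`. Because Clock is a `typing.Protocol`, any object with matching now() and monotonic() methods qualifies without subclassing.

```python
import threading
import time
from typing import Protocol


class Clock(Protocol):
    """Structural type: anything with now() and monotonic() qualifies."""

    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class SystemClockSketch:
    """Assumed equivalent of SystemClock, backed by the real clocks."""

    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


# Module-level default shared by every caller, guarded by a lock
_clock_lock = threading.Lock()
_default_clock: Clock = SystemClockSketch()


def get_clock() -> Clock:
    with _clock_lock:
        return _default_clock


def set_clock(clock: Clock) -> None:
    global _default_clock
    with _clock_lock:
        _default_clock = clock


def reset_clock() -> None:
    global _default_clock
    with _clock_lock:
        _default_clock = SystemClockSketch()
```

In tests, pairing set_clock with reset_clock in a try/finally block (or a fixture) keeps a replacement clock from leaking into later tests.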

Modules

clock

Injectable clock for testable time handling.

This module provides a Clock protocol and implementations that allow time-dependent code to be tested deterministically.

Example usage:

# Production code
from snakesee.state import get_clock

def calculate_elapsed(start_time: float) -> float:
    return get_clock().now() - start_time

# Test code
from snakesee.state import FrozenClock, reset_clock, set_clock

def test_elapsed():
    clock = FrozenClock(1000.0)
    set_clock(clock)
    try:
        assert calculate_elapsed(900.0) == 100.0

        clock.advance(50.0)
        assert calculate_elapsed(900.0) == 150.0
    finally:
        # Restore the system clock so later tests see real time
        reset_clock()
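The example relies on FrozenClock, whose implementation is not shown on this page. A minimal hypothetical version that satisfies the Clock protocol and supports advance() could look like the following; snakesee's actual class may differ, for instance in whether monotonic() moves together with now().

```python
class FrozenClockSketch:
    """Hypothetical fixed clock: time only moves when advance() is called."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start
        self._monotonic = 0.0

    def now(self) -> float:
        return self._now

    def monotonic(self) -> float:
        return self._monotonic

    def advance(self, seconds: float) -> None:
        # Move both time sources forward by the same amount
        self._now += seconds
        self._monotonic += seconds
```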

Classes

Clock

Bases: Protocol

Protocol for injectable time sources.

This enables deterministic testing by allowing tests to provide a controlled time source instead of using real wall-clock time.

Source code in snakesee/state/clock.py
class Clock(Protocol):
    """Protocol for injectable time sources.

    This enables deterministic testing by allowing tests to provide
    a controlled time source instead of using real wall-clock time.
    """

    def now(self) -> float:
        """Return current time as Unix timestamp (seconds since epoch)."""
        ...

    def monotonic(self) -> float:
        """Return monotonic clock value for measuring durations.

        This is not affected by system clock adjustments and is suitable
        for measuring elapsed time.
        """
        ...
Functions
monotonic
monotonic() -> float

Return monotonic clock value for measuring durations.

This is not affected by system clock adjustments and is suitable for measuring elapsed time.

Source code in snakesee/state/clock.py
def monotonic(self) -> float:
    """Return monotonic clock value for measuring durations.

    This is not affected by system clock adjustments and is suitable
    for measuring elapsed time.
    """
    ...
now
now() -> float

Return current time as Unix timestamp (seconds since epoch).

Source code in snakesee/state/clock.py
def now(self) -> float:
    """Return current time as Unix timestamp (seconds since epoch)."""
    ...
ClockUtils

Centralized utilities for clock and duration handling.

This class provides consistent handling of duration calculations, including validation and clock skew detection. It centralizes logic that was previously duplicated in JobInfo.elapsed(), JobInfo.duration(), and RuleTimingStats._time_weighted_mean().

Example:

# Get validated duration
result = ClockUtils.calculate_duration(start_time, end_time, "job foo")
if not result.is_valid:
    logger.warning("Clock skew detected for %s", result.context)
return result.value

# Or use the elapsed time helper
elapsed = ClockUtils.elapsed_since(start_time, context="job bar")

Source code in snakesee/state/clock.py
class ClockUtils:
    """Centralized utilities for clock and duration handling.

    This class provides consistent handling of duration calculations,
    including validation and clock skew detection. It centralizes logic
    that was previously duplicated in JobInfo.elapsed(), JobInfo.duration(),
    and RuleTimingStats._time_weighted_mean().

    Example:
        # Get validated duration
        result = ClockUtils.calculate_duration(start_time, end_time, "job foo")
        if not result.is_valid:
            logger.warning("Clock skew detected for %s", result.context)
        return result.value

        # Or use the elapsed time helper
        elapsed = ClockUtils.elapsed_since(start_time, context="job bar")
    """

    @staticmethod
    def calculate_duration(
        start_time: float,
        end_time: float,
        context: str | None = None,
    ) -> DurationResult:
        """Calculate duration between two timestamps with validation.

        This is the primary method for calculating durations. It handles
        clock skew by clamping negative durations to zero.

        Args:
            start_time: Start timestamp (Unix epoch seconds).
            end_time: End timestamp (Unix epoch seconds).
            context: Optional description for logging (e.g., "job foo").

        Returns:
            DurationResult with validated duration and status.
        """
        raw_duration = end_time - start_time
        is_valid = raw_duration >= 0
        clamped = max(0.0, raw_duration)
        return DurationResult(
            value=clamped,
            raw_value=raw_duration,
            is_valid=is_valid,
            context=context,
        )

    @staticmethod
    def elapsed_since(
        start_time: float,
        context: str | None = None,
        clock: Clock | None = None,
    ) -> DurationResult:
        """Calculate elapsed time since a start timestamp.

        Args:
            start_time: Start timestamp (Unix epoch seconds).
            context: Optional description for logging.
            clock: Optional clock instance (defaults to global clock).

        Returns:
            DurationResult with validated elapsed time and status.
        """
        if clock is None:
            clock = get_clock()
        now = clock.now()
        return ClockUtils.calculate_duration(start_time, now, context)

    @staticmethod
    def validate_duration(
        duration: float,
        context: str | None = None,
    ) -> DurationResult:
        """Validate a pre-calculated duration value.

        Args:
            duration: Duration value in seconds.
            context: Optional description for logging.

        Returns:
            DurationResult with validated duration and status.
        """
        is_valid = duration >= 0
        clamped = max(0.0, duration)
        return DurationResult(
            value=clamped,
            raw_value=duration,
            is_valid=is_valid,
            context=context,
        )

    @staticmethod
    def age_seconds(
        timestamp: float,
        clock: Clock | None = None,
    ) -> float:
        """Calculate age in seconds (time since timestamp).

        This is a convenience method for calculating how old a timestamp is.
        The result is always non-negative.

        Args:
            timestamp: Unix timestamp to calculate age for.
            clock: Optional clock instance (defaults to global clock).

        Returns:
            Age in seconds (>= 0).
        """
        if clock is None:
            clock = get_clock()
        return max(0.0, clock.now() - timestamp)
Functions
age_seconds staticmethod
age_seconds(timestamp: float, clock: Clock | None = None) -> float

Calculate age in seconds (time since timestamp).

This is a convenience method for calculating how old a timestamp is. The result is always non-negative.

Parameters:

timestamp (float, required): Unix timestamp to calculate age for.
clock (Clock | None, default None): Optional clock instance (defaults to global clock).

Returns:

float: Age in seconds (>= 0).

Source code in snakesee/state/clock.py
@staticmethod
def age_seconds(
    timestamp: float,
    clock: Clock | None = None,
) -> float:
    """Calculate age in seconds (time since timestamp).

    This is a convenience method for calculating how old a timestamp is.
    The result is always non-negative.

    Args:
        timestamp: Unix timestamp to calculate age for.
        clock: Optional clock instance (defaults to global clock).

    Returns:
        Age in seconds (>= 0).
    """
    if clock is None:
        clock = get_clock()
    return max(0.0, clock.now() - timestamp)
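Unlike elapsed_since, age_seconds returns a bare float rather than a DurationResult, so a timestamp in the future (for example, from a host with a skewed clock) silently reads as age 0. The sketch below makes that visible with an injected clock; `FixedClock` is a hypothetical test helper, and this `age_seconds` requires an explicit clock instead of falling back to a global default.

```python
from typing import Protocol


class Clock(Protocol):
    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class FixedClock:
    """Hypothetical clock pinned at a given Unix time."""

    def __init__(self, at: float) -> None:
        self._at = at

    def now(self) -> float:
        return self._at

    def monotonic(self) -> float:
        return 0.0


def age_seconds(timestamp: float, clock: Clock) -> float:
    """Seconds since timestamp, clamped at zero like ClockUtils.age_seconds."""
    return max(0.0, clock.now() - timestamp)
```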
calculate_duration staticmethod
calculate_duration(start_time: float, end_time: float, context: str | None = None) -> DurationResult

Calculate duration between two timestamps with validation.

This is the primary method for calculating durations. It handles clock skew by clamping negative durations to zero.

Parameters:

start_time (float, required): Start timestamp (Unix epoch seconds).
end_time (float, required): End timestamp (Unix epoch seconds).
context (str | None, default None): Optional description for logging (e.g., "job foo").

Returns:

DurationResult: DurationResult with validated duration and status.

Source code in snakesee/state/clock.py
@staticmethod
def calculate_duration(
    start_time: float,
    end_time: float,
    context: str | None = None,
) -> DurationResult:
    """Calculate duration between two timestamps with validation.

    This is the primary method for calculating durations. It handles
    clock skew by clamping negative durations to zero.

    Args:
        start_time: Start timestamp (Unix epoch seconds).
        end_time: End timestamp (Unix epoch seconds).
        context: Optional description for logging (e.g., "job foo").

    Returns:
        DurationResult with validated duration and status.
    """
    raw_duration = end_time - start_time
    is_valid = raw_duration >= 0
    clamped = max(0.0, raw_duration)
    return DurationResult(
        value=clamped,
        raw_value=raw_duration,
        is_valid=is_valid,
        context=context,
    )
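DurationResult is defined after this section and its source is not shown here, so the fields below are inferred from the constructor calls above; making it a frozen dataclass is an assumption. The sketch shows why both values are kept: callers get a safe clamped `value`, while `raw_value` and `is_valid` preserve how large the skew was for logging.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DurationResultSketch:
    """Assumed shape of DurationResult, inferred from its constructor calls."""

    value: float  # clamped duration, always >= 0
    raw_value: float  # unclamped end - start
    is_valid: bool  # False when clock skew produced a negative duration
    context: str | None = None


def calculate_duration(
    start: float, end: float, context: str | None = None
) -> DurationResultSketch:
    """Clamp negative durations to zero but keep the raw value."""
    raw = end - start
    return DurationResultSketch(
        value=max(0.0, raw),
        raw_value=raw,
        is_valid=raw >= 0,
        context=context,
    )
```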
elapsed_since staticmethod
elapsed_since(start_time: float, context: str | None = None, clock: Clock | None = None) -> DurationResult

Calculate elapsed time since a start timestamp.

Parameters:

start_time (float, required): Start timestamp (Unix epoch seconds).
context (str | None, default None): Optional description for logging.
clock (Clock | None, default None): Optional clock instance (defaults to the global clock).

Returns:

DurationResult: The validated elapsed time and its status.

Source code in snakesee/state/clock.py
@staticmethod
def elapsed_since(
    start_time: float,
    context: str | None = None,
    clock: Clock | None = None,
) -> DurationResult:
    """Calculate elapsed time since a start timestamp.

    Args:
        start_time: Start timestamp (Unix epoch seconds).
        context: Optional description for logging.
        clock: Optional clock instance (defaults to global clock).

    Returns:
        DurationResult with validated elapsed time and status.
    """
    if clock is None:
        clock = get_clock()
    now = clock.now()
    return ClockUtils.calculate_duration(start_time, now, context)
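Because `elapsed_since` takes an optional `clock`, a test can pin "now" instead of reading wall-clock time. In this self-contained sketch, `FixedClock` is a hypothetical test clock, and the simplified `elapsed_since` returns a bare float instead of a `DurationResult`; the fallback-then-subtract pattern follows the listing above.

```python
import time


class FixedClock:
    # Hypothetical minimal clock: now() always returns the same timestamp.
    def __init__(self, t: float) -> None:
        self._t = t

    def now(self) -> float:
        return self._t

    def monotonic(self) -> float:
        return 0.0


class SystemClock:
    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


def elapsed_since(start_time: float, clock=None) -> float:
    # Fall back to a default clock, then compute now - start,
    # clamping negative values (clock skew) to zero.
    if clock is None:
        clock = SystemClock()
    return max(0.0, clock.now() - start_time)


clock = FixedClock(1_700_000_300.0)
elapsed = elapsed_since(1_700_000_000.0, clock=clock)  # 300.0 seconds
future_start = elapsed_since(1_700_000_900.0, clock=clock)  # clamped to 0.0
```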
validate_duration staticmethod
validate_duration(duration: float, context: str | None = None) -> DurationResult

Validate a pre-calculated duration value.

Parameters:

duration (float, required): Duration value in seconds.
context (str | None, default None): Optional description for logging.

Returns:

DurationResult: The validated duration and its status.

Source code in snakesee/state/clock.py
@staticmethod
def validate_duration(
    duration: float,
    context: str | None = None,
) -> DurationResult:
    """Validate a pre-calculated duration value.

    Args:
        duration: Duration value in seconds.
        context: Optional description for logging.

    Returns:
        DurationResult with validated duration and status.
    """
    is_valid = duration >= 0
    clamped = max(0.0, duration)
    return DurationResult(
        value=clamped,
        raw_value=duration,
        is_valid=is_valid,
        context=context,
    )
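`validate_duration` is the entry point for durations computed elsewhere, such as values read back from logs or metadata. The sketch below assumes a hypothetical aggregation step, `mean_of_valid`, that discards invalid samples instead of averaging in their clamped zeros; `validate_duration` here is a simplified stand-in returning a `(value, is_valid)` tuple.

```python
from __future__ import annotations

from statistics import fmean


def validate_duration(duration: float) -> tuple[float, bool]:
    # Same rule as the listing: clamp to >= 0 and report validity.
    return max(0.0, duration), duration >= 0


def mean_of_valid(durations: list[float]) -> float | None:
    # Hypothetical use: drop skewed (negative) samples rather than
    # letting their clamped zeros pull the average down.
    valid = [value for value, ok in map(validate_duration, durations) if ok]
    return fmean(valid) if valid else None


samples = [120.0, 95.0, -3.0, 110.0]
average = mean_of_valid(samples)
```

Whether to drop or keep clamped samples is a design choice; this sketch shows one option, not necessarily what snakesee's estimator does.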
DurationResult

Result of a duration calculation with validation status.

This class provides a way to handle duration calculations that may encounter clock skew or other issues without raising exceptions.

Attributes:

value: The calculated duration (>= 0, clamped if negative).
raw_value: The original calculated value before clamping.
is_valid: True if the duration was valid (non-negative).
context: Optional context description.

Source code in snakesee/state/clock.py
class DurationResult:
    """Result of a duration calculation with validation status.

    This class provides a way to handle duration calculations that may
    encounter clock skew or other issues without raising exceptions.

    Attributes:
        value: The calculated duration (>= 0, clamped if negative).
        raw_value: The original calculated value before clamping.
        is_valid: True if the duration was valid (non-negative).
        context: Optional context description.
    """

    __slots__ = ("value", "raw_value", "is_valid", "context")

    def __init__(
        self,
        value: float,
        raw_value: float,
        is_valid: bool,
        context: str | None = None,
    ) -> None:
        self.value = value
        self.raw_value = raw_value
        self.is_valid = is_valid
        self.context = context

    def __repr__(self) -> str:
        return (
            f"DurationResult(value={self.value}, raw_value={self.raw_value}, "
            f"is_valid={self.is_valid}, context={self.context!r})"
        )
FrozenClock

Clock frozen at a specific time for testing.

Useful for testing time-dependent logic without flakiness.

Attributes:

frozen_time: The frozen Unix timestamp.
frozen_monotonic: The frozen monotonic value.

Example:

    clock = FrozenClock(1700000000.0)
    assert clock.now() == 1700000000.0

    clock.advance(60.0)  # Advance by 1 minute
    assert clock.now() == 1700000060.0

Source code in snakesee/state/clock.py
class FrozenClock:
    """Clock frozen at a specific time for testing.

    Useful for testing time-dependent logic without flakiness.

    Attributes:
        frozen_time: The frozen Unix timestamp.
        frozen_monotonic: The frozen monotonic value.

    Example:
        clock = FrozenClock(1700000000.0)
        assert clock.now() == 1700000000.0

        clock.advance(60.0)  # Advance by 1 minute
        assert clock.now() == 1700000060.0
    """

    def __init__(
        self,
        frozen_time: float | None = None,
        frozen_monotonic: float | None = None,
    ) -> None:
        """Initialize with specific frozen times.

        Args:
            frozen_time: Unix timestamp to freeze at. Defaults to current time.
            frozen_monotonic: Monotonic value to freeze at. Defaults to 0.0.
        """
        self._time = frozen_time if frozen_time is not None else _time.time()
        self._monotonic = frozen_monotonic if frozen_monotonic is not None else 0.0

    def now(self) -> float:
        """Return the frozen time."""
        return self._time

    def monotonic(self) -> float:
        """Return the frozen monotonic value."""
        return self._monotonic

    def advance(self, seconds: float) -> None:
        """Advance the frozen time by the given number of seconds.

        Args:
            seconds: Number of seconds to advance (can be negative).
        """
        self._time += seconds
        self._monotonic += seconds

    def set_time(self, timestamp: float) -> None:
        """Set the frozen time to a specific timestamp.

        Args:
            timestamp: Unix timestamp to set.
        """
        self._time = timestamp

    def set_monotonic(self, value: float) -> None:
        """Set the frozen monotonic value.

        Args:
            value: Monotonic value to set.
        """
        self._monotonic = value
Functions
__init__
__init__(frozen_time: float | None = None, frozen_monotonic: float | None = None) -> None

Initialize with specific frozen times.

Parameters:

frozen_time (float | None, default None): Unix timestamp to freeze at. Defaults to the current time.
frozen_monotonic (float | None, default None): Monotonic value to freeze at. Defaults to 0.0.
Source code in snakesee/state/clock.py
def __init__(
    self,
    frozen_time: float | None = None,
    frozen_monotonic: float | None = None,
) -> None:
    """Initialize with specific frozen times.

    Args:
        frozen_time: Unix timestamp to freeze at. Defaults to current time.
        frozen_monotonic: Monotonic value to freeze at. Defaults to 0.0.
    """
    self._time = frozen_time if frozen_time is not None else _time.time()
    self._monotonic = frozen_monotonic if frozen_monotonic is not None else 0.0
advance
advance(seconds: float) -> None

Advance both the frozen wall-clock time and the frozen monotonic value by the given number of seconds.

Parameters:

seconds (float, required): Number of seconds to advance (can be negative).
Source code in snakesee/state/clock.py
def advance(self, seconds: float) -> None:
    """Advance the frozen time by the given number of seconds.

    Args:
        seconds: Number of seconds to advance (can be negative).
    """
    self._time += seconds
    self._monotonic += seconds
monotonic
monotonic() -> float

Return the frozen monotonic value.

Source code in snakesee/state/clock.py
def monotonic(self) -> float:
    """Return the frozen monotonic value."""
    return self._monotonic
now
now() -> float

Return the frozen time.

Source code in snakesee/state/clock.py
def now(self) -> float:
    """Return the frozen time."""
    return self._time
set_monotonic
set_monotonic(value: float) -> None

Set the frozen monotonic value.

Parameters:

value (float, required): Monotonic value to set.
Source code in snakesee/state/clock.py
def set_monotonic(self, value: float) -> None:
    """Set the frozen monotonic value.

    Args:
        value: Monotonic value to set.
    """
    self._monotonic = value
set_time
set_time(timestamp: float) -> None

Set the frozen time to a specific timestamp. The frozen monotonic value is left unchanged.

Parameters:

timestamp (float, required): Unix timestamp to set.
Source code in snakesee/state/clock.py
def set_time(self, timestamp: float) -> None:
    """Set the frozen time to a specific timestamp.

    Args:
        timestamp: Unix timestamp to set.
    """
    self._time = timestamp
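A sketch of `FrozenClock` in a test, using an abridged copy of the class above so it runs standalone. `is_stale` is a hypothetical function under test. The last lines show one asymmetry in the listing: `advance` moves both readings, while `set_time` changes only the wall-clock value.

```python
from __future__ import annotations

import time


class FrozenClock:
    # Abridged copy of the FrozenClock listing above.
    def __init__(self, frozen_time: float | None = None, frozen_monotonic: float | None = None) -> None:
        self._time = frozen_time if frozen_time is not None else time.time()
        self._monotonic = frozen_monotonic if frozen_monotonic is not None else 0.0

    def now(self) -> float:
        return self._time

    def monotonic(self) -> float:
        return self._monotonic

    def advance(self, seconds: float) -> None:
        # Moves both the wall-clock and the monotonic readings.
        self._time += seconds
        self._monotonic += seconds

    def set_time(self, timestamp: float) -> None:
        # Moves only the wall-clock reading; monotonic is untouched.
        self._time = timestamp


def is_stale(last_update: float, clock: FrozenClock, threshold: float) -> bool:
    # Hypothetical function under test: stale once `threshold` seconds pass.
    return clock.now() - last_update > threshold


clock = FrozenClock(1_700_000_000.0)
fresh = is_stale(1_700_000_000.0, clock, threshold=600.0)
clock.advance(601.0)
stale = is_stale(1_700_000_000.0, clock, threshold=600.0)
mono_after_advance = clock.monotonic()
clock.set_time(1_600_000_000.0)  # jump the wall clock backwards
mono_after_set = clock.monotonic()
```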
OffsetClock

Clock with a fixed offset from system time.

Useful for simulating time shifts without fully freezing time.

Attributes:

offset: Seconds to add to system time.

Source code in snakesee/state/clock.py
class OffsetClock:
    """Clock with a fixed offset from system time.

    Useful for simulating time shifts without fully freezing time.

    Attributes:
        offset: Seconds to add to system time.
    """

    def __init__(self, offset: float = 0.0) -> None:
        """Initialize with an offset.

        Args:
            offset: Seconds to add to current time (negative for past).
        """
        self._offset = offset

    def now(self) -> float:
        """Return system time plus offset."""
        return _time.time() + self._offset

    def monotonic(self) -> float:
        """Return system monotonic (offset doesn't apply to durations)."""
        return _time.monotonic()
Functions
__init__
__init__(offset: float = 0.0) -> None

Initialize with an offset.

Parameters:

offset (float, default 0.0): Seconds to add to the current time (negative for past).
Source code in snakesee/state/clock.py
def __init__(self, offset: float = 0.0) -> None:
    """Initialize with an offset.

    Args:
        offset: Seconds to add to current time (negative for past).
    """
    self._offset = offset
monotonic
monotonic() -> float

Return system monotonic (offset doesn't apply to durations).

Source code in snakesee/state/clock.py
def monotonic(self) -> float:
    """Return system monotonic (offset doesn't apply to durations)."""
    return _time.monotonic()
now
now() -> float

Return system time plus offset.

Source code in snakesee/state/clock.py
def now(self) -> float:
    """Return system time plus offset."""
    return _time.time() + self._offset
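`OffsetClock` only shifts `now()`; `monotonic()` passes straight through, so elapsed-time measurements are unaffected. The self-contained sketch below uses an abridged copy of the class to simulate hosts whose clocks run an hour ahead or behind.

```python
import time


class OffsetClock:
    # Abridged copy of the OffsetClock listing above.
    def __init__(self, offset: float = 0.0) -> None:
        self._offset = offset

    def now(self) -> float:
        return time.time() + self._offset

    def monotonic(self) -> float:
        # The offset is deliberately not applied, so durations stay correct.
        return time.monotonic()


ahead = OffsetClock(offset=3600.0)  # simulate a host one hour ahead
behind = OffsetClock(offset=-3600.0)  # ...or one hour behind

skew = ahead.now() - time.time()
```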
SystemClock

Default clock implementation using system time.

This is the production implementation that delegates to the standard library's time module.

Source code in snakesee/state/clock.py
class SystemClock:
    """Default clock implementation using system time.

    This is the production implementation that delegates to the
    standard library's time module.
    """

    def now(self) -> float:
        """Return current time as Unix timestamp."""
        return _time.time()

    def monotonic(self) -> float:
        """Return monotonic clock value."""
        return _time.monotonic()
Functions
monotonic
monotonic() -> float

Return monotonic clock value.

Source code in snakesee/state/clock.py
def monotonic(self) -> float:
    """Return monotonic clock value."""
    return _time.monotonic()
now
now() -> float

Return current time as Unix timestamp.

Source code in snakesee/state/clock.py
def now(self) -> float:
    """Return current time as Unix timestamp."""
    return _time.time()
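`SystemClock` does not inherit from `Clock`; it satisfies the protocol structurally by providing `now()` and `monotonic()`. The sketch below demonstrates this with a local `Clock` protocol. One assumption to flag: it adds `@runtime_checkable` so that `isinstance` works, which the `Clock` listing on this page does not show. `measure` is a hypothetical helper that uses `monotonic()` for durations, as the protocol's docstring recommends.

```python
import time
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class Clock(Protocol):
    # Same two methods as snakesee's Clock protocol; runtime_checkable
    # is added here only so isinstance() works in this sketch.
    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class SystemClock:
    # No inheritance from Clock: matching method names is enough.
    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


def measure(clock: Clock, work: Callable[[], object]) -> float:
    # Use monotonic() so wall-clock adjustments cannot skew the duration.
    start = clock.monotonic()
    work()
    return clock.monotonic() - start


duration = measure(SystemClock(), lambda: sum(range(10_000)))
```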

Functions

get_clock
get_clock() -> Clock

Get the current default clock.

Returns:

Clock: The currently configured clock instance.

Source code in snakesee/state/clock.py
def get_clock() -> Clock:
    """Get the current default clock.

    Returns:
        The currently configured clock instance.
    """
    with _clock_lock:
        return _default_clock
reset_clock
reset_clock() -> None

Reset the default clock to SystemClock.

Source code in snakesee/state/clock.py
def reset_clock() -> None:
    """Reset the default clock to SystemClock."""
    global _default_clock
    with _clock_lock:
        _default_clock = SystemClock()
set_clock
set_clock(clock: Clock) -> None

Set the default clock (primarily for testing).

Parameters:

clock (Clock, required): Clock instance to use as default.
Source code in snakesee/state/clock.py
def set_clock(clock: Clock) -> None:
    """Set the default clock (primarily for testing).

    Args:
        clock: Clock instance to use as default.
    """
    global _default_clock
    with _clock_lock:
        _default_clock = clock
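In tests, `set_clock` is typically paired with a restore step so a frozen clock does not leak into other tests. The self-contained sketch below re-creates the lock-protected module-level default from the listings and adds `use_clock`, a hypothetical context manager that is not part of snakesee. It restores the previously installed clock rather than calling `reset_clock`, so nested overrides unwind correctly.

```python
import threading
import time
from contextlib import contextmanager
from typing import Iterator


class SystemClock:
    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


_clock_lock = threading.Lock()
_default_clock = SystemClock()


def get_clock():
    with _clock_lock:
        return _default_clock


def set_clock(clock) -> None:
    global _default_clock
    with _clock_lock:
        _default_clock = clock


@contextmanager
def use_clock(clock) -> Iterator[None]:
    # Hypothetical helper: install `clock`, then restore the previous
    # clock even if the body raises.
    previous = get_clock()
    set_clock(clock)
    try:
        yield
    finally:
        set_clock(previous)


class Fixed:
    def now(self) -> float:
        return 42.0

    def monotonic(self) -> float:
        return 0.0


with use_clock(Fixed()):
    inside = get_clock().now()
restored = isinstance(get_clock(), SystemClock)
```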

config

Centralized configuration for time estimation.

This module consolidates all estimation-related configuration that was previously scattered across multiple files as magic numbers.

The EstimationConfig dataclass contains all thresholds, multipliers, and settings used by the time estimation system.

Classes

ConfidenceThresholds dataclass

Thresholds for confidence-based decisions.

Attributes:

size_scaling_min (float): Minimum confidence to use a size-scaled estimate.
high_confidence (float): Show a simple estimate without a range.
medium_confidence (float): Show a range.
low_confidence (float): Show a "rough" caveat.
max_confidence (float): Cap to avoid overconfidence.
bootstrap_confidence (float): Confidence when there is no progress yet.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class ConfidenceThresholds:
    """Thresholds for confidence-based decisions.

    Attributes:
        size_scaling_min: Min confidence to use size-scaled estimate.
        high_confidence: Show simple estimate without range.
        medium_confidence: Show range.
        low_confidence: Show "rough" caveat.
        max_confidence: Cap to avoid overconfidence.
        bootstrap_confidence: Confidence when no progress yet.
    """

    size_scaling_min: float = 0.3
    high_confidence: float = 0.7
    medium_confidence: float = 0.4
    low_confidence: float = 0.1
    max_confidence: float = 0.9
    bootstrap_confidence: float = 0.05
ConfidenceWeights dataclass

Weights for computing overall confidence score.

Used in _estimate_weighted() to combine multiple factors. All weights must sum to 1.0 (within a tolerance of 0.001); otherwise __post_init__ raises ValueError.

Attributes:

sample_size (float): Weight for historical sample count.
recency (float): Weight for data recency.
consistency (float): Weight for recent run consistency.
data_coverage (float): Weight for fraction of rules with data.

Raises:

ValueError: If the weights do not sum to 1.0.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class ConfidenceWeights:
    """Weights for computing overall confidence score.

    Used in _estimate_weighted() to combine multiple factors.
    All weights should sum to 1.0.

    Attributes:
        sample_size: Weight for historical sample count.
        recency: Weight for data recency.
        consistency: Weight for recent run consistency.
        data_coverage: Weight for fraction of rules with data.

    Raises:
        ValueError: If weights do not sum to 1.0.
    """

    sample_size: float = 0.4
    recency: float = 0.3
    consistency: float = 0.2
    data_coverage: float = 0.1

    def __post_init__(self) -> None:
        """Validate that weights sum to 1.0."""
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Confidence weights must sum to 1.0, got {total}")
Functions
__post_init__
__post_init__() -> None

Validate that weights sum to 1.0.

Source code in snakesee/state/config.py
def __post_init__(self) -> None:
    """Validate that weights sum to 1.0."""
    total = self.sample_size + self.recency + self.consistency + self.data_coverage
    if abs(total - 1.0) > 0.001:
        raise ValueError(f"Confidence weights must sum to 1.0, got {total}")
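The sum check runs in `__post_init__`, so an invalid combination fails at construction time. The sketch below copies the dataclass from the listing and adds a hypothetical `combine` method; how `_estimate_weighted()` actually combines the factors is not shown on this page, so the plain weighted sum is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfidenceWeights:
    # Same fields, defaults, and validation as the listing above.
    sample_size: float = 0.4
    recency: float = 0.3
    consistency: float = 0.2
    data_coverage: float = 0.1

    def __post_init__(self) -> None:
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Confidence weights must sum to 1.0, got {total}")

    def combine(self, sample_size: float, recency: float, consistency: float, data_coverage: float) -> float:
        # Hypothetical: a plain weighted sum of per-factor scores in [0, 1].
        return (
            self.sample_size * sample_size
            + self.recency * recency
            + self.consistency * consistency
            + self.data_coverage * data_coverage
        )


weights = ConfidenceWeights()
score = weights.combine(sample_size=1.0, recency=0.5, consistency=0.5, data_coverage=1.0)

try:
    ConfidenceWeights(sample_size=0.5)  # 0.5 + 0.3 + 0.2 + 0.1 = 1.1
    rejected = False
except ValueError:
    rejected = True
```

Because the weights sum to 1.0, a combined score built this way stays in [0, 1] whenever every factor score does.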
EstimationConfig dataclass

Central configuration for time estimation.

This class consolidates all estimation-related configuration that was previously scattered across multiple files as magic numbers.

Attributes:

weighting_strategy (WeightingStrategy): How to weight historical data ("time" or "index").
half_life_days (float): Half-life in days for time-based weighting.
half_life_logs (int): Half-life in run count for index-based weighting.
variance (VarianceMultipliers): Variance multiplier settings.
confidence_weights (ConfidenceWeights): Weights for confidence calculation.
confidence_thresholds (ConfidenceThresholds): Decision thresholds based on confidence.
time (TimeConstants): Time-related constants.
min_samples_for_conditioning (int): Minimum samples for wildcard conditioning.
min_pairs_for_size_scaling (int): Minimum pairs for size correlation.
high_variance_cv (float): Coefficient of variation threshold for "high variance".
default_global_mean (float): Fallback duration when no data is available.

Raises:

ValueError: If half_life_logs, half_life_days, or default_global_mean is not positive.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class EstimationConfig:
    """Central configuration for time estimation.

    This class consolidates all estimation-related configuration that was
    previously scattered across multiple files as magic numbers.

    Attributes:
        weighting_strategy: How to weight historical data ("time" or "index").
        half_life_days: Half-life in days for time-based weighting.
        half_life_logs: Half-life in run count for index-based weighting.
        variance: Variance multiplier settings.
        confidence_weights: Weights for confidence calculation.
        confidence_thresholds: Decision thresholds based on confidence.
        time: Time-related constants.
        min_samples_for_conditioning: Minimum samples for wildcard conditioning.
        min_pairs_for_size_scaling: Minimum pairs for size correlation.
        high_variance_cv: Coefficient of variation threshold for "high variance".
        default_global_mean: Fallback duration when no data available.
    Raises:
        ValueError: If any parameter is invalid (e.g., non-positive half_life).
    """

    weighting_strategy: WeightingStrategy = "index"
    half_life_days: float = 7.0
    half_life_logs: int = 10

    variance: VarianceMultipliers = field(default_factory=VarianceMultipliers)
    confidence_weights: ConfidenceWeights = field(default_factory=ConfidenceWeights)
    confidence_thresholds: ConfidenceThresholds = field(default_factory=ConfidenceThresholds)
    time: TimeConstants = field(default_factory=TimeConstants)

    # Sample size thresholds
    # Synced with constants.MIN_SAMPLES_FOR_CONDITIONING
    min_samples_for_conditioning: int = 3
    min_pairs_for_size_scaling: int = 3
    min_samples_for_confidence: int = 10

    # Variance thresholds
    high_variance_cv: float = 0.5
    variance_ratio_threshold: float = 0.1

    # Default values
    default_global_mean: float = 60.0
    default_recency_factor: float = 0.5

    # Size scaling bounds
    size_dampening_power: float = 0.5
    size_ratio_min: float = 0.5
    size_ratio_max: float = 2.0
    size_confidence_max: float = 0.8
    size_confidence_divisor: int = 10

    # Bootstrap estimation bounds (when no jobs completed)
    bootstrap_lower_multiplier: float = 0.2
    bootstrap_upper_multiplier: float = 3.0

    # Simple estimate bounds (used when no historical data)
    simple_estimate_confidence_cap: float = 0.7
    simple_estimate_jobs_divisor: int = 20

    # Fuzzy rule matching
    fuzzy_match_max_distance: int = 3

    def __post_init__(self) -> None:
        """Validate configuration parameters."""
        if self.half_life_logs <= 0:
            raise ValueError(f"half_life_logs must be > 0, got {self.half_life_logs}")
        if self.half_life_days <= 0:
            raise ValueError(f"half_life_days must be > 0, got {self.half_life_days}")
        if self.default_global_mean <= 0:
            raise ValueError(f"default_global_mean must be > 0, got {self.default_global_mean}")
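The listing defines `half_life_days` and `half_life_logs` but not the decay formula that uses them. Assuming standard exponential decay, where a sample's weight halves once per half-life, index-based weighting could look like the sketch below. `decay_weight` and `weighted_mean` are hypothetical helpers, not snakesee functions.

```python
def decay_weight(age: float, half_life: float) -> float:
    # Assumed exponential decay: the weight halves every `half_life` units.
    # For "index" weighting, age = runs ago; for "time" weighting, age = days ago.
    if half_life <= 0:
        raise ValueError("half_life must be > 0")
    return 0.5 ** (age / half_life)


def weighted_mean(durations_newest_first: list[float], half_life: float) -> float:
    # Index-based weighting: the most recent run has age 0.
    weights = [decay_weight(i, half_life) for i in range(len(durations_newest_first))]
    total = sum(w * d for w, d in zip(weights, durations_newest_first))
    return total / sum(weights)


recent_weight = decay_weight(0, half_life=10)  # 1.0
older_weight = decay_weight(10, half_life=10)  # 0.5
estimate = weighted_mean([100.0, 200.0], half_life=1)
```

Under this assumption, a positive half-life is required for the exponent to be defined, which is consistent with the `> 0` checks in `__post_init__`.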
Functions
__post_init__
__post_init__() -> None

Validate configuration parameters.

Source code in snakesee/state/config.py
def __post_init__(self) -> None:
    """Validate configuration parameters."""
    if self.half_life_logs <= 0:
        raise ValueError(f"half_life_logs must be > 0, got {self.half_life_logs}")
    if self.half_life_days <= 0:
        raise ValueError(f"half_life_days must be > 0, got {self.half_life_days}")
    if self.default_global_mean <= 0:
        raise ValueError(f"default_global_mean must be > 0, got {self.default_global_mean}")
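Because `EstimationConfig` is a frozen dataclass, overrides are made by building a new instance, and `dataclasses.replace` re-runs `__post_init__`, so invalid overrides are still rejected. The sketch uses a hypothetical cut-down config with only the three validated fields so that it runs standalone.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class MiniEstimationConfig:
    # Hypothetical cut-down EstimationConfig: only the validated fields.
    half_life_days: float = 7.0
    half_life_logs: int = 10
    default_global_mean: float = 60.0

    def __post_init__(self) -> None:
        if self.half_life_logs <= 0:
            raise ValueError(f"half_life_logs must be > 0, got {self.half_life_logs}")
        if self.half_life_days <= 0:
            raise ValueError(f"half_life_days must be > 0, got {self.half_life_days}")
        if self.default_global_mean <= 0:
            raise ValueError(f"default_global_mean must be > 0, got {self.default_global_mean}")


base = MiniEstimationConfig()
faster_decay = replace(base, half_life_logs=5)  # new instance; base is unchanged

try:
    base.half_life_logs = 3  # frozen: attribute assignment is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

try:
    replace(base, half_life_days=0.0)  # replace() re-runs __post_init__
    accepted = True
except ValueError:
    accepted = False
```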
TimeConstants dataclass

Time-related constants used throughout the codebase.

Attributes:

seconds_per_day (int): Number of seconds in a day.
seconds_per_hour (int): Number of seconds in an hour.
seconds_per_minute (int): Number of seconds in a minute.
stale_workflow_threshold (float): Seconds before a workflow is considered stale.
timing_mismatch_tolerance (float): Tolerance for validation timing comparisons.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class TimeConstants:
    """Time-related constants used throughout the codebase.

    Attributes:
        seconds_per_day: Number of seconds in a day.
        seconds_per_hour: Number of seconds in an hour.
        seconds_per_minute: Number of seconds in a minute.
        stale_workflow_threshold: Seconds before workflow is considered stale.
        timing_mismatch_tolerance: Tolerance for validation timing comparisons.
    """

    seconds_per_day: int = 86400
    seconds_per_hour: int = 3600
    seconds_per_minute: int = 60
    stale_workflow_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS
    timing_mismatch_tolerance: float = 5.0  # seconds
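A short sketch of how the constants might be consumed. `format_duration` and `timings_agree` are hypothetical helpers, and `stale_workflow_threshold` is left out because its default comes from `STALE_WORKFLOW_THRESHOLD_SECONDS`, whose value is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeConstants:
    # Subset of the listing above (stale_workflow_threshold omitted).
    seconds_per_day: int = 86400
    seconds_per_hour: int = 3600
    seconds_per_minute: int = 60
    timing_mismatch_tolerance: float = 5.0  # seconds


def format_duration(seconds: float, tc: TimeConstants = TimeConstants()) -> str:
    # Hypothetical formatter: split whole seconds into d/h/m/s.
    total = int(seconds)
    days, rest = divmod(total, tc.seconds_per_day)
    hours, rest = divmod(rest, tc.seconds_per_hour)
    minutes, secs = divmod(rest, tc.seconds_per_minute)
    prefix = f"{days}d " if days else ""
    return f"{prefix}{hours}h {minutes:02d}m {secs:02d}s"


def timings_agree(a: float, b: float, tc: TimeConstants = TimeConstants()) -> bool:
    # Two timings match if they differ by at most the tolerance.
    return abs(a - b) <= tc.timing_mismatch_tolerance


short = format_duration(3725)  # "1h 02m 05s"
long = format_duration(90061)  # "1d 1h 01m 01s"
```

Sharing one frozen `TimeConstants` default instance is safe because it cannot be mutated.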
VarianceMultipliers dataclass

Variance multipliers for different estimation scenarios.

When only one observation exists, the variance is computed as (mean * multiplier)^2, so lower multipliers give tighter confidence bounds.

Attributes:

exact_thread_match (float): High confidence - exact thread count match.
exact_wildcard_match (float): High confidence - exact wildcard match.
size_scaled (float): Medium confidence - input-size scaling.
rule_fallback (float): Standard - rule-level fallback.
aggregate_fallback (float): Standard - aggregate across thread counts.
fuzzy_match (float): Lower confidence - fuzzy rule matching.
global_fallback (float): Lowest confidence - no rule data.
bootstrap (float): No progress yet - initial estimate.

Source code in snakesee/state/config.py
@dataclass(frozen=True)
class VarianceMultipliers:
    """Variance multipliers for different estimation scenarios.

    When only one observation exists, variance is computed as (mean * multiplier)^2.
    Lower multipliers = tighter confidence bounds.

    Attributes:
        exact_thread_match: High confidence - exact thread count match.
        exact_wildcard_match: High confidence - exact wildcard match.
        size_scaled: Medium confidence - input-size scaling.
        rule_fallback: Standard - rule-level fallback.
        aggregate_fallback: Standard - aggregate across thread counts.
        fuzzy_match: Lower confidence - fuzzy rule matching.
        global_fallback: Lowest confidence - no rule data.
        bootstrap: No progress yet - initial estimate.
    """

    exact_thread_match: float = 0.2
    exact_wildcard_match: float = 0.2
    size_scaled: float = 0.25
    rule_fallback: float = 0.3
    aggregate_fallback: float = 0.3
    fuzzy_match: float = 0.4
    global_fallback: float = 0.5
    bootstrap: float = 0.5
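With a single observation, the standard deviation implied by the formula is simply `mean * multiplier`, so a multiplier of 0.2 means one standard deviation equals 20% of the mean. The sketch below works this through for a hypothetical 120-second prior run, using a subset of the default multipliers.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class VarianceMultipliers:
    # Subset of the defaults from the listing above.
    exact_thread_match: float = 0.2
    rule_fallback: float = 0.3
    global_fallback: float = 0.5


def single_observation_variance(mean: float, multiplier: float) -> float:
    # As documented: with one observation, variance = (mean * multiplier) ** 2.
    return (mean * multiplier) ** 2


vm = VarianceMultipliers()
mean = 120.0  # seconds, from a single prior run (hypothetical)
tight = math.sqrt(single_observation_variance(mean, vm.exact_thread_match))
loose = math.sqrt(single_observation_variance(mean, vm.global_fallback))
```

Here the exact-thread-match estimate has a standard deviation of about 24 s, while the global fallback has 60 s, which is why lower multipliers give tighter bounds.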

job_registry

Job registry for centralized job state management.

This module provides a single source of truth for all job state in a workflow, replacing the scattered job tracking across multiple components.

Classes

Job dataclass

Mutable job state container.

Unlike the frozen JobInfo dataclass, this class allows state updates as jobs progress through their lifecycle.

Attributes:

key (str): Unique key for this job (rule + output hash or job_id).
rule (str): The Snakemake rule name.
status (JobStatus): Current job status.
job_id (str | None): Snakemake job ID (may be None initially).
start_time (float | None): Unix timestamp when the job started.
end_time (float | None): Unix timestamp when the job completed.
wildcards (dict[str, str]): Dictionary of wildcard values.
threads (int | None): Number of threads allocated.
input_size (int | None): Total input file size in bytes.
log_file (Path | None): Path to the job's log file.

Source code in snakesee/state/job_registry.py
@dataclass
class Job:
    """Mutable job state container.

    Unlike the frozen JobInfo dataclass, this class allows state updates
    as jobs progress through their lifecycle.

    Attributes:
        key: Unique key for this job (rule + output hash or job_id).
        rule: The Snakemake rule name.
        status: Current job status.
        job_id: Snakemake job ID (may be None initially).
        start_time: Unix timestamp when job started.
        end_time: Unix timestamp when job completed.
        wildcards: Dictionary of wildcard values.
        threads: Number of threads allocated.
        input_size: Total input file size in bytes.
        log_file: Path to job's log file.
    """

    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING
    job_id: str | None = None
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] = field(default_factory=dict)
    threads: int | None = None
    input_size: int | None = None
    log_file: Path | None = None
    stats_recorded: bool = False

    @property
    def elapsed(self) -> float | None:
        """Elapsed time in seconds since job started."""
        if self.start_time is None:
            return None
        if self.end_time is not None:
            return self.end_time - self.start_time
        from snakesee.state.clock import get_clock

        return get_clock().now() - self.start_time

    @property
    def duration(self) -> float | None:
        """Total duration for completed jobs."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_job_info(self) -> JobInfo:
        """Convert to immutable JobInfo for backward compatibility."""
        from snakesee.models import JobInfo

        return JobInfo(
            rule=self.rule,
            job_id=self.job_id,
            wildcards=self.wildcards if self.wildcards else None,
            start_time=self.start_time,
            end_time=self.end_time,
            input_size=self.input_size,
            threads=self.threads,
            log_file=self.log_file,
        )

    @classmethod
    def from_job_info(cls, job_info: JobInfo, key: str | None = None) -> Job:
        """Create a Job from a JobInfo instance."""
        # Determine status based on JobInfo fields
        if job_info.end_time is not None:
            status = JobStatus.COMPLETED
        elif job_info.start_time is not None:
            status = JobStatus.RUNNING
        else:
            status = JobStatus.PENDING

        # Generate key if not provided
        if key is None:
            # Use job_id if available, otherwise use deterministic hash
            # This matches apply_job_info() to avoid duplicate entries
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        return cls(
            key=key,
            rule=job_info.rule,
            status=status,
            job_id=job_info.job_id,
            start_time=job_info.start_time,
            end_time=job_info.end_time,
            wildcards=dict(job_info.wildcards) if job_info.wildcards else {},
            threads=job_info.threads,
            input_size=job_info.input_size,
            log_file=job_info.log_file,
        )
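The difference between `elapsed` and `duration` matters while a job is running. The sketch below uses a hypothetical cut-down `MiniJob` that mirrors the two properties; to stay self-contained it takes the current time as an argument instead of calling `get_clock()`, and the `JobStatus` member values are placeholders.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class JobStatus(Enum):
    # Hypothetical subset of snakesee's JobStatus; values are placeholders.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class MiniJob:
    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] = field(default_factory=dict)

    def elapsed(self, now: float) -> float | None:
        # Like Job.elapsed: None before start, end - start once finished,
        # otherwise now - start.
        if self.start_time is None:
            return None
        end = self.end_time if self.end_time is not None else now
        return end - self.start_time

    @property
    def duration(self) -> float | None:
        # Like Job.duration: only defined once both timestamps exist.
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None


job = MiniJob(key="job_1", rule="align", wildcards={"sample": "A"})
pending_elapsed = job.elapsed(now=1_000.0)  # None: not started
job.status, job.start_time = JobStatus.RUNNING, 1_000.0
running_elapsed = job.elapsed(now=1_045.0)  # 45.0, still running
running_duration = job.duration  # None until the job finishes
job.status, job.end_time = JobStatus.COMPLETED, 1_100.0
final_elapsed = job.elapsed(now=9_999.0)  # 100.0, fixed at end_time
```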
Attributes
duration property
duration: float | None

Total duration for completed jobs.

elapsed property
elapsed: float | None

Elapsed time in seconds since job started.

Functions
from_job_info classmethod
from_job_info(job_info: JobInfo, key: str | None = None) -> Job

Create a Job from a JobInfo instance.

Source code in snakesee/state/job_registry.py
@classmethod
def from_job_info(cls, job_info: JobInfo, key: str | None = None) -> Job:
    """Create a Job from a JobInfo instance."""
    # Determine status based on JobInfo fields
    if job_info.end_time is not None:
        status = JobStatus.COMPLETED
    elif job_info.start_time is not None:
        status = JobStatus.RUNNING
    else:
        status = JobStatus.PENDING

    # Generate key if not provided
    if key is None:
        # Use job_id if available, otherwise use deterministic hash
        # This matches apply_job_info() to avoid duplicate entries
        key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

    return cls(
        key=key,
        rule=job_info.rule,
        status=status,
        job_id=job_info.job_id,
        start_time=job_info.start_time,
        end_time=job_info.end_time,
        wildcards=dict(job_info.wildcards) if job_info.wildcards else {},
        threads=job_info.threads,
        input_size=job_info.input_size,
        log_file=job_info.log_file,
    )
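The status precedence in `from_job_info` can be isolated into a small function. This sketch is hypothetical: `infer_status` and `fallback_key` are not snakesee functions, and `fingerprint` stands in for `hash(job_info)`. Because the listing uses `job_info.job_id or ...`, an empty-string `job_id` also falls back to the rule-based key.

```python
from __future__ import annotations

from enum import Enum


class JobStatus(Enum):
    # Hypothetical subset of snakesee's JobStatus; values are placeholders.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


def infer_status(start_time: float | None, end_time: float | None) -> JobStatus:
    # Same precedence as from_job_info: an end time wins, then a start time.
    if end_time is not None:
        return JobStatus.COMPLETED
    if start_time is not None:
        return JobStatus.RUNNING
    return JobStatus.PENDING


def fallback_key(job_id: str | None, rule: str, fingerprint: int) -> str:
    # Mirrors the key rule: prefer job_id, else "<rule>:<hash>".
    return job_id or f"{rule}:{fingerprint}"
```

Note that a record with only an `end_time` is classified as completed even though it never had a start time, so its `duration` stays None.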
to_job_info
to_job_info() -> JobInfo

Convert to immutable JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def to_job_info(self) -> JobInfo:
    """Convert to immutable JobInfo for backward compatibility."""
    from snakesee.models import JobInfo

    return JobInfo(
        rule=self.rule,
        job_id=self.job_id,
        wildcards=self.wildcards if self.wildcards else None,
        start_time=self.start_time,
        end_time=self.end_time,
        input_size=self.input_size,
        threads=self.threads,
        log_file=self.log_file,
    )
JobRegistry

Central registry for all job state.

Provides O(1) lookup by job key or job_id, and efficient iteration over jobs by status.

Example:

    import time

    from snakesee.state.job_registry import JobRegistry, JobStatus

    registry = JobRegistry()
    job, created = registry.get_or_create("job_1", "align")
    job.start_time = time.time()
    registry.set_status(job, JobStatus.RUNNING)
    running = registry.running()  # [job]
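The registry keeps a per-status index of job keys, so assigning `job.status` directly does not move the job between indexes; `update_indexes` moves it only when given the previous status, which `set_status` records automatically. The sketch below reproduces just that bookkeeping in a hypothetical `MiniRegistry` (no locking, no `job_id` or rule indexes) to show both paths.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"


@dataclass
class Job:
    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING


class MiniRegistry:
    def __init__(self) -> None:
        self._jobs: dict[str, Job] = {}
        self._by_status: dict[JobStatus, set[str]] = {s: set() for s in JobStatus}

    def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
        if key in self._jobs:
            return self._jobs[key], False
        job = Job(key=key, rule=rule)
        self._jobs[key] = job
        self._by_status[job.status].add(key)
        return job, True

    def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
        # Like the listing: the status index moves only when old_status is given.
        if old_status is not None and old_status != job.status:
            self._by_status[old_status].discard(job.key)
            self._by_status[job.status].add(job.key)

    def set_status(self, job: Job, status: JobStatus) -> None:
        old = job.status
        job.status = status
        self.update_indexes(job, old)

    def running(self) -> list[Job]:
        return [self._jobs[k] for k in self._by_status[JobStatus.RUNNING]]


registry = MiniRegistry()
a, _ = registry.get_or_create("job_1", "align")
a.status = JobStatus.RUNNING  # direct assignment...
registry.update_indexes(a)  # ...without old_status leaves the index stale
stale_view = registry.running()

b, _ = registry.get_or_create("job_2", "sort")
registry.set_status(b, JobStatus.RUNNING)  # records the old status for us
fresh_view = registry.running()
```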

Source code in snakesee/state/job_registry.py
class JobRegistry:
    """Central registry for all job state.

    Provides O(1) lookup by job key or job_id, and efficient iteration
    over jobs by status.

    Example:
        >>> registry = JobRegistry()
        >>> job = registry.get_or_create("job_1", "align")
        >>> job.status = JobStatus.RUNNING
        >>> job.start_time = time.time()
        >>> registry.update_indexes(job)
        >>> running = registry.running()  # [job]
    """

    def __init__(self) -> None:
        """Initialize empty registry."""
        self._lock = threading.RLock()
        self._jobs: dict[str, Job] = {}
        self._by_job_id: dict[str, str] = {}  # job_id -> key
        self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
        self._by_rule: dict[str, set[str]] = {}

    def __len__(self) -> int:
        """Return number of jobs in registry."""
        with self._lock:
            return len(self._jobs)

    def __contains__(self, key: str) -> bool:
        """Check if job exists by key."""
        with self._lock:
            return key in self._jobs

    def get(self, key: str) -> Job | None:
        """Get job by key."""
        with self._lock:
            return self._jobs.get(key)

    def get_by_job_id(self, job_id: str) -> Job | None:
        """Get job by Snakemake job_id."""
        with self._lock:
            key = self._by_job_id.get(job_id)
            return self._jobs.get(key) if key else None

    def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
        """Get existing job or create a new one.

        Args:
            key: Unique job key.
            rule: Rule name for new job.

        Returns:
            Tuple of (job, created) where created is True if job was new.
        """
        with self._lock:
            if key in self._jobs:
                return self._jobs[key], False

            job = Job(key=key, rule=rule)
            self._add_unlocked(job)
            return job, True

    def add(self, job: Job) -> None:
        """Add a job to the registry."""
        with self._lock:
            self._add_unlocked(job)

    def _add_unlocked(self, job: Job) -> None:
        """Internal add with index updates (caller must hold lock)."""
        self._jobs[job.key] = job
        self._by_status[job.status].add(job.key)

        if job.job_id is not None:
            self._by_job_id[job.job_id] = job.key

        if job.rule not in self._by_rule:
            self._by_rule[job.rule] = set()
        self._by_rule[job.rule].add(job.key)

    def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes after job state change.

        Args:
            job: The job that was updated.
            old_status: Previous status if status changed, for index update.
        """
        with self._lock:
            self._update_indexes_unlocked(job, old_status)

    def _update_indexes_unlocked(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes without acquiring lock (caller must hold lock)."""
        # Update job_id index if needed
        if job.job_id is not None and job.job_id not in self._by_job_id:
            self._by_job_id[job.job_id] = job.key

        # Update status index if status changed
        if old_status is not None and old_status != job.status:
            self._by_status[old_status].discard(job.key)
            self._by_status[job.status].add(job.key)

    def set_status(self, job: Job, status: JobStatus) -> None:
        """Update job status and indexes."""
        with self._lock:
            old_status = job.status
            job.status = status
            self._update_indexes_unlocked(job, old_status)

    def running(self) -> list[Job]:
        """Get all running jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]

    def completed(self) -> list[Job]:
        """Get all completed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]

    def failed(self) -> list[Job]:
        """Get all failed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]

    def incomplete(self) -> list[Job]:
        """Get all incomplete jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]

    def submitted(self) -> list[Job]:
        """Get all submitted jobs that haven't started yet."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]

    def pending(self) -> list[Job]:
        """Get all pending jobs (not yet submitted)."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]

    def by_rule(self, rule: str) -> list[Job]:
        """Get all jobs for a specific rule."""
        with self._lock:
            keys = self._by_rule.get(rule, set())
            return [self._jobs[key] for key in keys]

    def all_jobs(self) -> list[Job]:
        """Get all jobs in the registry."""
        with self._lock:
            return list(self._jobs.values())

    def running_job_infos(self) -> list[JobInfo]:
        """Get running jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.running()]

    def completed_job_infos(self) -> list[JobInfo]:
        """Get completed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.completed()]

    def failed_job_infos(self) -> list[JobInfo]:
        """Get failed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.failed()]

    def submitted_job_infos(self) -> list[JobInfo]:
        """Get submitted jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.submitted()]

    def clear(self) -> None:
        """Clear all jobs from the registry."""
        with self._lock:
            self._jobs.clear()
            self._by_job_id.clear()
            for status_set in self._by_status.values():
                status_set.clear()
            self._by_rule.clear()

    def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
        """Add or update a job from a JobInfo.

        Args:
            job_info: JobInfo to apply.
            key: Optional key. If None, uses job_id or generates one.

        Returns:
            The created or updated Job.
        """
        with self._lock:
            # Determine key
            if key is None:
                key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

            # Look up existing job without additional lock acquisition
            existing = self._jobs.get(key)
            if existing is None and job_info.job_id:
                existing_key = self._by_job_id.get(job_info.job_id)
                if existing_key:
                    existing = self._jobs.get(existing_key)

            if existing:
                # Update existing job
                old_status = existing.status
                if job_info.start_time is not None:
                    existing.start_time = job_info.start_time
                if job_info.end_time is not None:
                    existing.end_time = job_info.end_time
                    existing.status = JobStatus.COMPLETED
                elif job_info.start_time is not None:
                    existing.status = JobStatus.RUNNING
                if job_info.threads is not None:
                    existing.threads = job_info.threads
                if job_info.wildcards:
                    existing.wildcards = dict(job_info.wildcards)
                if job_info.input_size is not None:
                    existing.input_size = job_info.input_size
                if job_info.log_file is not None:
                    existing.log_file = job_info.log_file
                if job_info.job_id is not None and existing.job_id is None:
                    existing.job_id = job_info.job_id
                self._update_indexes_unlocked(existing, old_status)
                return existing
            else:
                # Create new job
                job = Job.from_job_info(job_info, key)
                self._add_unlocked(job)
                return job

    def apply_event(self, event: SnakeseeEvent) -> Job | None:
        """Apply a snakesee event to update job state.

        Args:
            event: The event to apply.

        Returns:
            The updated Job, or None if event couldn't be applied.
        """
        from snakesee.events import EventType

        # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
        # These don't have job_id/rule_name and would create synthetic "unknown" jobs
        if event.event_type not in {
            EventType.JOB_SUBMITTED,
            EventType.JOB_STARTED,
            EventType.JOB_FINISHED,
            EventType.JOB_ERROR,
        }:
            return None

        with self._lock:
            # Get rule name from event
            rule_name = event.rule_name or "unknown"

            # Use job_id as key if available (convert int to str for key)
            job_id_str = str(event.job_id) if event.job_id is not None else None
            key = job_id_str or f"{rule_name}:{event.timestamp}"

            # Look up job without additional lock acquisition
            job = self._jobs.get(key)
            if job is None and job_id_str:
                existing_key = self._by_job_id.get(job_id_str)
                if existing_key:
                    job = self._jobs.get(existing_key)

            if job is None:
                # Create new job from event
                job = Job(key=key, rule=rule_name)
                self._add_unlocked(job)
                if job_id_str:
                    job.job_id = job_id_str
                    self._by_job_id[job_id_str] = key
                if event.wildcards:
                    job.wildcards = dict(event.wildcards)
                if event.threads:
                    job.threads = event.threads

            old_status = job.status

            # Update based on event type
            if event.event_type == EventType.JOB_SUBMITTED:
                job.status = JobStatus.SUBMITTED
            elif event.event_type == EventType.JOB_STARTED:
                job.status = JobStatus.RUNNING
                job.start_time = event.timestamp
                if event.threads:
                    job.threads = event.threads
            elif event.event_type == EventType.JOB_FINISHED:
                job.status = JobStatus.COMPLETED
                job.end_time = event.timestamp
            elif event.event_type == EventType.JOB_ERROR:
                job.status = JobStatus.FAILED
                job.end_time = event.timestamp

            self._update_indexes_unlocked(job, old_status)
            return job

    def store_threads(self, job_id: str, threads: int) -> None:
        """Store thread count for a job by job_id.

        This is used when thread info comes from events before the job
        is fully tracked.
        """
        with self._lock:
            key = self._by_job_id.get(job_id)
            if key:
                job = self._jobs.get(key)
                if job:
                    job.threads = threads
Functions
__contains__
__contains__(key: str) -> bool

Check if job exists by key.

Source code in snakesee/state/job_registry.py
def __contains__(self, key: str) -> bool:
    """Check if job exists by key."""
    with self._lock:
        return key in self._jobs
__init__
__init__() -> None

Initialize empty registry.

Source code in snakesee/state/job_registry.py
def __init__(self) -> None:
    """Initialize empty registry."""
    self._lock = threading.RLock()
    self._jobs: dict[str, Job] = {}
    self._by_job_id: dict[str, str] = {}  # job_id -> key
    self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
    self._by_rule: dict[str, set[str]] = {}
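The registry guards its state with threading.RLock rather than threading.Lock, presumably so that a thread already holding the lock can acquire it again without deadlocking; the _unlocked helpers avoid relying on that in the common paths. The standard-library snippet below only demonstrates the behavioral difference between the two lock types and does not use snakesee.

```python
import threading

rlock = threading.RLock()
plain = threading.Lock()

with rlock:
    # The same thread may re-acquire an RLock; each acquire needs a matching release.
    reentered = rlock.acquire(blocking=False)
    if reentered:
        rlock.release()

with plain:
    # A plain Lock is not reentrant: a second blocking acquire from the same
    # thread would hang forever, so probe it without blocking instead.
    relocked = plain.acquire(blocking=False)

print(reentered, relocked)  # True False
```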
__len__
__len__() -> int

Return number of jobs in registry.

Source code in snakesee/state/job_registry.py
def __len__(self) -> int:
    """Return number of jobs in registry."""
    with self._lock:
        return len(self._jobs)
add
add(job: Job) -> None

Add a job to the registry.

Source code in snakesee/state/job_registry.py
def add(self, job: Job) -> None:
    """Add a job to the registry."""
    with self._lock:
        self._add_unlocked(job)
all_jobs
all_jobs() -> list[Job]

Get all jobs in the registry.

Source code in snakesee/state/job_registry.py
def all_jobs(self) -> list[Job]:
    """Get all jobs in the registry."""
    with self._lock:
        return list(self._jobs.values())
apply_event
apply_event(event: SnakeseeEvent) -> Job | None

Apply a snakesee event to update job state.

Parameters:

- event (SnakeseeEvent, required): The event to apply.

Returns:

- Job | None: The updated Job, or None if the event is not a job event. Only JOB_SUBMITTED, JOB_STARTED, JOB_FINISHED, and JOB_ERROR events are applied.

Source code in snakesee/state/job_registry.py
def apply_event(self, event: SnakeseeEvent) -> Job | None:
    """Apply a snakesee event to update job state.

    Args:
        event: The event to apply.

    Returns:
        The updated Job, or None if event couldn't be applied.
    """
    from snakesee.events import EventType

    # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
    # These don't have job_id/rule_name and would create synthetic "unknown" jobs
    if event.event_type not in {
        EventType.JOB_SUBMITTED,
        EventType.JOB_STARTED,
        EventType.JOB_FINISHED,
        EventType.JOB_ERROR,
    }:
        return None

    with self._lock:
        # Get rule name from event
        rule_name = event.rule_name or "unknown"

        # Use job_id as key if available (convert int to str for key)
        job_id_str = str(event.job_id) if event.job_id is not None else None
        key = job_id_str or f"{rule_name}:{event.timestamp}"

        # Look up job without additional lock acquisition
        job = self._jobs.get(key)
        if job is None and job_id_str:
            existing_key = self._by_job_id.get(job_id_str)
            if existing_key:
                job = self._jobs.get(existing_key)

        if job is None:
            # Create new job from event
            job = Job(key=key, rule=rule_name)
            self._add_unlocked(job)
            if job_id_str:
                job.job_id = job_id_str
                self._by_job_id[job_id_str] = key
            if event.wildcards:
                job.wildcards = dict(event.wildcards)
            if event.threads:
                job.threads = event.threads

        old_status = job.status

        # Update based on event type
        if event.event_type == EventType.JOB_SUBMITTED:
            job.status = JobStatus.SUBMITTED
        elif event.event_type == EventType.JOB_STARTED:
            job.status = JobStatus.RUNNING
            job.start_time = event.timestamp
            if event.threads:
                job.threads = event.threads
        elif event.event_type == EventType.JOB_FINISHED:
            job.status = JobStatus.COMPLETED
            job.end_time = event.timestamp
        elif event.event_type == EventType.JOB_ERROR:
            job.status = JobStatus.FAILED
            job.end_time = event.timestamp

        self._update_indexes_unlocked(job, old_status)
        return job
apply_job_info
apply_job_info(job_info: JobInfo, key: str | None = None) -> Job

Add or update a job from a JobInfo.

Parameters:

- job_info (JobInfo, required): JobInfo to apply.
- key (str | None, default None): Optional key. If None, job_info.job_id is used; failing that, a key of the form "{rule}:{hash(job_info)}" is generated.

Returns:

- Job: The created or updated Job.

Source code in snakesee/state/job_registry.py
def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
    """Add or update a job from a JobInfo.

    Args:
        job_info: JobInfo to apply.
        key: Optional key. If None, uses job_id or generates one.

    Returns:
        The created or updated Job.
    """
    with self._lock:
        # Determine key
        if key is None:
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        # Look up existing job without additional lock acquisition
        existing = self._jobs.get(key)
        if existing is None and job_info.job_id:
            existing_key = self._by_job_id.get(job_info.job_id)
            if existing_key:
                existing = self._jobs.get(existing_key)

        if existing:
            # Update existing job
            old_status = existing.status
            if job_info.start_time is not None:
                existing.start_time = job_info.start_time
            if job_info.end_time is not None:
                existing.end_time = job_info.end_time
                existing.status = JobStatus.COMPLETED
            elif job_info.start_time is not None:
                existing.status = JobStatus.RUNNING
            if job_info.threads is not None:
                existing.threads = job_info.threads
            if job_info.wildcards:
                existing.wildcards = dict(job_info.wildcards)
            if job_info.input_size is not None:
                existing.input_size = job_info.input_size
            if job_info.log_file is not None:
                existing.log_file = job_info.log_file
            if job_info.job_id is not None and existing.job_id is None:
                existing.job_id = job_info.job_id
            self._update_indexes_unlocked(existing, old_status)
            return existing
        else:
            # Create new job
            job = Job.from_job_info(job_info, key)
            self._add_unlocked(job)
            return job
by_rule
by_rule(rule: str) -> list[Job]

Get all jobs for a specific rule.

Source code in snakesee/state/job_registry.py
def by_rule(self, rule: str) -> list[Job]:
    """Get all jobs for a specific rule."""
    with self._lock:
        keys = self._by_rule.get(rule, set())
        return [self._jobs[key] for key in keys]
clear
clear() -> None

Clear all jobs from the registry.

Source code in snakesee/state/job_registry.py
def clear(self) -> None:
    """Clear all jobs from the registry."""
    with self._lock:
        self._jobs.clear()
        self._by_job_id.clear()
        for status_set in self._by_status.values():
            status_set.clear()
        self._by_rule.clear()
completed
completed() -> list[Job]

Get all completed jobs.

Source code in snakesee/state/job_registry.py
def completed(self) -> list[Job]:
    """Get all completed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]
completed_job_infos
completed_job_infos() -> list[JobInfo]

Get completed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def completed_job_infos(self) -> list[JobInfo]:
    """Get completed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.completed()]
failed
failed() -> list[Job]

Get all failed jobs.

Source code in snakesee/state/job_registry.py
def failed(self) -> list[Job]:
    """Get all failed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]
failed_job_infos
failed_job_infos() -> list[JobInfo]

Get failed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def failed_job_infos(self) -> list[JobInfo]:
    """Get failed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.failed()]
get
get(key: str) -> Job | None

Get job by key.

Source code in snakesee/state/job_registry.py
def get(self, key: str) -> Job | None:
    """Get job by key."""
    with self._lock:
        return self._jobs.get(key)
get_by_job_id
get_by_job_id(job_id: str) -> Job | None

Get job by Snakemake job_id.

Source code in snakesee/state/job_registry.py
def get_by_job_id(self, job_id: str) -> Job | None:
    """Get job by Snakemake job_id."""
    with self._lock:
        key = self._by_job_id.get(job_id)
        return self._jobs.get(key) if key else None
get_or_create
get_or_create(key: str, rule: str) -> tuple[Job, bool]

Get existing job or create a new one.

Parameters:

- key (str, required): Unique job key.
- rule (str, required): Rule name for the new job.

Returns:

- tuple[Job, bool]: Tuple of (job, created), where created is True if the job was new.

Source code in snakesee/state/job_registry.py
def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
    """Get existing job or create a new one.

    Args:
        key: Unique job key.
        rule: Rule name for new job.

    Returns:
        Tuple of (job, created) where created is True if job was new.
    """
    with self._lock:
        if key in self._jobs:
            return self._jobs[key], False

        job = Job(key=key, rule=rule)
        self._add_unlocked(job)
        return job, True
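get_or_create performs its membership check and insertion under a single lock acquisition, so concurrent callers asking for the same key see exactly one created=True. The hypothetical Registry below (plain dictionaries in place of Job objects) demonstrates that property with a thread pool. Because the method returns a (job, created) tuple, callers should unpack it, e.g. job, created = registry.get_or_create("job_1", "align").

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Registry:
    """Hypothetical minimal get-or-create store guarded by one lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._items: dict[str, dict[str, str]] = {}

    def get_or_create(self, key: str, rule: str) -> tuple[dict[str, str], bool]:
        # Check and insert under the same lock so two threads cannot both create `key`.
        with self._lock:
            if key in self._items:
                return self._items[key], False
            item = {"key": key, "rule": rule}
            self._items[key] = item
            return item, True


reg = Registry()
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda _: reg.get_or_create("job_1", "align"), range(100)))

created_flags = [created for _, created in results]
print(created_flags.count(True))                           # 1
print(all(item is results[0][0] for item, _ in results))   # True
```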
incomplete
incomplete() -> list[Job]

Get all incomplete jobs.

Source code in snakesee/state/job_registry.py
def incomplete(self) -> list[Job]:
    """Get all incomplete jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]
pending
pending() -> list[Job]

Get all pending jobs (not yet submitted).

Source code in snakesee/state/job_registry.py
def pending(self) -> list[Job]:
    """Get all pending jobs (not yet submitted)."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]
running
running() -> list[Job]

Get all running jobs.

Source code in snakesee/state/job_registry.py
def running(self) -> list[Job]:
    """Get all running jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]
running_job_infos
running_job_infos() -> list[JobInfo]

Get running jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def running_job_infos(self) -> list[JobInfo]:
    """Get running jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.running()]
set_status
set_status(job: Job, status: JobStatus) -> None

Update job status and indexes.

Source code in snakesee/state/job_registry.py
def set_status(self, job: Job, status: JobStatus) -> None:
    """Update job status and indexes."""
    with self._lock:
        old_status = job.status
        job.status = status
        self._update_indexes_unlocked(job, old_status)
store_threads
store_threads(job_id: str, threads: int) -> None

Store thread count for a job by job_id.

This is used when thread info comes from events before the job is fully tracked.

Source code in snakesee/state/job_registry.py
def store_threads(self, job_id: str, threads: int) -> None:
    """Store thread count for a job by job_id.

    This is used when thread info comes from events before the job
    is fully tracked.
    """
    with self._lock:
        key = self._by_job_id.get(job_id)
        if key:
            job = self._jobs.get(key)
            if job:
                job.threads = threads
submitted
submitted() -> list[Job]

Get all submitted jobs that haven't started yet.

Source code in snakesee/state/job_registry.py
def submitted(self) -> list[Job]:
    """Get all submitted jobs that haven't started yet."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]
submitted_job_infos
submitted_job_infos() -> list[JobInfo]

Get submitted jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def submitted_job_infos(self) -> list[JobInfo]:
    """Get submitted jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.submitted()]
update_indexes
update_indexes(job: Job, old_status: JobStatus | None = None) -> None

Update indexes after job state change.

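Parameters:

- job (Job, required): The job that was updated.
- old_status (JobStatus | None, default None): Previous status, if the status changed. The status index is only updated when this is supplied and differs from job.status; when it is None, only the job_id index is refreshed.
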
Source code in snakesee/state/job_registry.py
def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
    """Update indexes after job state change.

    Args:
        job: The job that was updated.
        old_status: Previous status if status changed, for index update.
    """
    with self._lock:
        self._update_indexes_unlocked(job, old_status)
JobStatus

Bases: Enum

Status of a job in the workflow.

Source code in snakesee/state/job_registry.py
class JobStatus(Enum):
    """Status of a job in the workflow."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"
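JobStatus is a value-backed Enum, so a status can be serialized as its string value and parsed back with JobStatus(value); an unknown string raises ValueError. Iterating the class yields members in definition order, which is what JobRegistry.__init__ relies on to create one empty bucket per status. The snippet below re-declares the enum so it runs on its own.

```python
from enum import Enum


class JobStatus(Enum):
    # Copied from the listing above so the example is self-contained.
    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"


# Round-trip through the string value, e.g. when reading or writing JSON.
status = JobStatus("running")
print(status is JobStatus.RUNNING, status.value)  # True running

# Iteration follows definition order; this builds one empty bucket per status,
# as JobRegistry.__init__ does.
buckets = {s: set() for s in JobStatus}
print([s.name for s in buckets])
# ['PENDING', 'SUBMITTED', 'RUNNING', 'COMPLETED', 'FAILED', 'INCOMPLETE']
```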

paths

Centralized path management for Snakemake workflow directories.

This module provides the WorkflowPaths class, which centralizes all path construction for Snakemake workflows, eliminating ad-hoc path construction scattered across multiple functions.
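WorkflowPaths derives every location from a single workflow_dir, and because it is a frozen dataclass its instances are immutable and hashable. The sketch below is a hypothetical three-property subset; the constant values are assumptions inferred from the property docstrings (for example ".snakemake/metadata/"), not the real snakesee constants.

```python
from dataclasses import FrozenInstanceError, dataclass
from pathlib import Path

# Assumed constant values, inferred from the property docstrings;
# the real snakesee constants may differ.
SNAKEMAKE_DIR = ".snakemake"
METADATA_DIR = "metadata"
EVENT_FILE_NAME = ".snakesee_events.jsonl"


@dataclass(frozen=True)
class MiniWorkflowPaths:
    """Hypothetical subset of WorkflowPaths: every path derives from workflow_dir."""

    workflow_dir: Path

    @property
    def snakemake_dir(self) -> Path:
        return self.workflow_dir / SNAKEMAKE_DIR

    @property
    def metadata_dir(self) -> Path:
        return self.snakemake_dir / METADATA_DIR

    @property
    def events_file(self) -> Path:
        return self.workflow_dir / EVENT_FILE_NAME


paths = MiniWorkflowPaths(Path("/my/workflow"))
print(paths.metadata_dir.as_posix())  # /my/workflow/.snakemake/metadata
print(paths.events_file.name)         # .snakesee_events.jsonl

rejected = False
try:
    paths.workflow_dir = Path("/elsewhere")  # frozen: assignment raises
except FrozenInstanceError:
    rejected = True
print(rejected)  # True
```

Immutability means an instance can be shared across threads or used as a dictionary or cache key without defensive copying.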

Classes

WorkflowPaths dataclass

Centralized path management for Snakemake workflow directories.

This frozen dataclass provides computed paths for all standard Snakemake directory locations, eliminating ad-hoc path construction.

Attributes:

- workflow_dir (Path): Root directory of the workflow (contains .snakemake/).

Example

paths = WorkflowPaths(Path("/my/workflow"))

# Access computed paths
if paths.metadata_dir.exists():
    for f in paths.get_metadata_files():
        process(f)

# Find logs
latest = paths.find_latest_log()
all_logs = paths.find_all_logs()
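find_all_logs globs the log directory and sorts by modification time (oldest first), and find_latest_log returns the last element of that order. The sketch below reproduces the idea in a temporary directory. LOG_GLOB_PATTERN and safe_mtime here are hypothetical stand-ins for the module's own names, and the pattern assumes Snakemake's "<timestamp>.snakemake.log" naming.

```python
import os
import tempfile
from pathlib import Path

# Assumed glob; the module's real LOG_GLOB_PATTERN may differ.
LOG_GLOB_PATTERN = "*.snakemake.log"


def safe_mtime(path: Path) -> float:
    """Hypothetical stand-in: mtime, or 0.0 if the file vanished mid-scan."""
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def find_all_logs(log_dir: Path) -> list[Path]:
    """Oldest to newest, like WorkflowPaths.find_all_logs."""
    if not log_dir.exists():
        return []
    return sorted(log_dir.glob(LOG_GLOB_PATTERN), key=safe_mtime)


with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp) / ".snakemake" / "log"
    log_dir.mkdir(parents=True)
    for i, name in enumerate(["b.snakemake.log", "a.snakemake.log", "c.snakemake.log"]):
        f = log_dir / name
        f.write_text("")
        os.utime(f, (1_000 + i, 1_000 + i))  # deterministic mtimes: b < a < c
    (log_dir / "notes.txt").write_text("")   # not matched by the glob

    ordered = [p.name for p in find_all_logs(log_dir)]
    latest = ordered[-1] if ordered else None

print(ordered)  # ['b.snakemake.log', 'a.snakemake.log', 'c.snakemake.log']
print(latest)   # c.snakemake.log
```

Sorting by mtime rather than by filename keeps the order correct even if log names do not sort chronologically.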

Source code in snakesee/state/paths.py
@dataclass(frozen=True)
class WorkflowPaths:
    """Centralized path management for Snakemake workflow directories.

    This frozen dataclass provides computed paths for all standard
    Snakemake directory locations, eliminating ad-hoc path construction.

    Attributes:
        workflow_dir: Root directory of the workflow (contains .snakemake/).

    Example:
        paths = WorkflowPaths(Path("/my/workflow"))

        # Access computed paths
        if paths.metadata_dir.exists():
            for f in paths.get_metadata_files():
                process(f)

        # Find logs
        latest = paths.find_latest_log()
        all_logs = paths.find_all_logs()
    """

    workflow_dir: Path

    # =========================================================================
    # Core directory properties
    # =========================================================================

    @property
    def snakemake_dir(self) -> Path:
        """Path to .snakemake/ directory."""
        return self.workflow_dir / SNAKEMAKE_DIR

    @property
    def metadata_dir(self) -> Path:
        """Path to .snakemake/metadata/ directory."""
        return self.snakemake_dir / METADATA_DIR

    @property
    def log_dir(self) -> Path:
        """Path to .snakemake/log/ directory."""
        return self.snakemake_dir / LOG_DIR

    @property
    def incomplete_dir(self) -> Path:
        """Path to .snakemake/incomplete/ directory."""
        return self.snakemake_dir / INCOMPLETE_DIR

    @property
    def locks_dir(self) -> Path:
        """Path to .snakemake/locks/ directory."""
        return self.snakemake_dir / LOCKS_DIR

    @property
    def metadata_db(self) -> Path:
        """Path to Snakemake's SQLite metadata database."""
        return self.snakemake_dir / METADATA_DB_NAME

    # =========================================================================
    # Event and validation file paths
    # =========================================================================

    @property
    def events_file(self) -> Path:
        """Path to snakesee events file (.snakesee_events.jsonl)."""
        return self.workflow_dir / EVENT_FILE_NAME

    @property
    def validation_log(self) -> Path:
        """Path to validation log file (.snakesee_validation.log)."""
        return self.workflow_dir / VALIDATION_LOG_NAME

    @property
    def default_profile(self) -> Path:
        """Path to default profile file (.snakesee-profile.json)."""
        return self.workflow_dir / DEFAULT_PROFILE_NAME

    # =========================================================================
    # Existence checks
    # =========================================================================

    @property
    def exists(self) -> bool:
        """Check if this is a valid workflow directory."""
        return _cached_exists(self.snakemake_dir)

    @property
    def has_metadata(self) -> bool:
        """Check if metadata directory exists."""
        return _cached_exists(self.metadata_dir)

    @property
    def has_metadata_db(self) -> bool:
        """Whether a SQLite metadata database exists."""
        return _cached_exists(self.metadata_db)

    @property
    def has_logs(self) -> bool:
        """Check if log directory exists and contains logs."""
        if not _cached_exists(self.log_dir):
            return False
        return any(self.log_dir.glob(LOG_GLOB_PATTERN))

    @property
    def has_events(self) -> bool:
        """Check if events file exists and has content."""
        try:
            return self.events_file.stat().st_size > 0
        except OSError:
            return False

    @property
    def has_locks(self) -> bool:
        """Check if locks directory exists and contains files."""
        if not _cached_exists(self.locks_dir):
            return False
        try:
            return any(self.locks_dir.iterdir())
        except OSError:
            return False

    @property
    def has_incomplete(self) -> bool:
        """Check if incomplete directory exists and contains markers."""
        if not _cached_exists(self.incomplete_dir):
            return False
        try:
            return any(self.incomplete_dir.iterdir())
        except OSError:
            return False

    # =========================================================================
    # Log file discovery
    # =========================================================================

    def find_latest_log(self) -> Path | None:
        """Find the most recent snakemake log file.

        Returns:
            Path to the most recent log file, or None if no logs exist.
        """
        if not _cached_exists(self.log_dir):
            return None
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        if not logs:
            return None
        logs.sort(key=safe_mtime)
        return logs[-1]

    def find_all_logs(self) -> list[Path]:
        """Find all snakemake log files, sorted by modification time.

        Returns:
            List of paths sorted oldest to newest.
        """
        if not _cached_exists(self.log_dir):
            return []
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        logs.sort(key=safe_mtime)
        return logs

    def find_logs_sorted_newest_first(self) -> list[Path]:
        """Find all snakemake log files, sorted newest first.

        Returns:
            List of paths sorted newest to oldest.
        """
        logs = self.find_all_logs()
        logs.reverse()
        return logs

    # =========================================================================
    # Metadata file discovery
    # =========================================================================

    def get_metadata_files(self) -> Iterator[Path]:
        """Iterate over all metadata files.

        Yields:
            Path to each metadata file.
        """
        if not _cached_exists(self.metadata_dir):
            return
        for f in self.metadata_dir.rglob("*"):
            if f.is_file():
                yield f

    def count_metadata_files(self) -> int:
        """Count the number of metadata files.

        Returns:
            Number of metadata files.
        """
        if not _cached_exists(self.metadata_dir):
            return 0
        return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())

    # =========================================================================
    # Incomplete marker handling
    # =========================================================================

    def get_incomplete_markers(self) -> Iterator[Path]:
        """Iterate over incomplete job markers.

        Yields:
            Path to each incomplete marker file.
        """
        if not _cached_exists(self.incomplete_dir):
            return
        for marker in self.incomplete_dir.rglob("*"):
            if marker.is_file() and marker.name != "migration_underway":
                yield marker

    def decode_incomplete_marker(self, marker: Path) -> Path | None:
        """Decode an incomplete marker filename to get the output path.

        Args:
            marker: Path to the marker file.

        Returns:
            Decoded output file path, or None if decoding fails.
        """
        try:
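            # Snakemake encodes marker names with URL-safe base64, since "/" cannot appear in a filename
            decoded = base64.urlsafe_b64decode(marker.name).decode("utf-8")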
            return Path(decoded)
        except (ValueError, UnicodeDecodeError):
            return None

    # =========================================================================
    # Job log discovery
    # =========================================================================

    def get_job_log(
        self,
        rule: str,
        wildcards: dict[str, str] | None = None,
        job_id: int | str | None = None,
    ) -> Path | None:
        """Find the log file for a specific job.

        Searches common log locations for a file matching the rule
        and optional wildcards/job_id.

        Args:
            rule: Name of the rule.
            wildcards: Optional wildcard values.
            job_id: Optional job ID.

        Returns:
            Path to the log file if found, None otherwise.
        """
        search_paths: list[Path] = []

        # .snakemake/log/ directory
        if _cached_exists(self.log_dir):
            search_paths.extend(self.log_dir.glob(f"*{rule}*"))
            if job_id is not None:
                search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))

        # logs/ directory (common convention)
        logs_dir = self.workflow_dir / "logs"
        search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))

        # log/ directory (another common convention)
        log_dir = self.workflow_dir / "log"
        search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))

        # Sort by modification time (newest first) and return first match
        # is_file() already confirms existence, no need for additional exists check
        existing_logs = [p for p in search_paths if p.is_file()]
        if existing_logs:
            existing_logs.sort(key=safe_mtime, reverse=True)
            return existing_logs[0]

        return None

    def _search_log_dir(
        self,
        log_dir: Path,
        rule: str,
        wildcards: dict[str, str] | None,
    ) -> list[Path]:
        """Search a log directory for matching logs."""
        paths: list[Path] = []
        if not _cached_exists(log_dir):
            return paths

        paths.extend(log_dir.glob(f"**/{rule}*"))

        rule_log_dir = log_dir / rule
        if _cached_exists(rule_log_dir):
            paths.extend(rule_log_dir.glob("*"))

        if wildcards:
            for wc_value in wildcards.values():
                if wc_value:
                    paths.extend(log_dir.glob(f"**/*{wc_value}*"))

        return paths

    # =========================================================================
    # Profile discovery
    # =========================================================================

    def find_profile(self, max_levels: int = 6) -> Path | None:
        """Search for a profile file in workflow and parent directories.

        Args:
            max_levels: Maximum parent levels to search (including current).

        Returns:
            Path to the found profile, or None if not found.
        """
        current = self.workflow_dir.resolve()
        for _ in range(max_levels):
            profile_path = current / DEFAULT_PROFILE_NAME
            if _cached_exists(profile_path):
                return profile_path
            if current.parent == current:
                break
            current = current.parent
        return None

    # =========================================================================
    # Validation
    # =========================================================================

    def validate(self) -> None:
        """Validate that this is a valid workflow directory.

        Raises:
            ValueError: If .snakemake directory doesn't exist.
        """
        if not _cached_exists(self.snakemake_dir):
            raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")
Attributes
default_profile property
default_profile: Path

Path to default profile file (.snakesee-profile.json).

events_file property
events_file: Path

Path to snakesee events file (.snakesee_events.jsonl).

exists property
exists: bool

Check if this is a valid workflow directory.

has_events property
has_events: bool

Check if events file exists and has content.

has_incomplete property
has_incomplete: bool

Check if incomplete directory exists and contains markers.

has_locks property
has_locks: bool

Check if locks directory exists and contains files.

has_logs property
has_logs: bool

Check if log directory exists and contains logs.

has_metadata property
has_metadata: bool

Check if metadata directory exists.

has_metadata_db property
has_metadata_db: bool

Whether a SQLite metadata database exists.

incomplete_dir property
incomplete_dir: Path

Path to .snakemake/incomplete/ directory.

locks_dir property
locks_dir: Path

Path to .snakemake/locks/ directory.

log_dir property
log_dir: Path

Path to .snakemake/log/ directory.

metadata_db property
metadata_db: Path

Path to Snakemake's SQLite metadata database.

metadata_dir property
metadata_dir: Path

Path to .snakemake/metadata/ directory.

snakemake_dir property
snakemake_dir: Path

Path to .snakemake/ directory.

validation_log property
validation_log: Path

Path to validation log file (.snakesee_validation.log).

Functions
count_metadata_files
count_metadata_files() -> int

Count the number of metadata files.

Returns:

int: Number of metadata files.

Source code in snakesee/state/paths.py
def count_metadata_files(self) -> int:
    """Count the number of metadata files.

    Returns:
        Number of metadata files.
    """
    if not _cached_exists(self.metadata_dir):
        return 0
    return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())
decode_incomplete_marker
decode_incomplete_marker(marker: Path) -> Path | None

Decode an incomplete marker filename to get the output path.

Parameters:

marker (Path, required): Path to the marker file.

Returns:

Path | None: Decoded output file path, or None if decoding fails.

Source code in snakesee/state/paths.py
def decode_incomplete_marker(self, marker: Path) -> Path | None:
    """Decode an incomplete marker filename to get the output path.

    Args:
        marker: Path to the marker file.

    Returns:
        Decoded output file path, or None if decoding fails.
    """
    try:
        decoded = base64.b64decode(marker.name).decode("utf-8")
        return Path(decoded)
    except (ValueError, UnicodeDecodeError):
        return None
find_all_logs
find_all_logs() -> list[Path]

Find all snakemake log files, sorted by modification time.

Returns:

list[Path]: Log paths sorted oldest to newest by modification time.

Source code in snakesee/state/paths.py
def find_all_logs(self) -> list[Path]:
    """Find all snakemake log files, sorted by modification time.

    Returns:
        List of paths sorted oldest to newest.
    """
    if not _cached_exists(self.log_dir):
        return []
    # Files from glob already exist at time of iteration; no need to re-check
    logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
    logs.sort(key=safe_mtime)
    return logs
find_latest_log
find_latest_log() -> Path | None

Find the most recent snakemake log file.

Returns:

Path | None: Path to the most recent log file, or None if no logs exist.

Source code in snakesee/state/paths.py
def find_latest_log(self) -> Path | None:
    """Find the most recent snakemake log file.

    Returns:
        Path to the most recent log file, or None if no logs exist.
    """
    if not _cached_exists(self.log_dir):
        return None
    # Files from glob already exist at time of iteration; no need to re-check
    logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
    if not logs:
        return None
    logs.sort(key=safe_mtime)
    return logs[-1]
find_logs_sorted_newest_first
find_logs_sorted_newest_first() -> list[Path]

Find all snakemake log files, sorted newest first.

Returns:

list[Path]: Log paths sorted newest to oldest.

Source code in snakesee/state/paths.py
def find_logs_sorted_newest_first(self) -> list[Path]:
    """Find all snakemake log files, sorted newest first.

    Returns:
        List of paths sorted newest to oldest.
    """
    logs = self.find_all_logs()
    logs.reverse()
    return logs
find_profile
find_profile(max_levels: int = 6) -> Path | None

Search for a profile file in workflow and parent directories.

Parameters:

max_levels (int, default 6): Maximum number of directories to check, starting with the workflow directory itself and moving up through its parents.

Returns:

Path | None: Path to the found profile, or None if not found.

Source code in snakesee/state/paths.py
def find_profile(self, max_levels: int = 6) -> Path | None:
    """Search for a profile file in workflow and parent directories.

    Args:
        max_levels: Maximum parent levels to search (including current).

    Returns:
        Path to the found profile, or None if not found.
    """
    current = self.workflow_dir.resolve()
    for _ in range(max_levels):
        profile_path = current / DEFAULT_PROFILE_NAME
        if _cached_exists(profile_path):
            return profile_path
        if current.parent == current:
            break
        current = current.parent
    return None
get_incomplete_markers
get_incomplete_markers() -> Iterator[Path]

Iterate over incomplete job markers.

Yields:

Path: Path to each incomplete marker file (the migration_underway sentinel is skipped).

Source code in snakesee/state/paths.py
def get_incomplete_markers(self) -> Iterator[Path]:
    """Iterate over incomplete job markers.

    Yields:
        Path to each incomplete marker file.
    """
    if not _cached_exists(self.incomplete_dir):
        return
    for marker in self.incomplete_dir.rglob("*"):
        if marker.is_file() and marker.name != "migration_underway":
            yield marker
get_job_log
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None

Find the log file for a specific job.

Searches common log locations for a file matching the rule and optional wildcards/job_id.

Parameters:

rule (str, required): Name of the rule.
wildcards (dict[str, str] | None, default None): Optional wildcard values.
job_id (int | str | None, default None): Optional job ID.

Returns:

Path | None: Path to the most recently modified matching log file, or None if none is found.

Source code in snakesee/state/paths.py
def get_job_log(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    job_id: int | str | None = None,
) -> Path | None:
    """Find the log file for a specific job.

    Searches common log locations for a file matching the rule
    and optional wildcards/job_id.

    Args:
        rule: Name of the rule.
        wildcards: Optional wildcard values.
        job_id: Optional job ID.

    Returns:
        Path to the log file if found, None otherwise.
    """
    search_paths: list[Path] = []

    # .snakemake/log/ directory
    if _cached_exists(self.log_dir):
        search_paths.extend(self.log_dir.glob(f"*{rule}*"))
        if job_id is not None:
            search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))

    # logs/ directory (common convention)
    logs_dir = self.workflow_dir / "logs"
    search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))

    # log/ directory (another common convention)
    log_dir = self.workflow_dir / "log"
    search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))

    # Sort by modification time (newest first) and return first match
    # is_file() already confirms existence, no need for additional exists check
    existing_logs = [p for p in search_paths if p.is_file()]
    if existing_logs:
        existing_logs.sort(key=safe_mtime, reverse=True)
        return existing_logs[0]

    return None
get_metadata_files
get_metadata_files() -> Iterator[Path]

Iterate over all metadata files.

Yields:

Path: Path to each metadata file.

Source code in snakesee/state/paths.py
def get_metadata_files(self) -> Iterator[Path]:
    """Iterate over all metadata files.

    Yields:
        Path to each metadata file.
    """
    if not _cached_exists(self.metadata_dir):
        return
    for f in self.metadata_dir.rglob("*"):
        if f.is_file():
            yield f
validate
validate() -> None

Validate that this is a valid workflow directory.

Raises:

ValueError: If the .snakemake directory doesn't exist.

Source code in snakesee/state/paths.py
def validate(self) -> None:
    """Validate that this is a valid workflow directory.

    Raises:
        ValueError: If .snakemake directory doesn't exist.
    """
    if not _cached_exists(self.snakemake_dir):
        raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")

Functions

clear_exists_cache
clear_exists_cache() -> None

Clear the filesystem existence cache.

Useful for testing or after significant filesystem operations.

Source code in snakesee/state/paths.py
def clear_exists_cache() -> None:
    """Clear the filesystem existence cache.

    Useful for testing or after significant filesystem operations.
    """
    _exists_cache.clear()
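`_exists_cache` and `_cached_exists` are not shown in this section. The sketch below assumes a plain, non-expiring memo of `Path.exists()`; the real cache may add a TTL or size bound. Under that assumption, it shows why clearing matters: a negative result cached before a directory is created keeps being returned until the cache is cleared.

```python
import tempfile
from pathlib import Path

# Hypothetical stand-ins for the module-level cache and lookup
_exists_cache: dict[Path, bool] = {}


def _cached_exists(path: Path) -> bool:
    """Memoize Path.exists(); entries are never refreshed automatically."""
    if path not in _exists_cache:
        _exists_cache[path] = path.exists()
    return _exists_cache[path]


def clear_exists_cache() -> None:
    _exists_cache.clear()


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / ".snakemake"
    before = _cached_exists(target)  # False, and now cached
    target.mkdir()
    stale = _cached_exists(target)   # still False: served from the cache
    clear_exists_cache()
    fresh = _cached_exists(target)   # True once the cache is cleared
    print(before, stale, fresh)      # False False True
```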

rule_registry

Rule registry for centralized rule timing statistics.

This module provides a single source of truth for all rule timing data, consolidating aggregate, thread-level, and wildcard-level statistics.

Classes

RuleRegistry

Central registry for all rule timing statistics.

Consolidates what was previously scattered across TimeEstimator's rule_stats, thread_stats, and wildcard_stats dictionaries.

Example

>>> import time
>>> registry = RuleRegistry()
>>> registry.record_completion("align", duration=100.0, timestamp=time.time(), threads=4)
>>> stats = registry.get_or_create("align")
>>> mean = registry.global_mean_duration()

Source code in snakesee/state/rule_registry.py
class RuleRegistry:
    """Central registry for all rule timing statistics.

    Consolidates what was previously scattered across TimeEstimator's
    rule_stats, thread_stats, and wildcard_stats dictionaries.

    Example:
        >>> import time
        >>> registry = RuleRegistry()
        >>> registry.record_completion("align", duration=100.0, timestamp=time.time(), threads=4)
        >>> stats = registry.get_or_create("align")
        >>> mean = registry.global_mean_duration()
    """

    def __init__(self, config: EstimationConfig | None = None) -> None:
        """Initialize empty registry.

        Args:
            config: Optional estimation configuration.
        """
        from snakesee.state.config import DEFAULT_CONFIG

        self.config = config or DEFAULT_CONFIG
        self._lock = threading.RLock()
        self._rules: dict[str, RuleStatistics] = {}
        self._global_mean_cache: float | None = None
        self._cache_valid: bool = False

    def __len__(self) -> int:
        """Return number of rules in registry."""
        with self._lock:
            return len(self._rules)

    def __contains__(self, rule: str) -> bool:
        """Check if rule exists in registry."""
        with self._lock:
            return rule in self._rules

    def get(self, rule: str) -> RuleStatistics | None:
        """Get statistics for a rule."""
        with self._lock:
            return self._rules.get(rule)

    def get_or_create(self, rule: str) -> RuleStatistics:
        """Get existing statistics or create new ones.

        Args:
            rule: Rule name.

        Returns:
            RuleStatistics for the rule.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
                self._cache_valid = False
            return self._rules[rule]

    def record_completion(
        self,
        rule: str,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            rule: Rule name.
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values.
            input_size: Input file size.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            stats = self._rules[rule]
            stats.record_completion(duration, timestamp, threads, wildcards, input_size)
            self._cache_valid = False

    def record_job_completion(self, job: Job) -> None:
        """Record completion from a Job object.

        Args:
            job: Completed job with timing data.
        """
        if job.duration is None:
            return

        # record_completion already acquires lock
        self.record_completion(
            rule=job.rule,
            duration=job.duration,
            timestamp=job.end_time or 0.0,
            threads=job.threads,
            wildcards=job.wildcards if job.wildcards else None,
            input_size=job.input_size,
        )

    def global_mean_duration(self) -> float:
        """Get global mean duration across all rules.

        Returns:
            Mean duration in seconds, or config default if no data.
        """
        with self._lock:
            if self._cache_valid and self._global_mean_cache is not None:
                return self._global_mean_cache

            all_durations: list[float] = []
            for stats in self._rules.values():
                all_durations.extend(stats.aggregate.durations)

            if all_durations:
                self._global_mean_cache = sum(all_durations) / len(all_durations)
            else:
                self._global_mean_cache = self.config.default_global_mean

            self._cache_valid = True
            return self._global_mean_cache

    def set_expected_counts(self, counts: dict[str, int]) -> None:
        """Set expected job counts for rules.

        Args:
            counts: Dictionary of rule name to expected count.
        """
        with self._lock:
            for rule, count in counts.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].expected_count = count

    def clear(self) -> None:
        """Clear all statistics."""
        with self._lock:
            self._rules.clear()
            self._cache_valid = False
            self._global_mean_cache = None

    # Backward compatibility methods

    def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
        """Convert to dict for backward compatibility with TimeEstimator."""
        with self._lock:
            return {name: stats.aggregate for name, stats in self._rules.items()}

    def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
        """Convert to thread stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, ThreadTimingStats] = {}
            for name, stats in self._rules.items():
                if stats.by_threads is not None:
                    result[name] = stats.by_threads
            return result

    def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
        """Convert to wildcard stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, dict[str, WildcardTimingStats]] = {}
            for name, stats in self._rules.items():
                if stats.by_wildcard:
                    result[name] = stats.by_wildcard
            return result

    def load_from_rule_stats(
        self,
        rule_stats: dict[str, RuleTimingStats],
        thread_stats: dict[str, ThreadTimingStats] | None = None,
        wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
    ) -> None:
        """Load from existing stats dictionaries.

        Args:
            rule_stats: Aggregate rule timing stats.
            thread_stats: Thread-specific stats.
            wildcard_stats: Wildcard-specific stats.
        """
        with self._lock:
            for rule, stats in rule_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].aggregate = stats

            if thread_stats:
                for rule, tstats in thread_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_threads = tstats

            if wildcard_stats:
                for rule, wstats in wildcard_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_wildcard = wstats

            self._cache_valid = False

    def all_rules(self) -> list[str]:
        """Get all rule names."""
        with self._lock:
            return list(self._rules.keys())

    def get_combination_stats(
        self,
        rule: str,
        wildcards: dict[str, str] | None,
        threads: int | None,
        min_samples: int | None = None,
    ) -> RuleTimingStats | None:
        """Get timing stats for a specific wildcard+threads combination.

        Args:
            rule: Rule name.
            wildcards: Wildcard values dict.
            threads: Thread count.
            min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                        Set to 1 to get stats even with a single historical run.

        Returns:
            RuleTimingStats for the combination, or None if not found or insufficient samples.
        """
        from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

        if min_samples is None:
            min_samples = MIN_SAMPLES_FOR_CONDITIONING

        combo_key = _make_combination_key(wildcards, threads)
        if combo_key is None:
            return None

        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return None

            combo_stats = stats.by_combination.get(combo_key)
            if combo_stats is None:
                return None

            # Require minimum samples for conditioning
            if combo_stats.count < min_samples:
                return None

            return combo_stats

    def typical_threads(self, rule: str) -> float:
        """Get the typical thread count for a rule.

        Args:
            rule: Rule name.

        Returns:
            Weighted average thread count, or 1.0 if no thread data.
        """
        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return 1.0
            return stats.typical_threads

    def total_sample_count(self) -> int:
        """Get total number of samples across all rules."""
        with self._lock:
            return sum(stats.aggregate.count for stats in self._rules.values())

    def rule_count(self) -> int:
        """Get number of rules in the registry."""
        with self._lock:
            return len(self._rules)
Functions
__contains__
__contains__(rule: str) -> bool

Check if rule exists in registry.

Source code in snakesee/state/rule_registry.py
def __contains__(self, rule: str) -> bool:
    """Check if rule exists in registry."""
    with self._lock:
        return rule in self._rules
__init__
__init__(config: EstimationConfig | None = None) -> None

Initialize empty registry.

Parameters:

config (EstimationConfig | None, default None): Optional estimation configuration. DEFAULT_CONFIG is used if omitted.

Source code in snakesee/state/rule_registry.py
def __init__(self, config: EstimationConfig | None = None) -> None:
    """Initialize empty registry.

    Args:
        config: Optional estimation configuration.
    """
    from snakesee.state.config import DEFAULT_CONFIG

    self.config = config or DEFAULT_CONFIG
    self._lock = threading.RLock()
    self._rules: dict[str, RuleStatistics] = {}
    self._global_mean_cache: float | None = None
    self._cache_valid: bool = False
__len__
__len__() -> int

Return number of rules in registry.

Source code in snakesee/state/rule_registry.py
def __len__(self) -> int:
    """Return number of rules in registry."""
    with self._lock:
        return len(self._rules)
all_rules
all_rules() -> list[str]

Get all rule names.

Source code in snakesee/state/rule_registry.py
def all_rules(self) -> list[str]:
    """Get all rule names."""
    with self._lock:
        return list(self._rules.keys())
clear
clear() -> None

Clear all statistics.

Source code in snakesee/state/rule_registry.py
def clear(self) -> None:
    """Clear all statistics."""
    with self._lock:
        self._rules.clear()
        self._cache_valid = False
        self._global_mean_cache = None
get
get(rule: str) -> RuleStatistics | None

Get statistics for a rule.

Source code in snakesee/state/rule_registry.py
def get(self, rule: str) -> RuleStatistics | None:
    """Get statistics for a rule."""
    with self._lock:
        return self._rules.get(rule)
get_combination_stats
get_combination_stats(rule: str, wildcards: dict[str, str] | None, threads: int | None, min_samples: int | None = None) -> RuleTimingStats | None

Get timing stats for a specific wildcard+threads combination.

Parameters:

rule (str, required): Rule name.
wildcards (dict[str, str] | None, required): Wildcard values.
threads (int | None, required): Thread count.
min_samples (int | None, default None): Minimum number of samples required. If None, MIN_SAMPLES_FOR_CONDITIONING is used. Set to 1 to get stats even from a single historical run.

Returns:

RuleTimingStats | None: Stats for the combination, or None if the combination is unknown or has too few samples.

Source code in snakesee/state/rule_registry.py
def get_combination_stats(
    self,
    rule: str,
    wildcards: dict[str, str] | None,
    threads: int | None,
    min_samples: int | None = None,
) -> RuleTimingStats | None:
    """Get timing stats for a specific wildcard+threads combination.

    Args:
        rule: Rule name.
        wildcards: Wildcard values dict.
        threads: Thread count.
        min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                    Set to 1 to get stats even with a single historical run.

    Returns:
        RuleTimingStats for the combination, or None if not found or insufficient samples.
    """
    from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

    if min_samples is None:
        min_samples = MIN_SAMPLES_FOR_CONDITIONING

    combo_key = _make_combination_key(wildcards, threads)
    if combo_key is None:
        return None

    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return None

        combo_stats = stats.by_combination.get(combo_key)
        if combo_stats is None:
            return None

        # Require minimum samples for conditioning
        if combo_stats.count < min_samples:
            return None

        return combo_stats
get_or_create
get_or_create(rule: str) -> RuleStatistics

Get existing statistics or create new ones.

Parameters:

rule (str, required): Rule name.

Returns:

RuleStatistics: RuleStatistics for the rule.

Source code in snakesee/state/rule_registry.py
def get_or_create(self, rule: str) -> RuleStatistics:
    """Get existing statistics or create new ones.

    Args:
        rule: Rule name.

    Returns:
        RuleStatistics for the rule.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
            self._cache_valid = False
        return self._rules[rule]
global_mean_duration
global_mean_duration() -> float

Get global mean duration across all rules.

Returns:

float: Mean duration in seconds across all recorded jobs, or the configured default (config.default_global_mean) if there is no data.

Source code in snakesee/state/rule_registry.py
def global_mean_duration(self) -> float:
    """Get global mean duration across all rules.

    Returns:
        Mean duration in seconds, or config default if no data.
    """
    with self._lock:
        if self._cache_valid and self._global_mean_cache is not None:
            return self._global_mean_cache

        all_durations: list[float] = []
        for stats in self._rules.values():
            all_durations.extend(stats.aggregate.durations)

        if all_durations:
            self._global_mean_cache = sum(all_durations) / len(all_durations)
        else:
            self._global_mean_cache = self.config.default_global_mean

        self._cache_valid = True
        return self._global_mean_cache
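The caching in `global_mean_duration` is invalidated only by the registry's own write methods. The reduction below is hypothetical: `MeanCache` is not a snakesee class, and the 60.0 default stands in for `config.default_global_mean`, whose real value is not shown. It shows one consequence of that design. Data appended through an object handed out by a `get_or_create`-style accessor does not reset the flag, so the cached mean can go stale until the next registry-level write. Whether this matters depends on how callers use the `RuleStatistics` returned by `get_or_create`.

```python
import threading


class MeanCache:
    """Hypothetical reduction of RuleRegistry's global-mean caching."""

    def __init__(self, default: float = 60.0) -> None:  # default is an assumption
        self._lock = threading.RLock()
        self._durations: dict[str, list[float]] = {}
        self._default = default
        self._cache: float | None = None
        self._valid = False

    def record(self, rule: str, duration: float) -> None:
        with self._lock:
            self._durations.setdefault(rule, []).append(duration)
            self._valid = False  # registry-level writes invalidate the cache

    def bucket(self, rule: str) -> list[float]:
        """Like get_or_create: returns the mutable per-rule container."""
        with self._lock:
            if rule not in self._durations:
                self._durations[rule] = []
                self._valid = False  # only creation invalidates
            return self._durations[rule]

    def global_mean(self) -> float:
        with self._lock:
            if self._valid and self._cache is not None:
                return self._cache
            values = [d for ds in self._durations.values() for d in ds]
            self._cache = sum(values) / len(values) if values else self._default
            self._valid = True
            return self._cache


cache = MeanCache()
empty = cache.global_mean()          # 60.0: no data yet, default used
cache.record("align", 100.0)
cache.record("sort", 50.0)
first = cache.global_mean()          # 75.0
cache.bucket("align").append(300.0)  # direct mutation: cache not invalidated
stale = cache.global_mean()          # still 75.0
cache.record("sort", 50.0)
fresh = cache.global_mean()          # (100 + 300 + 50 + 50) / 4 = 125.0
```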
load_from_rule_stats
load_from_rule_stats(rule_stats: dict[str, RuleTimingStats], thread_stats: dict[str, ThreadTimingStats] | None = None, wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None) -> None

Load from existing stats dictionaries.

Parameters:

rule_stats (dict[str, RuleTimingStats], required): Aggregate rule timing stats.
thread_stats (dict[str, ThreadTimingStats] | None, default None): Thread-specific stats.
wildcard_stats (dict[str, dict[str, WildcardTimingStats]] | None, default None): Wildcard-specific stats.

Source code in snakesee/state/rule_registry.py
def load_from_rule_stats(
    self,
    rule_stats: dict[str, RuleTimingStats],
    thread_stats: dict[str, ThreadTimingStats] | None = None,
    wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
) -> None:
    """Load from existing stats dictionaries.

    Args:
        rule_stats: Aggregate rule timing stats.
        thread_stats: Thread-specific stats.
        wildcard_stats: Wildcard-specific stats.
    """
    with self._lock:
        for rule, stats in rule_stats.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].aggregate = stats

        if thread_stats:
            for rule, tstats in thread_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_threads = tstats

        if wildcard_stats:
            for rule, wstats in wildcard_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_wildcard = wstats

        self._cache_valid = False
record_completion
record_completion(rule: str, duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

rule (str, required): Rule name.
duration (float, required): Job duration in seconds.
timestamp (float, required): Completion timestamp.
threads (int | None, default None): Number of threads used.
wildcards (dict[str, str] | None, default None): Wildcard values.
input_size (int | None, default None): Input file size.

Source code in snakesee/state/rule_registry.py
def record_completion(
    self,
    rule: str,
    duration: float,
    timestamp: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
) -> None:
    """Record a job completion.

    Args:
        rule: Rule name.
        duration: Job duration in seconds.
        timestamp: Completion timestamp.
        threads: Number of threads used.
        wildcards: Wildcard values.
        input_size: Input file size.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
        stats = self._rules[rule]
        stats.record_completion(duration, timestamp, threads, wildcards, input_size)
        self._cache_valid = False
record_job_completion
record_job_completion(job: Job) -> None

Record completion from a Job object.

Parameters:

job (Job, required): Completed job with timing data. Jobs whose duration is None are ignored.

Source code in snakesee/state/rule_registry.py
def record_job_completion(self, job: Job) -> None:
    """Record completion from a Job object.

    Args:
        job: Completed job with timing data.
    """
    if job.duration is None:
        return

    # record_completion already acquires lock
    self.record_completion(
        rule=job.rule,
        duration=job.duration,
        timestamp=job.end_time or 0.0,
        threads=job.threads,
        wildcards=job.wildcards if job.wildcards else None,
        input_size=job.input_size,
    )
rule_count
rule_count() -> int

Get number of rules in the registry.

Source code in snakesee/state/rule_registry.py
def rule_count(self) -> int:
    """Get number of rules in the registry."""
    with self._lock:
        return len(self._rules)
set_expected_counts
set_expected_counts(counts: dict[str, int]) -> None

Set expected job counts for rules.

Parameters:

- counts (dict[str, int], required): Dictionary of rule name to expected count.

Source code in snakesee/state/rule_registry.py
def set_expected_counts(self, counts: dict[str, int]) -> None:
    """Set expected job counts for rules.

    Args:
        counts: Dictionary of rule name to expected count.
    """
    with self._lock:
        for rule, count in counts.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].expected_count = count
to_rule_stats_dict
to_rule_stats_dict() -> dict[str, RuleTimingStats]

Return a mapping of rule name to aggregate RuleTimingStats for every rule, for backward compatibility with TimeEstimator.

Source code in snakesee/state/rule_registry.py
def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
    """Convert to dict for backward compatibility with TimeEstimator."""
    with self._lock:
        return {name: stats.aggregate for name, stats in self._rules.items()}
to_thread_stats_dict
to_thread_stats_dict() -> dict[str, ThreadTimingStats]

Return a mapping of rule name to ThreadTimingStats, for backward compatibility. Rules without thread data are omitted.

Source code in snakesee/state/rule_registry.py
def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
    """Convert to thread stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, ThreadTimingStats] = {}
        for name, stats in self._rules.items():
            if stats.by_threads is not None:
                result[name] = stats.by_threads
        return result
to_wildcard_stats_dict
to_wildcard_stats_dict() -> dict[str, dict[str, WildcardTimingStats]]

Return a mapping of rule name to per-wildcard statistics, for backward compatibility. Rules without wildcard data are omitted.

Source code in snakesee/state/rule_registry.py
def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
    """Convert to wildcard stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, dict[str, WildcardTimingStats]] = {}
        for name, stats in self._rules.items():
            if stats.by_wildcard:
                result[name] = stats.by_wildcard
        return result
total_sample_count
total_sample_count() -> int

Get total number of samples across all rules.

Source code in snakesee/state/rule_registry.py
def total_sample_count(self) -> int:
    """Get total number of samples across all rules."""
    with self._lock:
        return sum(stats.aggregate.count for stats in self._rules.values())
typical_threads
typical_threads(rule: str) -> float

Get the typical thread count for a rule.

Parameters:

- rule (str, required): Rule name.

Returns:

- float: Weighted average thread count, or 1.0 if the rule is unknown or has no thread data.

Source code in snakesee/state/rule_registry.py
def typical_threads(self, rule: str) -> float:
    """Get the typical thread count for a rule.

    Args:
        rule: Rule name.

    Returns:
        Weighted average thread count, or 1.0 if no thread data.
    """
    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return 1.0
        return stats.typical_threads
RuleStatistics dataclass

Complete timing statistics for a single rule.

Combines aggregate, thread-level, and wildcard-level statistics in a single container.

Attributes:

- rule (str): The rule name.
- aggregate (RuleTimingStats): Overall timing statistics for the rule.
- by_threads (ThreadTimingStats | None): Timing statistics keyed by thread count; None until a completion with a known thread count is recorded.
- by_wildcard (dict[str, WildcardTimingStats]): Timing statistics keyed by wildcard name, then by wildcard value.
- by_combination (dict[str, RuleTimingStats]): Statistics keyed by the full wildcard+threads combination.
- expected_count (int | None): Expected number of jobs for this rule, if known.

Source code in snakesee/state/rule_registry.py
@dataclass
class RuleStatistics:
    """Complete timing statistics for a single rule.

    Combines aggregate, thread-level, and wildcard-level statistics
    in a single container.

    Attributes:
        rule: The rule name.
        aggregate: Overall timing statistics for the rule.
        by_threads: Thread-specific timing statistics.
        by_wildcard: Wildcard-specific timing statistics.
        by_combination: Full wildcard+threads combination statistics.
        expected_count: Expected number of jobs for this rule (if known).
    """

    rule: str
    aggregate: RuleTimingStats = field(default_factory=_make_empty_rule_stats)
    by_threads: ThreadTimingStats | None = None
    by_wildcard: dict[str, WildcardTimingStats] = field(default_factory=dict)
    by_combination: dict[str, RuleTimingStats] = field(default_factory=dict)
    expected_count: int | None = None

    def __post_init__(self) -> None:
        """Ensure aggregate has correct rule name."""
        from snakesee.models import RuleTimingStats

        if self.aggregate.rule != self.rule:
            # Create new RuleTimingStats with correct rule name
            self.aggregate = RuleTimingStats(
                rule=self.rule,
                durations=list(self.aggregate.durations),
                timestamps=list(self.aggregate.timestamps),
                input_sizes=list(self.aggregate.input_sizes),
            )

    @property
    def typical_threads(self) -> float:
        """Weighted average of thread counts from historical data.

        Uses the number of observations at each thread count as weights.
        Defaults to 1.0 if no thread data is available.
        """
        if self.by_threads is None or not self.by_threads.stats_by_threads:
            return 1.0

        total_weight = 0
        weighted_sum = 0.0
        for thread_count, stats in self.by_threads.stats_by_threads.items():
            n = stats.count
            weighted_sum += thread_count * n
            total_weight += n

        return weighted_sum / total_weight if total_weight > 0 else 1.0

    def record_completion(
        self,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values for the job.
            input_size: Input file size in bytes.
        """
        from snakesee.models import RuleTimingStats
        from snakesee.models import ThreadTimingStats
        from snakesee.models import WildcardTimingStats

        # Update aggregate stats
        self.aggregate.durations.append(duration)
        self.aggregate.timestamps.append(timestamp)
        self.aggregate.input_sizes.append(input_size)

        # Update thread-specific stats
        if threads is not None:
            if self.by_threads is None:
                self.by_threads = ThreadTimingStats(rule=self.rule, stats_by_threads={})
            # Get or create stats for this thread count
            if threads not in self.by_threads.stats_by_threads:
                self.by_threads.stats_by_threads[threads] = RuleTimingStats(
                    rule=f"{self.rule}@{threads}t"
                )
            thread_stats = self.by_threads.stats_by_threads[threads]
            thread_stats.durations.append(duration)
            thread_stats.timestamps.append(timestamp)

        # Update wildcard-specific stats (for each wildcard key)
        if wildcards:
            for wc_key, wc_value in wildcards.items():
                if wc_key not in self.by_wildcard:
                    self.by_wildcard[wc_key] = WildcardTimingStats(
                        rule=self.rule,
                        wildcard_key=wc_key,
                        stats_by_value={},
                    )
                wts = self.by_wildcard[wc_key]
                # Access stats_by_value directly (get_stats_for_value has min sample check)
                if wc_value not in wts.stats_by_value:
                    wts.stats_by_value[wc_value] = RuleTimingStats(
                        rule=f"{self.rule}:{wc_key}={wc_value}"
                    )
                value_stats = wts.stats_by_value[wc_value]
                value_stats.durations.append(duration)
                value_stats.timestamps.append(timestamp)

        # Update combination stats (full wildcard+threads key)
        combo_key = _make_combination_key(wildcards, threads)
        if combo_key:
            if combo_key not in self.by_combination:
                self.by_combination[combo_key] = RuleTimingStats(rule=f"{self.rule}@{combo_key}")
            combo_stats = self.by_combination[combo_key]
            combo_stats.durations.append(duration)
            combo_stats.timestamps.append(timestamp)
Attributes
typical_threads property
typical_threads: float

Weighted average of thread counts from historical data.

Uses the number of observations at each thread count as weights. Defaults to 1.0 if no thread data is available.
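As a worked example of the weighting: three observed runs at 4 threads and one at 8 threads give (4·3 + 8·1) / (3 + 1) = 20 / 4 = 5.0. The standalone function below restates the same calculation over a hypothetical mapping of thread count to observation count; in the source, the counts come from `stats.count` for each entry of `by_threads.stats_by_threads`.

```python
def typical_threads(count_by_threads: dict[int, int]) -> float:
    """Observation-weighted mean thread count; 1.0 when there is no data."""
    total_weight = sum(count_by_threads.values())
    if total_weight == 0:
        return 1.0
    weighted_sum = sum(t * n for t, n in count_by_threads.items())
    return weighted_sum / total_weight


print(typical_threads({4: 3, 8: 1}))  # 5.0
```

Note that the result is a float and need not match any thread count actually used.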

Functions
__post_init__
__post_init__() -> None

Ensure the aggregate statistics carry this rule's name, rebuilding them with copied samples if they do not.

Source code in snakesee/state/rule_registry.py
def __post_init__(self) -> None:
    """Ensure aggregate has correct rule name."""
    from snakesee.models import RuleTimingStats

    if self.aggregate.rule != self.rule:
        # Create new RuleTimingStats with correct rule name
        self.aggregate = RuleTimingStats(
            rule=self.rule,
            durations=list(self.aggregate.durations),
            timestamps=list(self.aggregate.timestamps),
            input_sizes=list(self.aggregate.input_sizes),
        )
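The rename in `__post_init__` is needed because a dataclass `default_factory` is called with no arguments, so the default aggregate cannot know the rule name. The factory `_make_empty_rule_stats` is not shown on this page; the sketch below assumes it produces stats with a placeholder name. `TimingStats` and `RuleStats` are hypothetical reductions of RuleTimingStats and RuleStatistics. A mismatched aggregate passed in by the caller is also rebuilt, and its samples are copied rather than shared.

```python
from dataclasses import dataclass, field


@dataclass
class TimingStats:
    """Hypothetical stand-in for RuleTimingStats."""

    rule: str = ""
    durations: list[float] = field(default_factory=list)


@dataclass
class RuleStats:
    """Hypothetical stand-in for RuleStatistics, reduced to the aggregate field."""

    rule: str
    # default_factory is called with no arguments, so it cannot see `rule`.
    aggregate: TimingStats = field(default_factory=TimingStats)

    def __post_init__(self) -> None:
        if self.aggregate.rule != self.rule:
            # Rebuild under the right name, copying samples into a new list.
            self.aggregate = TimingStats(
                rule=self.rule, durations=list(self.aggregate.durations)
            )


fresh = RuleStats("align")
shared = TimingStats(rule="old_name", durations=[1.0])
renamed = RuleStats("sort", aggregate=shared)
renamed.aggregate.durations.append(2.0)  # does not touch `shared`
```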
record_completion
record_completion(duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

- duration (float, required): Job duration in seconds.
- timestamp (float, required): Completion timestamp.
- threads (int | None, default None): Number of threads used.
- wildcards (dict[str, str] | None, default None): Wildcard values for the job.
- input_size (int | None, default None): Input file size in bytes.

Source code in snakesee/state/rule_registry.py
def record_completion(
    self,
    duration: float,
    timestamp: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
) -> None:
    """Record a job completion.

    Args:
        duration: Job duration in seconds.
        timestamp: Completion timestamp.
        threads: Number of threads used.
        wildcards: Wildcard values for the job.
        input_size: Input file size in bytes.
    """
    from snakesee.models import RuleTimingStats
    from snakesee.models import ThreadTimingStats
    from snakesee.models import WildcardTimingStats

    # Update aggregate stats
    self.aggregate.durations.append(duration)
    self.aggregate.timestamps.append(timestamp)
    self.aggregate.input_sizes.append(input_size)

    # Update thread-specific stats
    if threads is not None:
        if self.by_threads is None:
            self.by_threads = ThreadTimingStats(rule=self.rule, stats_by_threads={})
        # Get or create stats for this thread count
        if threads not in self.by_threads.stats_by_threads:
            self.by_threads.stats_by_threads[threads] = RuleTimingStats(
                rule=f"{self.rule}@{threads}t"
            )
        thread_stats = self.by_threads.stats_by_threads[threads]
        thread_stats.durations.append(duration)
        thread_stats.timestamps.append(timestamp)

    # Update wildcard-specific stats (for each wildcard key)
    if wildcards:
        for wc_key, wc_value in wildcards.items():
            if wc_key not in self.by_wildcard:
                self.by_wildcard[wc_key] = WildcardTimingStats(
                    rule=self.rule,
                    wildcard_key=wc_key,
                    stats_by_value={},
                )
            wts = self.by_wildcard[wc_key]
            # Access stats_by_value directly (get_stats_for_value has min sample check)
            if wc_value not in wts.stats_by_value:
                wts.stats_by_value[wc_value] = RuleTimingStats(
                    rule=f"{self.rule}:{wc_key}={wc_value}"
                )
            value_stats = wts.stats_by_value[wc_value]
            value_stats.durations.append(duration)
            value_stats.timestamps.append(timestamp)

    # Update combination stats (full wildcard+threads key)
    combo_key = _make_combination_key(wildcards, threads)
    if combo_key:
        if combo_key not in self.by_combination:
            self.by_combination[combo_key] = RuleTimingStats(rule=f"{self.rule}@{combo_key}")
        combo_stats = self.by_combination[combo_key]
        combo_stats.durations.append(duration)
        combo_stats.timestamps.append(timestamp)

workflow_state

Unified workflow state container.

This module provides the top-level WorkflowState class that serves as the single source of truth for all workflow state, combining job tracking, rule statistics, paths, and configuration.

Classes

WorkflowState dataclass

Unified container for all workflow state.

This is the single source of truth for workflow state, consolidating what was previously scattered across IncrementalLogReader, WorkflowProgress, TUI, EventAccumulator, and TimeEstimator.

Attributes:

- paths (WorkflowPaths): Centralized path resolution.
- jobs (JobRegistry): Registry of all job state.
- rules (RuleRegistry): Registry of all rule timing statistics.
- clock (Clock): Injectable time source.
- config (EstimationConfig): Estimation configuration.
- status (WorkflowStatus): Overall workflow status.
- total_jobs (int | None): Total number of jobs in the workflow, if known.
- start_time (float | None): Workflow start timestamp.
- current_log (Path | None): Path to the log file currently being monitored.

Source code in snakesee/state/workflow_state.py
@dataclass
class WorkflowState:
    """Unified container for all workflow state.

    This is the single source of truth for workflow state, consolidating
    what was previously scattered across IncrementalLogReader, WorkflowProgress,
    TUI, EventAccumulator, and TimeEstimator.

    Attributes:
        paths: Centralized path resolution.
        jobs: Registry of all job state.
        rules: Registry of all rule timing statistics.
        clock: Injectable time source.
        config: Estimation configuration.
        status: Overall workflow status.
        total_jobs: Total number of jobs in workflow (if known).
        start_time: Workflow start timestamp.
        current_log: Path to current log file being monitored.
    """

    paths: WorkflowPaths
    jobs: JobRegistry
    rules: RuleRegistry
    clock: Clock
    config: EstimationConfig
    status: WorkflowStatus = field(default_factory=_default_status)
    total_jobs: int | None = None
    start_time: float | None = None
    current_log: Path | None = None

    @property
    def completed_count(self) -> int:
        """Number of completed jobs."""
        return len(self.jobs.completed())

    @property
    def failed_count(self) -> int:
        """Number of failed jobs."""
        return len(self.jobs.failed())

    @property
    def running_count(self) -> int:
        """Number of currently running jobs."""
        return len(self.jobs.running())

    @property
    def pending_count(self) -> int:
        """Number of pending jobs."""
        from snakesee.state.job_registry import JobStatus

        return len([j for j in self.jobs.all_jobs() if j.status == JobStatus.PENDING])

    @property
    def progress_fraction(self) -> float:
        """Fraction of jobs completed (0.0 to 1.0)."""
        if self.total_jobs is None or self.total_jobs == 0:
            return 0.0
        return self.completed_count / self.total_jobs

    @property
    def elapsed_seconds(self) -> float | None:
        """Elapsed time since workflow start."""
        if self.start_time is None:
            return None
        return self.clock.now() - self.start_time

    def update_status(self) -> None:
        """Update workflow status based on current state."""
        from snakesee.models import WorkflowStatus

        if self.total_jobs is None:
            # Unknown total - check if we have failures anyway
            if self.failed_count > 0:
                self.status = WorkflowStatus.FAILED
            elif self.running_count > 0:
                self.status = WorkflowStatus.RUNNING
            else:
                self.status = WorkflowStatus.UNKNOWN
        elif self.running_count > 0:
            self.status = WorkflowStatus.RUNNING
        elif self.failed_count > 0 and self.completed_count < self.total_jobs:
            self.status = WorkflowStatus.FAILED
        elif self.completed_count >= self.total_jobs:
            self.status = WorkflowStatus.COMPLETED
        elif self.completed_count == 0 and self.running_count == 0:
            self.status = WorkflowStatus.NOT_STARTED
        else:
            # Check for stale workflow
            if self.elapsed_seconds is not None:
                if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
                    self.status = WorkflowStatus.STALE
                else:
                    self.status = WorkflowStatus.RUNNING
            else:
                # No start time - assume incomplete
                self.status = WorkflowStatus.INCOMPLETE

    def record_job_completion(self, job_key: str) -> None:
        """Record a job completion and update rule statistics.

        Args:
            job_key: Key of the completed job.
        """
        job = self.jobs.get(job_key)
        if job is not None and job.duration is not None:
            self.rules.record_job_completion(job)

    def to_progress(self) -> WorkflowProgress:
        """Convert to WorkflowProgress for backward compatibility.

        Returns:
            WorkflowProgress snapshot of current state.
        """
        from snakesee.models import WorkflowProgress

        return WorkflowProgress(
            workflow_dir=self.paths.workflow_dir,
            status=self.status,
            total_jobs=self.total_jobs or 0,
            completed_jobs=self.completed_count,
            running_jobs=[job.to_job_info() for job in self.jobs.running()],
            failed_jobs=self.failed_count,
            failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
            pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
            start_time=self.start_time,
            log_file=self.current_log,
        )

    def clear(self) -> None:
        """Clear all state."""
        from snakesee.models import WorkflowStatus

        self.jobs.clear()
        self.rules.clear()
        self.status = WorkflowStatus.UNKNOWN
        self.total_jobs = None
        self.start_time = None
        self.current_log = None

    @classmethod
    def create(
        cls,
        workflow_dir: Path,
        clock: Clock | None = None,
        config: EstimationConfig | None = None,
    ) -> WorkflowState:
        """Create a new WorkflowState for a workflow directory.

        Args:
            workflow_dir: Path to the workflow directory.
            clock: Optional clock for time operations.
            config: Optional estimation configuration.

        Returns:
            New WorkflowState instance.
        """
        from snakesee.state.clock import get_clock
        from snakesee.state.config import DEFAULT_CONFIG
        from snakesee.state.job_registry import JobRegistry
        from snakesee.state.paths import WorkflowPaths
        from snakesee.state.rule_registry import RuleRegistry

        actual_clock = clock or get_clock()
        actual_config = config or DEFAULT_CONFIG

        return cls(
            paths=WorkflowPaths(workflow_dir=workflow_dir),
            jobs=JobRegistry(),
            rules=RuleRegistry(config=actual_config),
            clock=actual_clock,
            config=actual_config,
        )
Attributes
completed_count property
completed_count: int

Number of completed jobs.

elapsed_seconds property
elapsed_seconds: float | None

Seconds elapsed since the workflow start, measured with the injected clock, or None if the start time is unknown.

failed_count property
failed_count: int

Number of failed jobs.

pending_count property
pending_count: int

Number of pending jobs.

progress_fraction property
progress_fraction: float

Fraction of jobs completed (0.0 to 1.0). Returns 0.0 when the total job count is unknown or zero.

running_count property
running_count: int

Number of currently running jobs.

Functions
clear
clear() -> None

Clear all state.

Source code in snakesee/state/workflow_state.py
def clear(self) -> None:
    """Clear all state."""
    from snakesee.models import WorkflowStatus

    self.jobs.clear()
    self.rules.clear()
    self.status = WorkflowStatus.UNKNOWN
    self.total_jobs = None
    self.start_time = None
    self.current_log = None
create classmethod
create(workflow_dir: Path, clock: Clock | None = None, config: EstimationConfig | None = None) -> WorkflowState

Create a new WorkflowState for a workflow directory.

Parameters:

- workflow_dir (Path, required): Path to the workflow directory.
- clock (Clock | None, default None): Clock for time operations; falls back to get_clock() when omitted.
- config (EstimationConfig | None, default None): Estimation configuration; falls back to DEFAULT_CONFIG when omitted.

Returns:

- WorkflowState: New WorkflowState instance with empty job and rule registries.

Source code in snakesee/state/workflow_state.py
@classmethod
def create(
    cls,
    workflow_dir: Path,
    clock: Clock | None = None,
    config: EstimationConfig | None = None,
) -> WorkflowState:
    """Create a new WorkflowState for a workflow directory.

    Args:
        workflow_dir: Path to the workflow directory.
        clock: Optional clock for time operations.
        config: Optional estimation configuration.

    Returns:
        New WorkflowState instance.
    """
    from snakesee.state.clock import get_clock
    from snakesee.state.config import DEFAULT_CONFIG
    from snakesee.state.job_registry import JobRegistry
    from snakesee.state.paths import WorkflowPaths
    from snakesee.state.rule_registry import RuleRegistry

    actual_clock = clock or get_clock()
    actual_config = config or DEFAULT_CONFIG

    return cls(
        paths=WorkflowPaths(workflow_dir=workflow_dir),
        jobs=JobRegistry(),
        rules=RuleRegistry(config=actual_config),
        clock=actual_clock,
        config=actual_config,
    )
record_job_completion
record_job_completion(job_key: str) -> None

Update rule statistics from a completed job. Does nothing if job_key is not in the job registry or the job has no recorded duration.

Parameters:

- job_key (str, required): Key of the completed job.

Source code in snakesee/state/workflow_state.py
def record_job_completion(self, job_key: str) -> None:
    """Record a job completion and update rule statistics.

    Args:
        job_key: Key of the completed job.
    """
    job = self.jobs.get(job_key)
    if job is not None and job.duration is not None:
        self.rules.record_job_completion(job)
to_progress
to_progress() -> WorkflowProgress

Convert to WorkflowProgress for backward compatibility.

Returns:

- WorkflowProgress: Snapshot of the current state.

Source code in snakesee/state/workflow_state.py
def to_progress(self) -> WorkflowProgress:
    """Convert to WorkflowProgress for backward compatibility.

    Returns:
        WorkflowProgress snapshot of current state.
    """
    from snakesee.models import WorkflowProgress

    return WorkflowProgress(
        workflow_dir=self.paths.workflow_dir,
        status=self.status,
        total_jobs=self.total_jobs or 0,
        completed_jobs=self.completed_count,
        running_jobs=[job.to_job_info() for job in self.jobs.running()],
        failed_jobs=self.failed_count,
        failed_jobs_list=[job.to_job_info() for job in self.jobs.failed()],
        pending_jobs_list=[job.to_job_info() for job in self.jobs.submitted()],
        start_time=self.start_time,
        log_file=self.current_log,
    )
update_status
update_status() -> None

Recompute the workflow status from the job counts, the total job count, and the elapsed time.

Source code in snakesee/state/workflow_state.py
def update_status(self) -> None:
    """Update workflow status based on current state."""
    from snakesee.models import WorkflowStatus

    if self.total_jobs is None:
        # Unknown total - check if we have failures anyway
        if self.failed_count > 0:
            self.status = WorkflowStatus.FAILED
        elif self.running_count > 0:
            self.status = WorkflowStatus.RUNNING
        else:
            self.status = WorkflowStatus.UNKNOWN
    elif self.running_count > 0:
        self.status = WorkflowStatus.RUNNING
    elif self.failed_count > 0 and self.completed_count < self.total_jobs:
        self.status = WorkflowStatus.FAILED
    elif self.completed_count >= self.total_jobs:
        self.status = WorkflowStatus.COMPLETED
    elif self.completed_count == 0 and self.running_count == 0:
        self.status = WorkflowStatus.NOT_STARTED
    else:
        # Check for stale workflow
        if self.elapsed_seconds is not None:
            if self.elapsed_seconds > self.config.time.stale_workflow_threshold:
                self.status = WorkflowStatus.STALE
            else:
                self.status = WorkflowStatus.RUNNING
        else:
            # No start time - assume incomplete
            self.status = WorkflowStatus.INCOMPLETE