Skip to content

rule_registry

Rule registry for centralized rule timing statistics.

This module provides a single source of truth for all rule timing data, consolidating aggregate, thread-level, and wildcard-level statistics.

Classes

RuleRegistry

Central registry for all rule timing statistics.

Consolidates what was previously scattered across TimeEstimator's rule_stats, thread_stats, and wildcard_stats dictionaries.

Example

registry = RuleRegistry() stats = registry.get_or_create("align") stats.record_completion(duration=100.0, timestamp=time.time(), threads=4) mean, var = registry.get_estimate("align", threads=4)

Source code in snakesee/state/rule_registry.py
class RuleRegistry:
    """Central registry for all rule timing statistics.

    Consolidates what was previously scattered across TimeEstimator's
    rule_stats, thread_stats, and wildcard_stats dictionaries.

    Example:
        >>> registry = RuleRegistry()
        >>> stats = registry.get_or_create("align")
        >>> stats.record_completion(duration=100.0, timestamp=time.time(), threads=4)
        >>> mean, var = registry.get_estimate("align", threads=4)
    """

    def __init__(self, config: EstimationConfig | None = None) -> None:
        """Initialize empty registry.

        Args:
            config: Optional estimation configuration.
        """
        from snakesee.state.config import DEFAULT_CONFIG

        self.config = config or DEFAULT_CONFIG
        self._lock = threading.RLock()
        self._rules: dict[str, RuleStatistics] = {}
        self._global_mean_cache: float | None = None
        self._cache_valid: bool = False

    def __len__(self) -> int:
        """Return number of rules in registry."""
        with self._lock:
            return len(self._rules)

    def __contains__(self, rule: str) -> bool:
        """Check if rule exists in registry."""
        with self._lock:
            return rule in self._rules

    def get(self, rule: str) -> RuleStatistics | None:
        """Get statistics for a rule."""
        with self._lock:
            return self._rules.get(rule)

    def get_or_create(self, rule: str) -> RuleStatistics:
        """Get existing statistics or create new ones.

        Args:
            rule: Rule name.

        Returns:
            RuleStatistics for the rule.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
                self._cache_valid = False
            return self._rules[rule]

    def record_completion(
        self,
        rule: str,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            rule: Rule name.
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values.
            input_size: Input file size.
        """
        with self._lock:
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            stats = self._rules[rule]
            stats.record_completion(duration, timestamp, threads, wildcards, input_size)
            self._cache_valid = False

    def record_job_completion(self, job: Job) -> None:
        """Record completion from a Job object.

        Args:
            job: Completed job with timing data.
        """
        if job.duration is None:
            return

        # record_completion already acquires lock
        self.record_completion(
            rule=job.rule,
            duration=job.duration,
            timestamp=job.end_time or 0.0,
            threads=job.threads,
            wildcards=job.wildcards if job.wildcards else None,
            input_size=job.input_size,
        )

    def global_mean_duration(self) -> float:
        """Get global mean duration across all rules.

        Returns:
            Mean duration in seconds, or config default if no data.
        """
        with self._lock:
            if self._cache_valid and self._global_mean_cache is not None:
                return self._global_mean_cache

            all_durations: list[float] = []
            for stats in self._rules.values():
                all_durations.extend(stats.aggregate.durations)

            if all_durations:
                self._global_mean_cache = sum(all_durations) / len(all_durations)
            else:
                self._global_mean_cache = self.config.default_global_mean

            self._cache_valid = True
            return self._global_mean_cache

    def set_expected_counts(self, counts: dict[str, int]) -> None:
        """Set expected job counts for rules.

        Args:
            counts: Dictionary of rule name to expected count.
        """
        with self._lock:
            for rule, count in counts.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].expected_count = count

    def clear(self) -> None:
        """Clear all statistics."""
        with self._lock:
            self._rules.clear()
            self._cache_valid = False
            self._global_mean_cache = None

    # Backward compatibility methods

    def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
        """Convert to dict for backward compatibility with TimeEstimator."""
        with self._lock:
            return {name: stats.aggregate for name, stats in self._rules.items()}

    def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
        """Convert to thread stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, ThreadTimingStats] = {}
            for name, stats in self._rules.items():
                if stats.by_threads is not None:
                    result[name] = stats.by_threads
            return result

    def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
        """Convert to wildcard stats dict for backward compatibility."""
        with self._lock:
            result: dict[str, dict[str, WildcardTimingStats]] = {}
            for name, stats in self._rules.items():
                if stats.by_wildcard:
                    result[name] = stats.by_wildcard
            return result

    def load_from_rule_stats(
        self,
        rule_stats: dict[str, RuleTimingStats],
        thread_stats: dict[str, ThreadTimingStats] | None = None,
        wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
    ) -> None:
        """Load from existing stats dictionaries.

        Args:
            rule_stats: Aggregate rule timing stats.
            thread_stats: Thread-specific stats.
            wildcard_stats: Wildcard-specific stats.
        """
        with self._lock:
            for rule, stats in rule_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].aggregate = stats

            if thread_stats:
                for rule, tstats in thread_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_threads = tstats

            if wildcard_stats:
                for rule, wstats in wildcard_stats.items():
                    if rule not in self._rules:
                        self._rules[rule] = RuleStatistics(rule=rule)
                    self._rules[rule].by_wildcard = wstats

            self._cache_valid = False

    def all_rules(self) -> list[str]:
        """Get all rule names."""
        with self._lock:
            return list(self._rules.keys())

    def get_combination_stats(
        self,
        rule: str,
        wildcards: dict[str, str] | None,
        threads: int | None,
        min_samples: int | None = None,
    ) -> RuleTimingStats | None:
        """Get timing stats for a specific wildcard+threads combination.

        Args:
            rule: Rule name.
            wildcards: Wildcard values dict.
            threads: Thread count.
            min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                        Set to 1 to get stats even with a single historical run.

        Returns:
            RuleTimingStats for the combination, or None if not found or insufficient samples.
        """
        from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

        if min_samples is None:
            min_samples = MIN_SAMPLES_FOR_CONDITIONING

        combo_key = _make_combination_key(wildcards, threads)
        if combo_key is None:
            return None

        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return None

            combo_stats = stats.by_combination.get(combo_key)
            if combo_stats is None:
                return None

            # Require minimum samples for conditioning
            if combo_stats.count < min_samples:
                return None

            return combo_stats

    def typical_threads(self, rule: str) -> float:
        """Get the typical thread count for a rule.

        Args:
            rule: Rule name.

        Returns:
            Weighted average thread count, or 1.0 if no thread data.
        """
        with self._lock:
            stats = self._rules.get(rule)
            if stats is None:
                return 1.0
            return stats.typical_threads

    def total_sample_count(self) -> int:
        """Get total number of samples across all rules."""
        with self._lock:
            return sum(stats.aggregate.count for stats in self._rules.values())

    def rule_count(self) -> int:
        """Get number of rules in the registry."""
        with self._lock:
            return len(self._rules)

Functions

__contains__
__contains__(rule: str) -> bool

Check if rule exists in registry.

Source code in snakesee/state/rule_registry.py
def __contains__(self, rule: str) -> bool:
    """Check if rule exists in registry."""
    with self._lock:
        return rule in self._rules
__init__
__init__(config: EstimationConfig | None = None) -> None

Initialize empty registry.

Parameters:

Name Type Description Default
config EstimationConfig | None

Optional estimation configuration.

None
Source code in snakesee/state/rule_registry.py
def __init__(self, config: EstimationConfig | None = None) -> None:
    """Initialize empty registry.

    Args:
        config: Optional estimation configuration.
    """
    from snakesee.state.config import DEFAULT_CONFIG

    self.config = config or DEFAULT_CONFIG
    self._lock = threading.RLock()
    self._rules: dict[str, RuleStatistics] = {}
    self._global_mean_cache: float | None = None
    self._cache_valid: bool = False
__len__
__len__() -> int

Return number of rules in registry.

Source code in snakesee/state/rule_registry.py
def __len__(self) -> int:
    """Return number of rules in registry."""
    with self._lock:
        return len(self._rules)
all_rules
all_rules() -> list[str]

Get all rule names.

Source code in snakesee/state/rule_registry.py
def all_rules(self) -> list[str]:
    """Get all rule names."""
    with self._lock:
        return list(self._rules.keys())
clear
clear() -> None

Clear all statistics.

Source code in snakesee/state/rule_registry.py
def clear(self) -> None:
    """Clear all statistics."""
    with self._lock:
        self._rules.clear()
        self._cache_valid = False
        self._global_mean_cache = None
get
get(rule: str) -> RuleStatistics | None

Get statistics for a rule.

Source code in snakesee/state/rule_registry.py
def get(self, rule: str) -> RuleStatistics | None:
    """Get statistics for a rule."""
    with self._lock:
        return self._rules.get(rule)
get_combination_stats
get_combination_stats(rule: str, wildcards: dict[str, str] | None, threads: int | None, min_samples: int | None = None) -> RuleTimingStats | None

Get timing stats for a specific wildcard+threads combination.

Parameters:

Name Type Description Default
rule str

Rule name.

required
wildcards dict[str, str] | None

Wildcard values dict.

required
threads int | None

Thread count.

required
min_samples int | None

Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING. Set to 1 to get stats even with a single historical run.

None

Returns:

Type Description
RuleTimingStats | None

RuleTimingStats for the combination, or None if not found or insufficient samples.

Source code in snakesee/state/rule_registry.py
def get_combination_stats(
    self,
    rule: str,
    wildcards: dict[str, str] | None,
    threads: int | None,
    min_samples: int | None = None,
) -> RuleTimingStats | None:
    """Get timing stats for a specific wildcard+threads combination.

    Args:
        rule: Rule name.
        wildcards: Wildcard values dict.
        threads: Thread count.
        min_samples: Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING.
                    Set to 1 to get stats even with a single historical run.

    Returns:
        RuleTimingStats for the combination, or None if not found or insufficient samples.
    """
    from snakesee.constants import MIN_SAMPLES_FOR_CONDITIONING

    if min_samples is None:
        min_samples = MIN_SAMPLES_FOR_CONDITIONING

    combo_key = _make_combination_key(wildcards, threads)
    if combo_key is None:
        return None

    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return None

        combo_stats = stats.by_combination.get(combo_key)
        if combo_stats is None:
            return None

        # Require minimum samples for conditioning
        if combo_stats.count < min_samples:
            return None

        return combo_stats
get_or_create
get_or_create(rule: str) -> RuleStatistics

Get existing statistics or create new ones.

Parameters:

Name Type Description Default
rule str

Rule name.

required

Returns:

Type Description
RuleStatistics

RuleStatistics for the rule.

Source code in snakesee/state/rule_registry.py
def get_or_create(self, rule: str) -> RuleStatistics:
    """Get existing statistics or create new ones.

    Args:
        rule: Rule name.

    Returns:
        RuleStatistics for the rule.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
            self._cache_valid = False
        return self._rules[rule]
global_mean_duration
global_mean_duration() -> float

Get global mean duration across all rules.

Returns:

Type Description
float

Mean duration in seconds, or config default if no data.

Source code in snakesee/state/rule_registry.py
def global_mean_duration(self) -> float:
    """Get global mean duration across all rules.

    Returns:
        Mean duration in seconds, or config default if no data.
    """
    with self._lock:
        if self._cache_valid and self._global_mean_cache is not None:
            return self._global_mean_cache

        all_durations: list[float] = []
        for stats in self._rules.values():
            all_durations.extend(stats.aggregate.durations)

        if all_durations:
            self._global_mean_cache = sum(all_durations) / len(all_durations)
        else:
            self._global_mean_cache = self.config.default_global_mean

        self._cache_valid = True
        return self._global_mean_cache
load_from_rule_stats
load_from_rule_stats(rule_stats: dict[str, RuleTimingStats], thread_stats: dict[str, ThreadTimingStats] | None = None, wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None) -> None

Load from existing stats dictionaries.

Parameters:

Name Type Description Default
rule_stats dict[str, RuleTimingStats]

Aggregate rule timing stats.

required
thread_stats dict[str, ThreadTimingStats] | None

Thread-specific stats.

None
wildcard_stats dict[str, dict[str, WildcardTimingStats]] | None

Wildcard-specific stats.

None
Source code in snakesee/state/rule_registry.py
def load_from_rule_stats(
    self,
    rule_stats: dict[str, RuleTimingStats],
    thread_stats: dict[str, ThreadTimingStats] | None = None,
    wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None,
) -> None:
    """Load from existing stats dictionaries.

    Args:
        rule_stats: Aggregate rule timing stats.
        thread_stats: Thread-specific stats.
        wildcard_stats: Wildcard-specific stats.
    """
    with self._lock:
        for rule, stats in rule_stats.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].aggregate = stats

        if thread_stats:
            for rule, tstats in thread_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_threads = tstats

        if wildcard_stats:
            for rule, wstats in wildcard_stats.items():
                if rule not in self._rules:
                    self._rules[rule] = RuleStatistics(rule=rule)
                self._rules[rule].by_wildcard = wstats

        self._cache_valid = False
record_completion
record_completion(rule: str, duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

Name Type Description Default
rule str

Rule name.

required
duration float

Job duration in seconds.

required
timestamp float

Completion timestamp.

required
threads int | None

Number of threads used.

None
wildcards dict[str, str] | None

Wildcard values.

None
input_size int | None

Input file size.

None
Source code in snakesee/state/rule_registry.py
def record_completion(
    self,
    rule: str,
    duration: float,
    timestamp: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
) -> None:
    """Record a job completion.

    Args:
        rule: Rule name.
        duration: Job duration in seconds.
        timestamp: Completion timestamp.
        threads: Number of threads used.
        wildcards: Wildcard values.
        input_size: Input file size.
    """
    with self._lock:
        if rule not in self._rules:
            self._rules[rule] = RuleStatistics(rule=rule)
        stats = self._rules[rule]
        stats.record_completion(duration, timestamp, threads, wildcards, input_size)
        self._cache_valid = False
record_job_completion
record_job_completion(job: Job) -> None

Record completion from a Job object.

Parameters:

Name Type Description Default
job Job

Completed job with timing data.

required
Source code in snakesee/state/rule_registry.py
def record_job_completion(self, job: Job) -> None:
    """Record completion from a Job object.

    Args:
        job: Completed job with timing data.
    """
    if job.duration is None:
        return

    # record_completion already acquires lock
    self.record_completion(
        rule=job.rule,
        duration=job.duration,
        timestamp=job.end_time or 0.0,
        threads=job.threads,
        wildcards=job.wildcards if job.wildcards else None,
        input_size=job.input_size,
    )
rule_count
rule_count() -> int

Get number of rules in the registry.

Source code in snakesee/state/rule_registry.py
def rule_count(self) -> int:
    """Get number of rules in the registry."""
    with self._lock:
        return len(self._rules)
set_expected_counts
set_expected_counts(counts: dict[str, int]) -> None

Set expected job counts for rules.

Parameters:

Name Type Description Default
counts dict[str, int]

Dictionary of rule name to expected count.

required
Source code in snakesee/state/rule_registry.py
def set_expected_counts(self, counts: dict[str, int]) -> None:
    """Set expected job counts for rules.

    Args:
        counts: Dictionary of rule name to expected count.
    """
    with self._lock:
        for rule, count in counts.items():
            if rule not in self._rules:
                self._rules[rule] = RuleStatistics(rule=rule)
            self._rules[rule].expected_count = count
to_rule_stats_dict
to_rule_stats_dict() -> dict[str, RuleTimingStats]

Convert to dict for backward compatibility with TimeEstimator.

Source code in snakesee/state/rule_registry.py
def to_rule_stats_dict(self) -> dict[str, RuleTimingStats]:
    """Convert to dict for backward compatibility with TimeEstimator."""
    with self._lock:
        return {name: stats.aggregate for name, stats in self._rules.items()}
to_thread_stats_dict
to_thread_stats_dict() -> dict[str, ThreadTimingStats]

Convert to thread stats dict for backward compatibility.

Source code in snakesee/state/rule_registry.py
def to_thread_stats_dict(self) -> dict[str, ThreadTimingStats]:
    """Convert to thread stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, ThreadTimingStats] = {}
        for name, stats in self._rules.items():
            if stats.by_threads is not None:
                result[name] = stats.by_threads
        return result
to_wildcard_stats_dict
to_wildcard_stats_dict() -> dict[str, dict[str, WildcardTimingStats]]

Convert to wildcard stats dict for backward compatibility.

Source code in snakesee/state/rule_registry.py
def to_wildcard_stats_dict(self) -> dict[str, dict[str, WildcardTimingStats]]:
    """Convert to wildcard stats dict for backward compatibility."""
    with self._lock:
        result: dict[str, dict[str, WildcardTimingStats]] = {}
        for name, stats in self._rules.items():
            if stats.by_wildcard:
                result[name] = stats.by_wildcard
        return result
total_sample_count
total_sample_count() -> int

Get total number of samples across all rules.

Source code in snakesee/state/rule_registry.py
def total_sample_count(self) -> int:
    """Get total number of samples across all rules."""
    with self._lock:
        return sum(stats.aggregate.count for stats in self._rules.values())
typical_threads
typical_threads(rule: str) -> float

Get the typical thread count for a rule.

Parameters:

Name Type Description Default
rule str

Rule name.

required

Returns:

Type Description
float

Weighted average thread count, or 1.0 if no thread data.

Source code in snakesee/state/rule_registry.py
def typical_threads(self, rule: str) -> float:
    """Get the typical thread count for a rule.

    Args:
        rule: Rule name.

    Returns:
        Weighted average thread count, or 1.0 if no thread data.
    """
    with self._lock:
        stats = self._rules.get(rule)
        if stats is None:
            return 1.0
        return stats.typical_threads

RuleStatistics dataclass

Complete timing statistics for a single rule.

Combines aggregate, thread-level, and wildcard-level statistics in a single container.

Attributes:

Name Type Description
rule str

The rule name.

aggregate RuleTimingStats

Overall timing statistics for the rule.

by_threads ThreadTimingStats | None

Thread-specific timing statistics.

by_wildcard dict[str, WildcardTimingStats]

Wildcard-specific timing statistics.

by_combination dict[str, RuleTimingStats]

Full wildcard+threads combination statistics.

expected_count int | None

Expected number of jobs for this rule (if known).

Source code in snakesee/state/rule_registry.py
@dataclass
class RuleStatistics:
    """Complete timing statistics for a single rule.

    Combines aggregate, thread-level, and wildcard-level statistics
    in a single container.

    Attributes:
        rule: The rule name.
        aggregate: Overall timing statistics for the rule.
        by_threads: Thread-specific timing statistics.
        by_wildcard: Wildcard-specific timing statistics.
        by_combination: Full wildcard+threads combination statistics.
        expected_count: Expected number of jobs for this rule (if known).
    """

    rule: str
    aggregate: RuleTimingStats = field(default_factory=_make_empty_rule_stats)
    by_threads: ThreadTimingStats | None = None
    by_wildcard: dict[str, WildcardTimingStats] = field(default_factory=dict)
    by_combination: dict[str, RuleTimingStats] = field(default_factory=dict)
    expected_count: int | None = None

    def __post_init__(self) -> None:
        """Ensure aggregate has correct rule name."""
        from snakesee.models import RuleTimingStats

        if self.aggregate.rule != self.rule:
            # Create new RuleTimingStats with correct rule name
            self.aggregate = RuleTimingStats(
                rule=self.rule,
                durations=list(self.aggregate.durations),
                timestamps=list(self.aggregate.timestamps),
                input_sizes=list(self.aggregate.input_sizes),
            )

    @property
    def typical_threads(self) -> float:
        """Weighted average of thread counts from historical data.

        Uses the number of observations at each thread count as weights.
        Defaults to 1.0 if no thread data is available.
        """
        if self.by_threads is None or not self.by_threads.stats_by_threads:
            return 1.0

        total_weight = 0
        weighted_sum = 0.0
        for thread_count, stats in self.by_threads.stats_by_threads.items():
            n = stats.count
            weighted_sum += thread_count * n
            total_weight += n

        return weighted_sum / total_weight if total_weight > 0 else 1.0

    def record_completion(
        self,
        duration: float,
        timestamp: float,
        threads: int | None = None,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
    ) -> None:
        """Record a job completion.

        Args:
            duration: Job duration in seconds.
            timestamp: Completion timestamp.
            threads: Number of threads used.
            wildcards: Wildcard values for the job.
            input_size: Input file size in bytes.
        """
        from snakesee.models import RuleTimingStats
        from snakesee.models import ThreadTimingStats
        from snakesee.models import WildcardTimingStats

        # Update aggregate stats
        self.aggregate.durations.append(duration)
        self.aggregate.timestamps.append(timestamp)
        self.aggregate.input_sizes.append(input_size)

        # Update thread-specific stats
        if threads is not None:
            if self.by_threads is None:
                self.by_threads = ThreadTimingStats(rule=self.rule, stats_by_threads={})
            # Get or create stats for this thread count
            if threads not in self.by_threads.stats_by_threads:
                self.by_threads.stats_by_threads[threads] = RuleTimingStats(
                    rule=f"{self.rule}@{threads}t"
                )
            thread_stats = self.by_threads.stats_by_threads[threads]
            thread_stats.durations.append(duration)
            thread_stats.timestamps.append(timestamp)

        # Update wildcard-specific stats (for each wildcard key)
        if wildcards:
            for wc_key, wc_value in wildcards.items():
                if wc_key not in self.by_wildcard:
                    self.by_wildcard[wc_key] = WildcardTimingStats(
                        rule=self.rule,
                        wildcard_key=wc_key,
                        stats_by_value={},
                    )
                wts = self.by_wildcard[wc_key]
                # Access stats_by_value directly (get_stats_for_value has min sample check)
                if wc_value not in wts.stats_by_value:
                    wts.stats_by_value[wc_value] = RuleTimingStats(
                        rule=f"{self.rule}:{wc_key}={wc_value}"
                    )
                value_stats = wts.stats_by_value[wc_value]
                value_stats.durations.append(duration)
                value_stats.timestamps.append(timestamp)

        # Update combination stats (full wildcard+threads key)
        combo_key = _make_combination_key(wildcards, threads)
        if combo_key:
            if combo_key not in self.by_combination:
                self.by_combination[combo_key] = RuleTimingStats(rule=f"{self.rule}@{combo_key}")
            combo_stats = self.by_combination[combo_key]
            combo_stats.durations.append(duration)
            combo_stats.timestamps.append(timestamp)

Attributes

typical_threads property
typical_threads: float

Weighted average of thread counts from historical data.

Uses the number of observations at each thread count as weights. Defaults to 1.0 if no thread data is available.

Functions

__post_init__
__post_init__() -> None

Ensure aggregate has correct rule name.

Source code in snakesee/state/rule_registry.py
def __post_init__(self) -> None:
    """Ensure aggregate has correct rule name."""
    from snakesee.models import RuleTimingStats

    if self.aggregate.rule != self.rule:
        # Create new RuleTimingStats with correct rule name
        self.aggregate = RuleTimingStats(
            rule=self.rule,
            durations=list(self.aggregate.durations),
            timestamps=list(self.aggregate.timestamps),
            input_sizes=list(self.aggregate.input_sizes),
        )
record_completion
record_completion(duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None

Record a job completion.

Parameters:

Name Type Description Default
duration float

Job duration in seconds.

required
timestamp float

Completion timestamp.

required
threads int | None

Number of threads used.

None
wildcards dict[str, str] | None

Wildcard values for the job.

None
input_size int | None

Input file size in bytes.

None
Source code in snakesee/state/rule_registry.py
def record_completion(
    self,
    duration: float,
    timestamp: float,
    threads: int | None = None,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
) -> None:
    """Record a job completion.

    Args:
        duration: Job duration in seconds.
        timestamp: Completion timestamp.
        threads: Number of threads used.
        wildcards: Wildcard values for the job.
        input_size: Input file size in bytes.
    """
    from snakesee.models import RuleTimingStats
    from snakesee.models import ThreadTimingStats
    from snakesee.models import WildcardTimingStats

    # Update aggregate stats
    self.aggregate.durations.append(duration)
    self.aggregate.timestamps.append(timestamp)
    self.aggregate.input_sizes.append(input_size)

    # Update thread-specific stats
    if threads is not None:
        if self.by_threads is None:
            self.by_threads = ThreadTimingStats(rule=self.rule, stats_by_threads={})
        # Get or create stats for this thread count
        if threads not in self.by_threads.stats_by_threads:
            self.by_threads.stats_by_threads[threads] = RuleTimingStats(
                rule=f"{self.rule}@{threads}t"
            )
        thread_stats = self.by_threads.stats_by_threads[threads]
        thread_stats.durations.append(duration)
        thread_stats.timestamps.append(timestamp)

    # Update wildcard-specific stats (for each wildcard key)
    if wildcards:
        for wc_key, wc_value in wildcards.items():
            if wc_key not in self.by_wildcard:
                self.by_wildcard[wc_key] = WildcardTimingStats(
                    rule=self.rule,
                    wildcard_key=wc_key,
                    stats_by_value={},
                )
            wts = self.by_wildcard[wc_key]
            # Access stats_by_value directly (get_stats_for_value has min sample check)
            if wc_value not in wts.stats_by_value:
                wts.stats_by_value[wc_value] = RuleTimingStats(
                    rule=f"{self.rule}:{wc_key}={wc_value}"
                )
            value_stats = wts.stats_by_value[wc_value]
            value_stats.durations.append(duration)
            value_stats.timestamps.append(timestamp)

    # Update combination stats (full wildcard+threads key)
    combo_key = _make_combination_key(wildcards, threads)
    if combo_key:
        if combo_key not in self.by_combination:
            self.by_combination[combo_key] = RuleTimingStats(rule=f"{self.rule}@{combo_key}")
        combo_stats = self.by_combination[combo_key]
        combo_stats.durations.append(duration)
        combo_stats.timestamps.append(timestamp)