Skip to content

estimation

Time estimation package for Snakemake workflow progress.

This package provides modular components for estimating remaining workflow time: - TimeEstimator: Main coordinator class - RuleMatchingEngine: Fuzzy rule matching by name/code hash - HistoricalDataLoader: Load timing data from metadata/events - PendingRuleInferrer: Infer pending job distribution

Classes

HistoricalDataLoader

Loads timing data from metadata and events files.

Provides methods to load historical execution data from: - .snakemake/metadata/ directory (from previous Snakemake runs) - .snakesee_events.jsonl file (from snakesee monitoring)

Source code in snakesee/estimation/data_loader.py
class HistoricalDataLoader:
    """Loads timing data from metadata and events files.

    Provides methods to load historical execution data from:
    - .snakemake/metadata/ directory (from previous Snakemake runs)
    - .snakesee_events.jsonl file (from snakesee monitoring)
    """

    def __init__(
        self,
        registry: "RuleRegistry",
        use_wildcard_conditioning: bool = False,
    ) -> None:
        """Initialize the loader.

        Args:
            registry: RuleRegistry to load data into.
            use_wildcard_conditioning: Whether to record wildcard-specific stats.
        """
        self._registry = registry
        self.use_wildcard_conditioning = use_wildcard_conditioning
        self.code_hash_to_rules: dict[str, set[str]] = {}

    def load_from_metadata(
        self,
        metadata_dir: Path,
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from .snakemake/metadata/.

        Uses a single-pass parser for efficiency - reads each metadata file
        only once to collect timing stats, code hashes, and wildcard stats.

        Args:
            metadata_dir: Path to .snakemake/metadata/ directory.
            progress_callback: Optional callback(current, total) for progress.
        """
        self._consume_metadata_records(parse_metadata_files_full(metadata_dir, progress_callback))

    def load_from_backend(
        self,
        backend: "PersistenceBackend",
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from a persistence backend.

        Works with both filesystem and SQLite backends via the
        PersistenceBackend protocol.

        Args:
            backend: Persistence backend to read from.
            progress_callback: Optional callback(current, total) for progress.
        """
        self._consume_metadata_records(backend.iterate_metadata(progress_callback))

    def _consume_metadata_records(
        self,
        records: "Iterator[MetadataRecord]",
    ) -> None:
        """Process metadata records into the registry and code hash map.

        Args:
            records: Iterator of MetadataRecord instances.
        """
        hash_to_rules: dict[str, set[str]] = {}

        for record in records:
            duration = record.duration
            end_time = record.end_time

            if duration is not None and end_time is not None:
                wildcards = record.wildcards if self.use_wildcard_conditioning else None
                self._registry.record_completion(
                    rule=record.rule,
                    duration=duration,
                    timestamp=end_time,
                    wildcards=wildcards,
                    input_size=record.input_size,
                )

            if record.code_hash:
                if record.code_hash not in hash_to_rules:
                    hash_to_rules[record.code_hash] = set()
                hash_to_rules[record.code_hash].add(record.rule)

        self.code_hash_to_rules = hash_to_rules

    def load_from_events(self, events_file: Path) -> bool:
        """Load historical execution times from a snakesee events file.

        Streams the events file line by line for memory efficiency.

        Args:
            events_file: Path to .snakesee_events.jsonl file.

        Returns:
            True if any wildcard data was found.
        """
        if not events_file.exists():
            return False

        has_wildcards = False

        try:
            with open(events_file, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue

                    # Skip excessively long lines to prevent memory issues
                    if len(line) > MAX_EVENTS_LINE_LENGTH:
                        logger.debug(
                            "Skipping oversized line in events file: %d bytes (max %d)",
                            len(line),
                            MAX_EVENTS_LINE_LENGTH,
                        )
                        continue

                    try:
                        event = orjson.loads(line)
                    except orjson.JSONDecodeError:
                        continue

                    if event.get("event_type") != "job_finished":
                        continue

                    duration = event.get("duration")
                    timestamp = event.get("timestamp")
                    rule_name = event.get("rule_name")
                    wildcards = event.get("wildcards")
                    threads = event.get("threads")

                    if duration is None or timestamp is None or rule_name is None:
                        continue

                    wc_dict = wildcards if isinstance(wildcards, dict) else None
                    threads_int = None
                    if threads is not None:
                        try:
                            candidate = int(threads)
                        except (TypeError, ValueError):
                            logger.debug(
                                "Ignoring invalid thread count in events file: %r",
                                threads,
                            )
                        else:
                            if candidate > 0:
                                threads_int = candidate
                    self._registry.record_completion(
                        rule=rule_name,
                        duration=duration,
                        timestamp=timestamp,
                        wildcards=wc_dict,
                        threads=threads_int,
                    )

                    if wc_dict:
                        has_wildcards = True

        except OSError as e:
            logger.warning("Error reading events file %s: %s", events_file, e)
            return False

        return has_wildcards

Functions

__init__
__init__(registry: RuleRegistry, use_wildcard_conditioning: bool = False) -> None

Initialize the loader.

Parameters:

Name Type Description Default
registry RuleRegistry

RuleRegistry to load data into.

required
use_wildcard_conditioning bool

Whether to record wildcard-specific stats.

False
Source code in snakesee/estimation/data_loader.py
def __init__(
    self,
    registry: "RuleRegistry",
    use_wildcard_conditioning: bool = False,
) -> None:
    """Initialize the loader.

    Args:
        registry: RuleRegistry to load data into.
        use_wildcard_conditioning: Whether to record wildcard-specific stats.
    """
    self._registry = registry
    self.use_wildcard_conditioning = use_wildcard_conditioning
    self.code_hash_to_rules: dict[str, set[str]] = {}
load_from_backend
load_from_backend(backend: PersistenceBackend, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from a persistence backend.

Works with both filesystem and SQLite backends via the PersistenceBackend protocol.

Parameters:

Name Type Description Default
backend PersistenceBackend

Persistence backend to read from.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None
Source code in snakesee/estimation/data_loader.py
def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from a persistence backend.

    Works with both filesystem and SQLite backends via the
    PersistenceBackend protocol.

    Args:
        backend: Persistence backend to read from.
        progress_callback: Optional callback(current, total) for progress.
    """
    self._consume_metadata_records(backend.iterate_metadata(progress_callback))
load_from_events
load_from_events(events_file: Path) -> bool

Load historical execution times from a snakesee events file.

Streams the events file line by line for memory efficiency.

Parameters:

Name Type Description Default
events_file Path

Path to .snakesee_events.jsonl file.

required

Returns:

Type Description
bool

True if any wildcard data was found.

Source code in snakesee/estimation/data_loader.py
def load_from_events(self, events_file: Path) -> bool:
    """Load historical execution times from a snakesee events file.

    Streams the events file line by line for memory efficiency.

    Args:
        events_file: Path to .snakesee_events.jsonl file.

    Returns:
        True if any wildcard data was found.
    """
    if not events_file.exists():
        return False

    has_wildcards = False

    try:
        with open(events_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue

                # Skip excessively long lines to prevent memory issues
                if len(line) > MAX_EVENTS_LINE_LENGTH:
                    logger.debug(
                        "Skipping oversized line in events file: %d bytes (max %d)",
                        len(line),
                        MAX_EVENTS_LINE_LENGTH,
                    )
                    continue

                try:
                    event = orjson.loads(line)
                except orjson.JSONDecodeError:
                    continue

                if event.get("event_type") != "job_finished":
                    continue

                duration = event.get("duration")
                timestamp = event.get("timestamp")
                rule_name = event.get("rule_name")
                wildcards = event.get("wildcards")
                threads = event.get("threads")

                if duration is None or timestamp is None or rule_name is None:
                    continue

                wc_dict = wildcards if isinstance(wildcards, dict) else None
                threads_int = None
                if threads is not None:
                    try:
                        candidate = int(threads)
                    except (TypeError, ValueError):
                        logger.debug(
                            "Ignoring invalid thread count in events file: %r",
                            threads,
                        )
                    else:
                        if candidate > 0:
                            threads_int = candidate
                self._registry.record_completion(
                    rule=rule_name,
                    duration=duration,
                    timestamp=timestamp,
                    wildcards=wc_dict,
                    threads=threads_int,
                )

                if wc_dict:
                    has_wildcards = True

    except OSError as e:
        logger.warning("Error reading events file %s: %s", events_file, e)
        return False

    return has_wildcards
load_from_metadata
load_from_metadata(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from .snakemake/metadata/.

Uses a single-pass parser for efficiency - reads each metadata file only once to collect timing stats, code hashes, and wildcard stats.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None
Source code in snakesee/estimation/data_loader.py
def load_from_metadata(
    self,
    metadata_dir: Path,
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from .snakemake/metadata/.

    Uses a single-pass parser for efficiency - reads each metadata file
    only once to collect timing stats, code hashes, and wildcard stats.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress.
    """
    self._consume_metadata_records(parse_metadata_files_full(metadata_dir, progress_callback))

PendingRuleInferrer

Infers the distribution of pending jobs by rule.

When we know the total pending count but not the breakdown by rule, this class infers the distribution based on: 1. Expected job counts (from Snakemake's Job stats table) if available 2. Proportional inference from completed job distribution otherwise

Source code in snakesee/estimation/pending_inferrer.py
class PendingRuleInferrer:
    """Infers the distribution of pending jobs by rule.

    When we know the total pending count but not the breakdown by rule,
    this class infers the distribution based on:
    1. Expected job counts (from Snakemake's Job stats table) if available
    2. Proportional inference from completed job distribution otherwise
    """

    def infer(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        expected_job_counts: dict[str, int] | None = None,
        current_rules: set[str] | None = None,
        running_by_rule: dict[str, int] | None = None,
    ) -> dict[str, int]:
        """Infer the distribution of pending jobs by rule.

        Args:
            completed_by_rule: Count of completed jobs per rule.
            pending_count: Total number of pending jobs.
            expected_job_counts: Expected counts from Job stats table (most accurate).
            current_rules: Set of rules in current workflow (filters deleted rules).
            running_by_rule: Count of running jobs per rule.

        Returns:
            Estimated count of pending jobs per rule.
        """
        if pending_count <= 0:
            return {}

        running_by_rule = running_by_rule or {}

        # Use exact calculation if we have expected job counts
        if expected_job_counts:
            return self._exact_calculation(
                expected_job_counts,
                completed_by_rule,
                running_by_rule,
            )

        # Fall back to proportional inference
        return self._proportional_inference(
            completed_by_rule,
            pending_count,
            current_rules,
        )

    def _exact_calculation(
        self,
        expected_job_counts: dict[str, int],
        completed_by_rule: dict[str, int],
        running_by_rule: dict[str, int],
    ) -> dict[str, int]:
        """Calculate pending using expected - completed - running."""
        pending_rules: dict[str, int] = {}

        for rule, expected in expected_job_counts.items():
            completed = completed_by_rule.get(rule, 0)
            running = running_by_rule.get(rule, 0)
            remaining = expected - completed - running
            if remaining > 0:
                pending_rules[rule] = remaining

        return pending_rules

    def _proportional_inference(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        current_rules: set[str] | None,
    ) -> dict[str, int]:
        """Infer pending distribution proportionally to completed jobs.

        Note: Due to rounding, the sum of returned values may not exactly
        equal pending_count. This is expected and the estimation handles
        this gracefully.
        """
        if not completed_by_rule:
            return {}

        # Filter out deleted rules if current_rules is provided
        if current_rules is not None:
            completed_by_rule = {r: c for r, c in completed_by_rule.items() if r in current_rules}

        total_completed = sum(completed_by_rule.values())
        if total_completed == 0:
            return {}

        pending_rules: dict[str, int] = {}
        for rule, count in completed_by_rule.items():
            proportion = count / total_completed
            estimated = round(pending_count * proportion)
            if estimated > 0:
                pending_rules[rule] = estimated

        return pending_rules

Functions

infer
infer(completed_by_rule: dict[str, int], pending_count: int, expected_job_counts: dict[str, int] | None = None, current_rules: set[str] | None = None, running_by_rule: dict[str, int] | None = None) -> dict[str, int]

Infer the distribution of pending jobs by rule.

Parameters:

Name Type Description Default
completed_by_rule dict[str, int]

Count of completed jobs per rule.

required
pending_count int

Total number of pending jobs.

required
expected_job_counts dict[str, int] | None

Expected counts from Job stats table (most accurate).

None
current_rules set[str] | None

Set of rules in current workflow (filters deleted rules).

None
running_by_rule dict[str, int] | None

Count of running jobs per rule.

None

Returns:

Type Description
dict[str, int]

Estimated count of pending jobs per rule.

Source code in snakesee/estimation/pending_inferrer.py
def infer(
    self,
    completed_by_rule: dict[str, int],
    pending_count: int,
    expected_job_counts: dict[str, int] | None = None,
    current_rules: set[str] | None = None,
    running_by_rule: dict[str, int] | None = None,
) -> dict[str, int]:
    """Infer the distribution of pending jobs by rule.

    Args:
        completed_by_rule: Count of completed jobs per rule.
        pending_count: Total number of pending jobs.
        expected_job_counts: Expected counts from Job stats table (most accurate).
        current_rules: Set of rules in current workflow (filters deleted rules).
        running_by_rule: Count of running jobs per rule.

    Returns:
        Estimated count of pending jobs per rule.
    """
    if pending_count <= 0:
        return {}

    running_by_rule = running_by_rule or {}

    # Use exact calculation if we have expected job counts
    if expected_job_counts:
        return self._exact_calculation(
            expected_job_counts,
            completed_by_rule,
            running_by_rule,
        )

    # Fall back to proportional inference
    return self._proportional_inference(
        completed_by_rule,
        pending_count,
        current_rules,
    )

RuleMatchingEngine

Matches rules by name similarity and code hash.

Used to find timing data for renamed rules or similar rules when exact matches aren't available.

Source code in snakesee/estimation/rule_matcher.py
class RuleMatchingEngine:
    """Matches rules by name similarity and code hash.

    Used to find timing data for renamed rules or similar rules
    when exact matches aren't available.
    """

    def __init__(self, max_distance: int = 3) -> None:
        """Initialize the matcher.

        Args:
            max_distance: Maximum Levenshtein distance for fuzzy matches.
        """
        self.max_distance = max_distance

    def find_by_code_hash(
        self,
        rule: str,
        code_hash_to_rules: dict[str, set[str]],
        known_rules: set[str],
    ) -> str | None:
        """Find a rule with matching code hash.

        When multiple rules share the same code hash and are in known_rules,
        returns the lexicographically smallest rule name for deterministic behavior.

        Args:
            rule: Rule name to look up.
            code_hash_to_rules: Mapping of code hashes to rule sets.
            known_rules: Set of rules with available stats.

        Returns:
            Name of matching rule (lexicographically smallest if multiple),
            or None if not found.
        """
        for _hash, rules in code_hash_to_rules.items():
            if rule in rules:
                # Find all candidate rules that match criteria
                candidates = {r for r in rules if r != rule and r in known_rules}
                if candidates:
                    # Return lexicographically smallest for deterministic selection
                    return min(candidates)
        return None

    def find_similar(
        self,
        rule: str,
        known_rules: set[str],
        max_distance: int | None = None,
    ) -> tuple[str, int] | None:
        """Find similar rule by Levenshtein distance.

        When multiple rules have the same distance, returns the lexicographically
        smallest one for deterministic behavior.

        Args:
            rule: Rule name to match.
            known_rules: Set of rules to search.
            max_distance: Maximum distance (uses instance default if None).

        Returns:
            Tuple of (matched_rule, distance), or None if no match.
        """
        if max_distance is None:
            max_distance = self.max_distance

        best_match: str | None = None
        best_distance = max_distance + 1

        for known_rule in known_rules:
            distance = levenshtein_distance(rule, known_rule)
            # Prefer lower distance, then lexicographically smaller name for ties
            if distance < best_distance or (
                distance == best_distance and best_match is not None and known_rule < best_match
            ):
                best_distance = distance
                best_match = known_rule

        if best_match is not None and best_distance <= max_distance:
            return best_match, best_distance

        return None

    def find_best_match(
        self,
        rule: str,
        code_hash_to_rules: dict[str, set[str]],
        rule_stats: dict[str, RuleTimingStats],
        max_distance: int | None = None,
    ) -> tuple[str, RuleTimingStats] | None:
        """Find the best matching rule using code hash then fuzzy matching.

        Args:
            rule: Rule name to match.
            code_hash_to_rules: Mapping of code hashes to rule sets.
            rule_stats: Available rule statistics.
            max_distance: Maximum Levenshtein distance.

        Returns:
            Tuple of (matched_rule, stats), or None if no match.
        """
        known_rules = set(rule_stats.keys())

        # Try code hash first (exact code = renamed rule)
        hash_match = self.find_by_code_hash(rule, code_hash_to_rules, known_rules)
        if hash_match is not None:
            return hash_match, rule_stats[hash_match]

        # Fall back to fuzzy name matching
        similar = self.find_similar(rule, known_rules, max_distance)
        if similar is not None:
            matched_rule, _distance = similar
            return matched_rule, rule_stats[matched_rule]

        return None

Functions

__init__
__init__(max_distance: int = 3) -> None

Initialize the matcher.

Parameters:

Name Type Description Default
max_distance int

Maximum Levenshtein distance for fuzzy matches.

3
Source code in snakesee/estimation/rule_matcher.py
def __init__(self, max_distance: int = 3) -> None:
    """Initialize the matcher.

    Args:
        max_distance: Maximum Levenshtein distance for fuzzy matches.
    """
    self.max_distance = max_distance
find_best_match
find_best_match(rule: str, code_hash_to_rules: dict[str, set[str]], rule_stats: dict[str, RuleTimingStats], max_distance: int | None = None) -> tuple[str, RuleTimingStats] | None

Find the best matching rule using code hash then fuzzy matching.

Parameters:

Name Type Description Default
rule str

Rule name to match.

required
code_hash_to_rules dict[str, set[str]]

Mapping of code hashes to rule sets.

required
rule_stats dict[str, RuleTimingStats]

Available rule statistics.

required
max_distance int | None

Maximum Levenshtein distance.

None

Returns:

Type Description
tuple[str, RuleTimingStats] | None

Tuple of (matched_rule, stats), or None if no match.

Source code in snakesee/estimation/rule_matcher.py
def find_best_match(
    self,
    rule: str,
    code_hash_to_rules: dict[str, set[str]],
    rule_stats: dict[str, RuleTimingStats],
    max_distance: int | None = None,
) -> tuple[str, RuleTimingStats] | None:
    """Find the best matching rule using code hash then fuzzy matching.

    Args:
        rule: Rule name to match.
        code_hash_to_rules: Mapping of code hashes to rule sets.
        rule_stats: Available rule statistics.
        max_distance: Maximum Levenshtein distance.

    Returns:
        Tuple of (matched_rule, stats), or None if no match.
    """
    known_rules = set(rule_stats.keys())

    # Try code hash first (exact code = renamed rule)
    hash_match = self.find_by_code_hash(rule, code_hash_to_rules, known_rules)
    if hash_match is not None:
        return hash_match, rule_stats[hash_match]

    # Fall back to fuzzy name matching
    similar = self.find_similar(rule, known_rules, max_distance)
    if similar is not None:
        matched_rule, _distance = similar
        return matched_rule, rule_stats[matched_rule]

    return None
find_by_code_hash
find_by_code_hash(rule: str, code_hash_to_rules: dict[str, set[str]], known_rules: set[str]) -> str | None

Find a rule with matching code hash.

When multiple rules share the same code hash and are in known_rules, returns the lexicographically smallest rule name for deterministic behavior.

Parameters:

Name Type Description Default
rule str

Rule name to look up.

required
code_hash_to_rules dict[str, set[str]]

Mapping of code hashes to rule sets.

required
known_rules set[str]

Set of rules with available stats.

required

Returns:

Type Description
str | None

Name of matching rule (lexicographically smallest if multiple),

str | None

or None if not found.

Source code in snakesee/estimation/rule_matcher.py
def find_by_code_hash(
    self,
    rule: str,
    code_hash_to_rules: dict[str, set[str]],
    known_rules: set[str],
) -> str | None:
    """Find a rule with matching code hash.

    When multiple rules share the same code hash and are in known_rules,
    returns the lexicographically smallest rule name for deterministic behavior.

    Args:
        rule: Rule name to look up.
        code_hash_to_rules: Mapping of code hashes to rule sets.
        known_rules: Set of rules with available stats.

    Returns:
        Name of matching rule (lexicographically smallest if multiple),
        or None if not found.
    """
    for _hash, rules in code_hash_to_rules.items():
        if rule in rules:
            # Find all candidate rules that match criteria
            candidates = {r for r in rules if r != rule and r in known_rules}
            if candidates:
                # Return lexicographically smallest for deterministic selection
                return min(candidates)
    return None
find_similar
find_similar(rule: str, known_rules: set[str], max_distance: int | None = None) -> tuple[str, int] | None

Find similar rule by Levenshtein distance.

When multiple rules have the same distance, returns the lexicographically smallest one for deterministic behavior.

Parameters:

Name Type Description Default
rule str

Rule name to match.

required
known_rules set[str]

Set of rules to search.

required
max_distance int | None

Maximum distance (uses instance default if None).

None

Returns:

Type Description
tuple[str, int] | None

Tuple of (matched_rule, distance), or None if no match.

Source code in snakesee/estimation/rule_matcher.py
def find_similar(
    self,
    rule: str,
    known_rules: set[str],
    max_distance: int | None = None,
) -> tuple[str, int] | None:
    """Find similar rule by Levenshtein distance.

    When multiple rules have the same distance, returns the lexicographically
    smallest one for deterministic behavior.

    Args:
        rule: Rule name to match.
        known_rules: Set of rules to search.
        max_distance: Maximum distance (uses instance default if None).

    Returns:
        Tuple of (matched_rule, distance), or None if no match.
    """
    if max_distance is None:
        max_distance = self.max_distance

    best_match: str | None = None
    best_distance = max_distance + 1

    for known_rule in known_rules:
        distance = levenshtein_distance(rule, known_rule)
        # Prefer lower distance, then lexicographically smaller name for ties
        if distance < best_distance or (
            distance == best_distance and best_match is not None and known_rule < best_match
        ):
            best_distance = distance
            best_match = known_rule

    if best_match is not None and best_distance <= max_distance:
        return best_match, best_distance

    return None

TimeEstimator

Estimates remaining workflow time from historical data.

Uses per-rule timing statistics from previous workflow runs to estimate how long the remaining jobs will take. Falls back to simple linear estimation when historical data is unavailable.

Attributes:

Name Type Description
rule_stats dict[str, RuleTimingStats]

Dictionary mapping rule names to their timing statistics.

thread_stats dict[str, ThreadTimingStats]

Dictionary mapping rule names to thread-conditioned timing stats.

wildcard_stats dict[str, dict[str, WildcardTimingStats]]

Nested dict of wildcard-conditioned timing stats.

use_wildcard_conditioning

Whether to use wildcard-specific estimates.

config

Centralized estimation configuration.

Source code in snakesee/estimation/estimator.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
class TimeEstimator:
    """
    Estimates remaining workflow time from historical data.

    Uses per-rule timing statistics from previous workflow runs to estimate
    how long the remaining jobs will take. Falls back to simple linear
    estimation when historical data is unavailable.

    Attributes:
        rule_stats: Dictionary mapping rule names to their timing statistics.
        thread_stats: Dictionary mapping rule names to thread-conditioned timing stats.
        wildcard_stats: Nested dict of wildcard-conditioned timing stats.
        use_wildcard_conditioning: Whether to use wildcard-specific estimates.
        config: Centralized estimation configuration.
    """

    def __init__(
        self,
        use_wildcard_conditioning: bool = False,
        half_life_days: float | None = None,
        weighting_strategy: WeightingStrategy | None = None,
        half_life_logs: int | None = None,
        config: EstimationConfig | None = None,
        rule_registry: "RuleRegistry | None" = None,
    ) -> None:
        """
        Initialize the estimator.

        Args:
            use_wildcard_conditioning: Whether to use wildcard-specific timing.
            half_life_days: Half-life in days for time-based weighting.
                           After this many days, a run's weight is halved.
                           Only used when weighting_strategy="time".
            weighting_strategy: Strategy for weighting historical data.
                              "time" - decay based on wall-clock time (good for stable pipelines)
                              "index" - decay based on run count (good for active development)
            half_life_logs: Half-life in log count for index-based weighting.
                           After this many runs, a run's weight is halved.
                           Only used when weighting_strategy="index".
            config: Centralized estimation configuration. If not provided, uses
                   DEFAULT_CONFIG with any explicit parameters overriding it.
            rule_registry: RuleRegistry for centralized statistics. If not provided,
                          creates an internal registry.
        """
        from snakesee.state.rule_registry import RuleRegistry

        # Use provided config or default
        self.config = config if config is not None else DEFAULT_CONFIG

        # Centralized registry - create internal one if not provided
        self._rule_registry: RuleRegistry = rule_registry or RuleRegistry(config=self.config)

        # Cache for global_mean_duration (invalidated when sample count or rule count changes)
        self._global_mean_cache: float | None = None
        self._global_mean_cache_sample_count: int = 0
        self._global_mean_cache_rule_count: int = 0

        self.current_rules: set[str] | None = None  # Rules in current workflow (for filtering)
        self.code_hash_to_rules: dict[str, set[str]] = {}  # For renamed rule detection
        self.expected_job_counts: dict[str, int] | None = None  # Expected counts from Job stats
        self.use_wildcard_conditioning = use_wildcard_conditioning
        self._wildcard_conditioning_explicit = use_wildcard_conditioning  # Track if explicitly set

        # Use explicit params if provided, otherwise use config values
        self.weighting_strategy: WeightingStrategy = (
            weighting_strategy if weighting_strategy is not None else self.config.weighting_strategy
        )
        self.half_life_days = (
            half_life_days if half_life_days is not None else self.config.half_life_days
        )
        self.half_life_logs = (
            half_life_logs if half_life_logs is not None else self.config.half_life_logs
        )

        self._max_observed_thread_sum: float = 0.0
        self._provided_cores: int | None = None

        # Helper components
        self._rule_matcher = RuleMatchingEngine(max_distance=self.config.fuzzy_match_max_distance)
        self._pending_inferrer = PendingRuleInferrer()
        self._data_loader: HistoricalDataLoader | None = None

    @property
    def rule_stats(self) -> dict[str, RuleTimingStats]:
        """Get rule stats dict from the registry.

        Returns a dict view for backward compatibility with code that reads rule_stats.
        """
        return self._rule_registry.to_rule_stats_dict()

    @rule_stats.setter
    def rule_stats(self, value: dict[str, RuleTimingStats]) -> None:
        """Set rule stats by loading into the registry.

        Supports backward compatibility with code that sets rule_stats directly.
        """
        self._rule_registry.load_from_rule_stats(value)
        # Invalidate cache when rule_stats is set directly
        self._global_mean_cache = None

    @property
    def thread_stats(self) -> dict[str, ThreadTimingStats]:
        """Get thread stats dict from the registry."""
        return self._rule_registry.to_thread_stats_dict()

    @property
    def wildcard_stats(self) -> dict[str, dict[str, WildcardTimingStats]]:
        """Get wildcard stats dict from the registry."""
        return self._rule_registry.to_wildcard_stats_dict()

    def _get_data_loader(self) -> HistoricalDataLoader:
        """Get or create the data loader."""
        if self._data_loader is None:
            self._data_loader = HistoricalDataLoader(
                registry=self._rule_registry,
                use_wildcard_conditioning=self.use_wildcard_conditioning,
            )
        return self._data_loader

    def load_from_metadata(
        self,
        metadata_dir: Path,
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """
        Load historical execution times from .snakemake/metadata/.

        Uses a single-pass parser for efficiency - reads each metadata file
        only once to collect timing stats, code hashes, and wildcard stats.
        Data is recorded directly into the RuleRegistry.

        Args:
            metadata_dir: Path to .snakemake/metadata/ directory.
            progress_callback: Optional callback(current, total) for progress reporting.
        """
        loader = self._get_data_loader()
        loader.load_from_metadata(metadata_dir, progress_callback)
        self.code_hash_to_rules = loader.code_hash_to_rules

    def load_from_backend(
        self,
        backend: "PersistenceBackend",
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from a persistence backend.

        Supports both filesystem and SQLite backends via the PersistenceBackend
        protocol. Delegates to HistoricalDataLoader.load_from_backend().

        Args:
            backend: Persistence backend to read metadata from.
            progress_callback: Optional callback(current, total) for progress reporting.
        """
        loader = self._get_data_loader()
        loader.load_from_backend(backend, progress_callback)
        self.code_hash_to_rules = loader.code_hash_to_rules

    def load_from_events(self, events_file: Path) -> None:
        """
        Load historical execution times from a snakesee events file.

        Parses the .snakesee_events.jsonl file to extract job durations from
        job_finished events. Data is recorded directly into the RuleRegistry.

        Args:
            events_file: Path to .snakesee_events.jsonl file.
        """
        loader = self._get_data_loader()
        has_wildcards = loader.load_from_events(events_file)

        # Auto-enable wildcard conditioning if we have wildcard data,
        # but only if user didn't explicitly set it to False
        if has_wildcards and not self._wildcard_conditioning_explicit:
            self.use_wildcard_conditioning = True

    def _find_similar_rule(
        self, rule: str, max_distance: int | None = None
    ) -> tuple[str, RuleTimingStats] | None:
        """
        Find the most similar known rule using code hash and Levenshtein distance.

        First checks if any known rule shares the same code hash (renamed rule).
        Then falls back to Levenshtein distance for similar names.

        Args:
            rule: The unknown rule name to match.
            max_distance: Maximum edit distance to consider a match.
                          Defaults to config.fuzzy_match_max_distance.

        Returns:
            Tuple of (matched_rule_name, stats) if a similar rule is found,
            None otherwise.
        """
        effective_stats = self.rule_stats
        if not effective_stats:
            return None

        return self._rule_matcher.find_best_match(
            rule=rule,
            code_hash_to_rules=self.code_hash_to_rules,
            rule_stats=effective_stats,
            max_distance=max_distance,
        )

    def _get_weighted_mean(self, stats: RuleTimingStats) -> float:
        """Get weighted mean using configured strategy."""
        return stats.weighted_mean(
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _get_size_scaled_estimate(
        self, stats: RuleTimingStats, input_size: int
    ) -> tuple[float, float]:
        """Get size-scaled estimate using configured strategy."""
        return stats.size_scaled_estimate(
            input_size=input_size,
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _get_recency_factor(self, stats: RuleTimingStats) -> float:
        """Get recency factor using configured strategy."""
        return stats.recency_factor(
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _try_combination_estimate(
        self, rule: str, wildcards: dict[str, str] | None, threads: int | None
    ) -> tuple[float, float] | None:
        """Try to get estimate from full wildcard+threads combination stats.

        This provides the most precise estimate when we have historical data
        for the exact same combination of wildcards and threads.

        Uses a two-tier approach:
        1. First try with standard MIN_SAMPLES threshold (3+) for high confidence
        2. Fall back to any historical data (1+ samples) with higher variance

        Args:
            rule: The rule name.
            wildcards: Wildcard values for the job.
            threads: Thread count for the job.

        Returns:
            Tuple of (expected_duration, variance) if combination stats available, None otherwise.
        """
        # First try with standard threshold for high-confidence estimate
        combo_stats = self._rule_registry.get_combination_stats(rule, wildcards, threads)

        if combo_stats is None:
            # Fall back to any historical data (even 1-2 samples)
            # This is better than using the rule average which may be completely wrong
            combo_stats = self._rule_registry.get_combination_stats(
                rule, wildcards, threads, min_samples=1
            )
            if combo_stats is None:
                return None

            # Use higher variance for low-sample estimates
            combo_mean = self._get_weighted_mean(combo_stats)
            # With 1-2 samples, use 50% variance multiplier (wider bounds)
            combo_var = (combo_mean * 0.5) ** 2
            return combo_mean, combo_var

        combo_mean = self._get_weighted_mean(combo_stats)
        # Tight variance for high-confidence combination match (3+ samples)
        combo_var = (
            combo_stats.std_dev**2
            if combo_stats.count > 1
            else (combo_mean * self.config.variance.exact_wildcard_match) ** 2
        )
        return combo_mean, combo_var

    def _try_thread_estimate(self, rule: str, threads: int) -> tuple[float, float] | None:
        """Try to get estimate from thread-specific stats.

        Args:
            rule: The rule name.
            threads: Thread count for the job.

        Returns:
            Tuple of (expected_duration, variance) if thread stats available, None otherwise.
        """
        effective_thread_stats = self.thread_stats
        if rule not in effective_thread_stats:
            return None

        thread_stats, matched_threads = effective_thread_stats[rule].get_best_match(threads)
        if thread_stats is None:
            return None

        thread_mean = self._get_weighted_mean(thread_stats)
        var_mult = self.config.variance

        # Tighter variance for exact thread match, wider for aggregate fallback
        if matched_threads is not None:
            thread_var = (
                thread_stats.std_dev**2
                if thread_stats.count > 1
                else (thread_mean * var_mult.exact_thread_match) ** 2
            )
        else:
            # Aggregate fallback - use standard variance
            thread_var = (
                thread_stats.std_dev**2
                if thread_stats.count > 1
                else (thread_mean * var_mult.aggregate_fallback) ** 2
            )
        return thread_mean, thread_var

    def _try_wildcard_estimate(
        self, rule: str, wildcards: dict[str, str]
    ) -> tuple[float, float] | None:
        """Try to get estimate from wildcard-specific stats.

        Args:
            rule: The rule name.
            wildcards: Wildcard values for the job.

        Returns:
            Tuple of (expected_duration, variance) if wildcard stats available, None otherwise.
        """
        effective_wildcard_stats = self.wildcard_stats
        if rule not in effective_wildcard_stats:
            return None

        rule_wc_stats = effective_wildcard_stats[rule]

        # Find the most predictive wildcard key for this rule
        best_key = WildcardTimingStats.get_most_predictive_key(rule_wc_stats)

        if best_key is None or best_key not in wildcards:
            return None

        wc_value = wildcards[best_key]
        wts = rule_wc_stats[best_key]
        value_stats = wts.get_stats_for_value(wc_value)

        if value_stats is None:
            return None

        # Use wildcard-specific statistics
        rule_mean = self._get_weighted_mean(value_stats)
        rule_var = (
            value_stats.std_dev**2
            if value_stats.count > 1
            else (rule_mean * self.config.variance.exact_wildcard_match) ** 2
        )
        return rule_mean, rule_var

    def _try_rule_estimate(self, rule: str, input_size: int | None) -> tuple[float, float] | None:
        """Try to get estimate from rule-level stats.

        Args:
            rule: The rule name.
            input_size: Optional input file size in bytes for size-scaled estimates.

        Returns:
            Tuple of (expected_duration, variance) if rule stats available, None otherwise.
        """
        effective_stats = self.rule_stats
        if rule not in effective_stats:
            return None

        stats = effective_stats[rule]

        # Try size-scaled estimate if input size is available
        if input_size is not None and input_size > 0:
            scaled_est, confidence = self._get_size_scaled_estimate(stats, input_size)
            size_conf_threshold = self.config.confidence_thresholds.size_scaling_min
            if confidence > size_conf_threshold:
                # Reduce variance for size-scaled estimates
                rule_var = (
                    stats.std_dev**2 * (1 - confidence * 0.5)
                    if stats.count > 1
                    else (scaled_est * self.config.variance.size_scaled) ** 2
                )
                return scaled_est, rule_var

        # Standard rule-level estimate
        rule_mean = self._get_weighted_mean(stats)
        rule_var = (
            stats.std_dev**2
            if stats.count > 1
            else (rule_mean * self.config.variance.rule_fallback) ** 2
        )
        return rule_mean, rule_var

    def _try_fuzzy_match_estimate(self, rule: str) -> tuple[float, float] | None:
        """Try to get estimate from fuzzy-matched similar rules.

        Args:
            rule: The unknown rule name to match.

        Returns:
            Tuple of (expected_duration, variance) if similar rule found, None otherwise.
        """
        similar = self._find_similar_rule(rule)
        if similar is None:
            return None

        _matched_rule, stats = similar
        rule_mean = self._get_weighted_mean(stats)
        # Wider variance for fuzzy matches due to uncertainty
        rule_var = (
            stats.std_dev**2
            if stats.count > 1
            else (rule_mean * self.config.variance.fuzzy_match) ** 2
        )
        return rule_mean, rule_var

    def get_estimate_for_job(
        self,
        rule: str,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
        threads: int | None = None,
    ) -> tuple[float, float]:
        """
        Get expected duration and variance for a specific job.

        Uses a cascade of estimation strategies in priority order:
        1. Full combination stats (wildcards + threads together)
        2. Thread-specific stats
        3. Wildcard-specific stats (if enabled)
        4. Rule-level stats (with optional size scaling)
        5. Fuzzy matching for renamed/similar rules
        6. Global mean fallback

        Args:
            rule: The rule name.
            wildcards: Optional wildcard values for the job.
            input_size: Optional input file size in bytes for size-scaled estimates.
            threads: Optional thread count for thread-specific estimates.

        Returns:
            Tuple of (expected_duration, variance).  The duration is always
            wall-clock seconds, even when thread-specific stats are used.
            Callers that need thread-seconds should multiply by the job's
            thread count: ``duration * threads = thread_seconds``.
        """
        # Strategy 1: Full combination stats (wildcards + threads together)
        # This is the most precise when we have data for the exact combination
        if self.use_wildcard_conditioning and (wildcards or (threads is not None)):
            result = self._try_combination_estimate(rule, wildcards, threads)
            if result is not None:
                return result

        # Strategy 2: Thread-specific stats
        if threads is not None:
            result = self._try_thread_estimate(rule, threads)
            if result is not None:
                return result

        # Strategy 3: Wildcard-specific stats (if enabled)
        if self.use_wildcard_conditioning and wildcards:
            result = self._try_wildcard_estimate(rule, wildcards)
            if result is not None:
                return result

        # Strategy 4: Rule-level stats (with optional size scaling)
        result = self._try_rule_estimate(rule, input_size)
        if result is not None:
            return result

        # Strategy 5: Fuzzy matching for renamed/similar rules
        result = self._try_fuzzy_match_estimate(rule)
        if result is not None:
            return result

        # Strategy 6: Global mean fallback
        global_mean = self.global_mean_duration()
        return global_mean, (global_mean * self.config.variance.global_fallback) ** 2

    def global_mean_duration(self) -> float:
        """
        Get the global average duration across all known rules.

        Used as a fallback when a specific rule has no historical data.
        Result is cached and invalidated when sample count or rule count changes.

        Returns:
            Average duration in seconds, or config.default_global_mean if no data.
        """
        # Check cache validity using sample count and rule count as version indicators
        current_sample_count = self._rule_registry.total_sample_count()
        current_rule_count = self._rule_registry.rule_count()
        if (
            self._global_mean_cache is not None
            and current_sample_count == self._global_mean_cache_sample_count
            and current_rule_count == self._global_mean_cache_rule_count
        ):
            return self._global_mean_cache

        # Recalculate and cache
        effective_stats = self.rule_stats
        all_durations = [d for stats in effective_stats.values() for d in stats.durations]
        self._global_mean_cache = (
            mean(all_durations) if all_durations else self.config.default_global_mean
        )
        self._global_mean_cache_sample_count = current_sample_count
        self._global_mean_cache_rule_count = current_rule_count
        return self._global_mean_cache

    def estimate_remaining(
        self,
        progress: WorkflowProgress,
    ) -> TimeEstimate:
        """
        Estimate remaining time for a workflow.

        Uses one of several estimation strategies depending on available data:
        1. "weighted" - Uses per-rule historical timing with exponential weighting
        2. "simple" - Falls back to average time per completed step

        Args:
            progress: Current workflow progress state.

        Returns:
            TimeEstimate with expected time, confidence bounds, and method.
        """

        # Update peak observed thread sum from currently running jobs.
        # Only use explicitly declared threads (not historical defaults) so
        # the peak stays grounded in runtime data.
        if progress.running_jobs:
            current = sum(
                float(job.threads) if job.threads is not None else 1.0
                for job in progress.running_jobs
            )
            self._max_observed_thread_sum = max(self._max_observed_thread_sum, current)

        # Handle edge case: no jobs to do
        if progress.total_jobs == 0:
            return TimeEstimate(
                seconds_remaining=0.0,
                lower_bound=0.0,
                upper_bound=0.0,
                confidence=1.0,
                method="complete",
            )

        # Handle edge case: workflow complete
        if progress.completed_jobs >= progress.total_jobs:
            return TimeEstimate(
                seconds_remaining=0.0,
                lower_bound=0.0,
                upper_bound=0.0,
                confidence=1.0,
                method="complete",
            )

        # Handle edge case: nothing completed yet
        if progress.completed_jobs == 0:
            return self._estimate_no_progress(progress)

        # Try weighted estimation with historical data
        if self.rule_stats:
            return self._estimate_weighted(progress)

        # Fall back to simple estimation
        return self._estimate_simple(progress)

    def _estimate_no_progress(self, progress: WorkflowProgress) -> TimeEstimate:
        """
        Estimate when no jobs have completed yet.

        Uses thread-seconds / inferred-cores model for parallelism adjustment.

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate with very low confidence.
        """
        global_mean = self.global_mean_duration()

        # Compute total thread-seconds from known jobs when available.
        # When we have a job list we can use per-job thread counts;
        # otherwise we assume 1 thread per job (no thread info available).
        has_job_list = bool(progress.pending_jobs_list or progress.running_jobs)
        if has_job_list:
            total_thread_seconds = sum(
                global_mean * self._job_threads(job)
                for job in chain(progress.pending_jobs_list, progress.running_jobs)
            )
        else:
            total_thread_seconds = global_mean * progress.total_jobs

        cores = self.estimated_cores
        estimate = total_thread_seconds / cores

        return TimeEstimate(
            seconds_remaining=estimate,
            lower_bound=estimate * self.config.bootstrap_lower_multiplier,
            upper_bound=estimate * self.config.bootstrap_upper_multiplier,
            confidence=self.config.confidence_thresholds.bootstrap_confidence,
            method="bootstrap",
            inferred_cores=cores,
        )

    def _estimate_simple(self, progress: WorkflowProgress) -> TimeEstimate:
        """
        Simple linear estimation using thread-seconds when possible.

        When pending/running job lists are available, computes an observed
        thread-second rate from completed work (elapsed × cores) and
        estimates remaining thread-seconds from pending job threads.  This
        handles mixed-thread workloads better than plain job-count
        extrapolation.

        When no job lists are available, falls back to the classic
        elapsed/completed × remaining linear extrapolation.

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate using simple extrapolation.
        """
        elapsed = progress.elapsed_seconds
        if elapsed is None or elapsed <= 0:
            return self._estimate_no_progress(progress)

        if progress.completed_jobs <= 0:
            return self._estimate_no_progress(progress)

        cores = self.estimated_cores

        # When we have pending/running job lists, use thread-seconds model
        # (same pattern as _estimate_weighted and _estimate_no_progress):
        # compute total remaining thread-seconds, then divide by cores.
        has_job_list = bool(progress.pending_jobs_list or progress.running_jobs)
        if has_job_list:
            # Observed wall-clock rate per job → thread-seconds per job
            avg_wall_per_job = elapsed / progress.completed_jobs
            avg_thread_seconds_per_job = avg_wall_per_job * cores

            # Sum remaining thread-seconds across pending + running jobs
            total_thread_seconds = sum(
                avg_thread_seconds_per_job * self._job_threads(job)
                for job in chain(progress.pending_jobs_list, progress.running_jobs)
            )
            # Subtract elapsed thread-seconds for running jobs
            for job in progress.running_jobs:
                job_elapsed = job.elapsed or 0.0
                t = self._job_threads(job)
                total_thread_seconds = max(0.0, total_thread_seconds - job_elapsed * t)

            estimate = total_thread_seconds / cores
        else:
            # Classic linear extrapolation (no thread info available)
            avg_time_per_step = elapsed / progress.completed_jobs
            remaining_steps = progress.total_jobs - progress.completed_jobs
            estimate = avg_time_per_step * remaining_steps

        # Confidence grows with more completed jobs
        confidence = min(
            self.config.simple_estimate_confidence_cap,
            progress.completed_jobs / self.config.simple_estimate_jobs_divisor,
        )

        # Wider bounds for fewer samples
        uncertainty = max(0.3, 1.0 - (progress.completed_jobs / progress.total_jobs))

        return TimeEstimate(
            seconds_remaining=estimate,
            lower_bound=estimate * (1 - uncertainty),
            upper_bound=estimate * (1 + uncertainty * 2),
            confidence=confidence,
            method="simple",
            inferred_cores=cores,
        )

    def _job_threads(self, job: JobInfo) -> float:
        """Resolve effective thread count for a job.

        Uses the job's declared threads if available, otherwise falls back
        to the weighted-average thread count from historical data for the rule.
        """
        if job.threads is not None:
            return float(job.threads)
        return self._rule_registry.typical_threads(job.rule)

    def set_provided_cores(self, cores: int) -> None:
        """Set the Snakemake ``-j``/``--cores`` value parsed from the log.

        When available, this provides a definitive upper bound for
        parallelism rather than relying solely on peak observed thread sums.

        Args:
            cores: Resolved core count (integer) from ``Provided cores: N``
                log line.  Snakemake resolves ``--cores all`` to the machine's
                CPU count before logging, so this is always an integer.
        """
        self._provided_cores = cores if cores >= 1 else None

    @property
    def estimated_cores(self) -> float:
        """Inferred core count used to convert thread-seconds to wall-clock time.

        Priority:
        1. ``Provided cores: N`` parsed from the Snakemake log (definitive).
        2. Peak observed sum of running-job threads (approximation of ``-j``).
        3. Fallback to 1.0 when no data is available.

        Returns:
            Estimated core count (>= 1.0).
        """
        if self._provided_cores is not None:
            return max(1.0, float(self._provided_cores))
        return max(1.0, self._max_observed_thread_sum)

    def _calculate_confidence_scores(
        self,
        rule_completed: dict[str, int],
        effective_stats: dict[str, RuleTimingStats],
    ) -> tuple[float, float, float, float]:
        """Calculate confidence component scores.

        Args:
            rule_completed: Dict of rule name to completion count.
            effective_stats: Dict of rule name to timing stats.

        Returns:
            Tuple of (sample_confidence, avg_recency, avg_consistency, data_coverage).
        """
        data_coverage = len(rule_completed) / max(1, len(effective_stats))
        total_samples = sum(s.count for s in effective_stats.values())
        sample_confidence = min(1.0, total_samples / self.config.min_samples_for_confidence)

        # Calculate average recency and consistency across rules with data
        recency_scores: list[float] = []
        consistency_scores: list[float] = []
        for rule in rule_completed:
            if rule in effective_stats:
                stats = effective_stats[rule]
                recency_scores.append(self._get_recency_factor(stats))
                consistency_scores.append(stats.recent_consistency())

        avg_recency = sum(recency_scores) / len(recency_scores) if recency_scores else 0.5
        avg_consistency = (
            sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.5
        )

        return sample_confidence, avg_recency, avg_consistency, data_coverage

    def _estimate_weighted(
        self,
        progress: WorkflowProgress,
    ) -> TimeEstimate:
        """Weighted estimation using per-rule historical timing.

        Algorithm Overview:
        ------------------
        This method implements an Exponentially Weighted Moving Average (EWMA)
        approach for time estimation, which is well-suited for workflows that
        evolve over time (e.g., code changes, data size variations).

        Key Design Decisions:

        1. **Exponential Weighting**: Recent executions are weighted more heavily
           than older ones. This is based on the observation that:
           - Pipeline code changes frequently during development
           - Recent runs better reflect current performance characteristics
           - Older timing data may reflect outdated code or different conditions

           The decay is controlled by half-life parameters:
           - `half_life_logs`: After N runs, a run's weight is halved (index-based)
           - `half_life_days`: After N days, a run's weight is halved (time-based)

        2. **Median Input Size for Size Scaling**: When scaling estimates by input
           file size, we use the median historical input size as the reference
           point rather than the mean. This provides robustness to outliers
           (e.g., test files that are much smaller than production files).

        3. **Thread-Aware Core Estimation**: Rather than using a naive sqrt(parallelism)
           formula, we compute total thread-seconds for remaining work and divide by
           the inferred number of available cores. Cores are estimated from the peak
           observed sum of running job threads (approximating snakemake's -j value).

        4. **Confidence Scoring**: Confidence is a weighted combination of:
           - Sample size (more historical data = more confidence)
           - Recency (fresher data = more confidence)
           - Consistency (lower variance = more confidence)
           - Data coverage (more rules with data = more confidence)

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate using weighted historical data.

        References:
            - EWMA: Standard statistical technique for time series smoothing
            - Half-life decay: Common in recommendation systems and caching
        """
        # Get rule completion counts from recent completions
        rule_completed: dict[str, int] = {}
        for job in progress.recent_completions:
            rule_completed[job.rule] = rule_completed.get(job.rule, 0) + 1

        # Count running jobs by rule
        running_by_rule: dict[str, int] = {}
        for job in progress.running_jobs:
            running_by_rule[job.rule] = running_by_rule.get(job.rule, 0) + 1

        # Only augment with historical counts if we don't have expected_job_counts
        # (to avoid skewing proportional inference with historical execution counts)
        effective_stats = self.rule_stats
        if not self.expected_job_counts:
            for rule, stats in effective_stats.items():
                if rule not in rule_completed:
                    rule_completed[rule] = stats.count

        # Estimate pending rule distribution
        # If expected_job_counts is set, _infer_pending_rules will use exact calculation.
        # Use WorkflowProgress.pending_jobs so failed and incomplete jobs are not double-counted.
        pending = progress.pending_jobs
        pending_rules = self._infer_pending_rules(
            rule_completed, pending, self.current_rules, running_by_rule
        )

        # Calculate expected remaining thread-seconds and variance
        total_thread_seconds = 0.0
        total_variance = 0.0
        global_mean = self.global_mean_duration()

        # Use wildcard-conditioned estimates for pending jobs if we have a COMPLETE job list
        # If pending_jobs_list is incomplete, fall back to rule-count-based estimation
        pending_list_len = len(progress.pending_jobs_list) if progress.pending_jobs_list else 0
        use_pending_list = pending_list_len > 0 and pending_list_len >= pending

        if use_pending_list:
            # We have actual pending jobs with wildcards - use per-job estimates
            for job in progress.pending_jobs_list:
                expected, variance = self.get_estimate_for_job(
                    rule=job.rule,
                    wildcards=job.wildcards,
                    input_size=job.input_size,
                    threads=job.threads,
                )
                t = self._job_threads(job)
                total_thread_seconds += expected * t
                # Variance in seconds² → thread-seconds²: Var(t·T) = T²·Var(t)
                total_variance += variance * t * t
        else:
            # Fall back to rule-count-based estimation
            for rule, count in pending_rules.items():
                if rule in effective_stats:
                    stats = effective_stats[rule]
                    rule_mean = self._get_weighted_mean(stats)
                    rule_var = (
                        stats.std_dev**2
                        if stats.count > 1
                        else (rule_mean * self.config.variance.rule_fallback) ** 2
                    )
                else:
                    # Unknown rule: use global mean with higher uncertainty
                    rule_mean = global_mean
                    rule_var = (global_mean * self.config.variance.global_fallback) ** 2

                threads = self._rule_registry.typical_threads(rule)
                total_thread_seconds += count * rule_mean * threads
                total_variance += count * rule_var * threads * threads

        # Add time for running jobs - use wildcard-specific estimates when available
        for job in progress.running_jobs:
            # Use get_estimate_for_job which handles wildcard conditioning
            expected, variance = self.get_estimate_for_job(
                rule=job.rule,
                wildcards=job.wildcards,
                input_size=job.input_size,
                threads=job.threads,
            )

            # Use actual elapsed time if available
            elapsed = job.elapsed or 0.0
            remaining = max(0, expected - elapsed)
            t = self._job_threads(job)
            total_thread_seconds += remaining * t
            total_variance += variance * t * t

        # Convert thread-seconds to wall-clock time using inferred cores
        cores = self.estimated_cores
        effective_time = total_thread_seconds / cores

        # Calculate confidence bounds (variance scales with cores^2)
        fallback_std = effective_time * self.config.variance.rule_fallback
        std_dev = (math.sqrt(total_variance) / cores) if total_variance > 0 else fallback_std
        lower = max(0, effective_time - 2 * std_dev)
        upper = effective_time + 2 * std_dev

        # Calculate confidence from multiple factors
        sample_conf, avg_recency, avg_consistency, data_coverage = (
            self._calculate_confidence_scores(rule_completed, effective_stats)
        )
        weights = self.config.confidence_weights
        base_confidence = (
            weights.sample_size * sample_conf
            + weights.recency * avg_recency
            + weights.consistency * avg_consistency
            + weights.data_coverage * data_coverage
        )
        confidence = min(self.config.confidence_thresholds.max_confidence, base_confidence)

        return TimeEstimate(
            seconds_remaining=effective_time,
            lower_bound=lower,
            upper_bound=upper,
            confidence=confidence,
            method="weighted",
            inferred_cores=cores,
        )

    def _infer_pending_rules(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        current_rules: set[str] | None = None,
        running_by_rule: dict[str, int] | None = None,
    ) -> dict[str, int]:
        """
        Infer the composition of pending rules.

        If expected_job_counts is available (from Job stats table), uses exact
        calculation: pending = expected - completed - running.
        Otherwise falls back to proportional inference.

        Args:
            completed_by_rule: Count of completed jobs per rule.
            pending_count: Total number of pending jobs.
            current_rules: Optional set of rules that exist in the current workflow.
                If provided, only rules in this set will be included in inference.
                This filters out deleted rules from historical data.
            running_by_rule: Optional count of running jobs per rule.

        Returns:
            Estimated count of pending jobs per rule.
        """
        return self._pending_inferrer.infer(
            completed_by_rule=completed_by_rule,
            pending_count=pending_count,
            expected_job_counts=self.expected_job_counts,
            current_rules=current_rules,
            running_by_rule=running_by_rule,
        )

Attributes

estimated_cores property
estimated_cores: float

Inferred core count used to convert thread-seconds to wall-clock time.

Priority: 1. Provided cores: N parsed from the Snakemake log (definitive). 2. Peak observed sum of running-job threads (approximation of -j). 3. Fallback to 1.0 when no data is available.

Returns:

Type Description
float

Estimated core count (>= 1.0).

rule_stats property writable
rule_stats: dict[str, RuleTimingStats]

Get rule stats dict from the registry.

Returns a dict view for backward compatibility with code that reads rule_stats.

thread_stats property
thread_stats: dict[str, ThreadTimingStats]

Get thread stats dict from the registry.

wildcard_stats property
wildcard_stats: dict[str, dict[str, WildcardTimingStats]]

Get wildcard stats dict from the registry.

Functions

__init__
__init__(use_wildcard_conditioning: bool = False, half_life_days: float | None = None, weighting_strategy: WeightingStrategy | None = None, half_life_logs: int | None = None, config: EstimationConfig | None = None, rule_registry: RuleRegistry | None = None) -> None

Initialize the estimator.

Parameters:

Name Type Description Default
use_wildcard_conditioning bool

Whether to use wildcard-specific timing.

False
half_life_days float | None

Half-life in days for time-based weighting. After this many days, a run's weight is halved. Only used when weighting_strategy="time".

None
weighting_strategy WeightingStrategy | None

Strategy for weighting historical data. "time" - decay based on wall-clock time (good for stable pipelines) "index" - decay based on run count (good for active development)

None
half_life_logs int | None

Half-life in log count for index-based weighting. After this many runs, a run's weight is halved. Only used when weighting_strategy="index".

None
config EstimationConfig | None

Centralized estimation configuration. If not provided, uses DEFAULT_CONFIG with any explicit parameters overriding it.

None
rule_registry RuleRegistry | None

RuleRegistry for centralized statistics. If not provided, creates an internal registry.

None
Source code in snakesee/estimation/estimator.py
def __init__(
    self,
    use_wildcard_conditioning: bool = False,
    half_life_days: float | None = None,
    weighting_strategy: WeightingStrategy | None = None,
    half_life_logs: int | None = None,
    config: EstimationConfig | None = None,
    rule_registry: "RuleRegistry | None" = None,
) -> None:
    """
    Initialize the estimator.

    Args:
        use_wildcard_conditioning: Whether to use wildcard-specific timing.
        half_life_days: Half-life in days for time-based weighting.
                       After this many days, a run's weight is halved.
                       Only used when weighting_strategy="time".
        weighting_strategy: Strategy for weighting historical data.
                          "time" - decay based on wall-clock time (good for stable pipelines)
                          "index" - decay based on run count (good for active development)
        half_life_logs: Half-life in log count for index-based weighting.
                       After this many runs, a run's weight is halved.
                       Only used when weighting_strategy="index".
        config: Centralized estimation configuration. If not provided, uses
               DEFAULT_CONFIG with any explicit parameters overriding it.
        rule_registry: RuleRegistry for centralized statistics. If not provided,
                      creates an internal registry.
    """
    from snakesee.state.rule_registry import RuleRegistry

    # Use provided config or default
    self.config = config if config is not None else DEFAULT_CONFIG

    # Centralized registry - create internal one if not provided
    self._rule_registry: RuleRegistry = rule_registry or RuleRegistry(config=self.config)

    # Cache for global_mean_duration (invalidated when sample count or rule count changes)
    self._global_mean_cache: float | None = None
    self._global_mean_cache_sample_count: int = 0
    self._global_mean_cache_rule_count: int = 0

    self.current_rules: set[str] | None = None  # Rules in current workflow (for filtering)
    self.code_hash_to_rules: dict[str, set[str]] = {}  # For renamed rule detection
    self.expected_job_counts: dict[str, int] | None = None  # Expected counts from Job stats
    self.use_wildcard_conditioning = use_wildcard_conditioning
    self._wildcard_conditioning_explicit = use_wildcard_conditioning  # Track if explicitly set

    # Use explicit params if provided, otherwise use config values
    self.weighting_strategy: WeightingStrategy = (
        weighting_strategy if weighting_strategy is not None else self.config.weighting_strategy
    )
    self.half_life_days = (
        half_life_days if half_life_days is not None else self.config.half_life_days
    )
    self.half_life_logs = (
        half_life_logs if half_life_logs is not None else self.config.half_life_logs
    )

    self._max_observed_thread_sum: float = 0.0
    self._provided_cores: int | None = None

    # Helper components
    self._rule_matcher = RuleMatchingEngine(max_distance=self.config.fuzzy_match_max_distance)
    self._pending_inferrer = PendingRuleInferrer()
    self._data_loader: HistoricalDataLoader | None = None
estimate_remaining
estimate_remaining(progress: WorkflowProgress) -> TimeEstimate

Estimate remaining time for a workflow.

Uses one of several estimation strategies depending on available data: 1. "weighted" - Uses per-rule historical timing with exponential weighting 2. "simple" - Falls back to average time per completed step

Parameters:

Name Type Description Default
progress WorkflowProgress

Current workflow progress state.

required

Returns:

Type Description
TimeEstimate

TimeEstimate with expected time, confidence bounds, and method.

Source code in snakesee/estimation/estimator.py
def estimate_remaining(
    self,
    progress: WorkflowProgress,
) -> TimeEstimate:
    """
    Estimate remaining time for a workflow.

    Uses one of several estimation strategies depending on available data:
    1. "weighted" - Uses per-rule historical timing with exponential weighting
    2. "simple" - Falls back to average time per completed step

    Args:
        progress: Current workflow progress state.

    Returns:
        TimeEstimate with expected time, confidence bounds, and method.
    """

    # Update peak observed thread sum from currently running jobs.
    # Only use explicitly declared threads (not historical defaults) so
    # the peak stays grounded in runtime data.
    if progress.running_jobs:
        current = sum(
            float(job.threads) if job.threads is not None else 1.0
            for job in progress.running_jobs
        )
        self._max_observed_thread_sum = max(self._max_observed_thread_sum, current)

    # Handle edge case: no jobs to do
    if progress.total_jobs == 0:
        return TimeEstimate(
            seconds_remaining=0.0,
            lower_bound=0.0,
            upper_bound=0.0,
            confidence=1.0,
            method="complete",
        )

    # Handle edge case: workflow complete
    if progress.completed_jobs >= progress.total_jobs:
        return TimeEstimate(
            seconds_remaining=0.0,
            lower_bound=0.0,
            upper_bound=0.0,
            confidence=1.0,
            method="complete",
        )

    # Handle edge case: nothing completed yet
    if progress.completed_jobs == 0:
        return self._estimate_no_progress(progress)

    # Try weighted estimation with historical data
    if self.rule_stats:
        return self._estimate_weighted(progress)

    # Fall back to simple estimation
    return self._estimate_simple(progress)
get_estimate_for_job
get_estimate_for_job(rule: str, wildcards: dict[str, str] | None = None, input_size: int | None = None, threads: int | None = None) -> tuple[float, float]

Get expected duration and variance for a specific job.

Uses a cascade of estimation strategies in priority order: 1. Full combination stats (wildcards + threads together) 2. Thread-specific stats 3. Wildcard-specific stats (if enabled) 4. Rule-level stats (with optional size scaling) 5. Fuzzy matching for renamed/similar rules 6. Global mean fallback

Parameters:

Name Type Description Default
rule str

The rule name.

required
wildcards dict[str, str] | None

Optional wildcard values for the job.

None
input_size int | None

Optional input file size in bytes for size-scaled estimates.

None
threads int | None

Optional thread count for thread-specific estimates.

None

Returns:

Type Description
float

Tuple of (expected_duration, variance). The duration is always

float

wall-clock seconds, even when thread-specific stats are used.

tuple[float, float]

Callers that need thread-seconds should multiply by the job's

tuple[float, float]

thread count: duration * threads = thread_seconds.

Source code in snakesee/estimation/estimator.py
def get_estimate_for_job(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
    threads: int | None = None,
) -> tuple[float, float]:
    """
    Get expected duration and variance for a specific job.

    Uses a cascade of estimation strategies in priority order:
    1. Full combination stats (wildcards + threads together)
    2. Thread-specific stats
    3. Wildcard-specific stats (if enabled)
    4. Rule-level stats (with optional size scaling)
    5. Fuzzy matching for renamed/similar rules
    6. Global mean fallback

    Args:
        rule: The rule name.
        wildcards: Optional wildcard values for the job.
        input_size: Optional input file size in bytes for size-scaled estimates.
        threads: Optional thread count for thread-specific estimates.

    Returns:
        Tuple of (expected_duration, variance).  The duration is always
        wall-clock seconds, even when thread-specific stats are used.
        Callers that need thread-seconds should multiply by the job's
        thread count: ``duration * threads = thread_seconds``.
    """
    # Strategy 1: Full combination stats (wildcards + threads together)
    # This is the most precise when we have data for the exact combination
    if self.use_wildcard_conditioning and (wildcards or (threads is not None)):
        result = self._try_combination_estimate(rule, wildcards, threads)
        if result is not None:
            return result

    # Strategy 2: Thread-specific stats
    if threads is not None:
        result = self._try_thread_estimate(rule, threads)
        if result is not None:
            return result

    # Strategy 3: Wildcard-specific stats (if enabled)
    if self.use_wildcard_conditioning and wildcards:
        result = self._try_wildcard_estimate(rule, wildcards)
        if result is not None:
            return result

    # Strategy 4: Rule-level stats (with optional size scaling)
    result = self._try_rule_estimate(rule, input_size)
    if result is not None:
        return result

    # Strategy 5: Fuzzy matching for renamed/similar rules
    result = self._try_fuzzy_match_estimate(rule)
    if result is not None:
        return result

    # Strategy 6: Global mean fallback
    global_mean = self.global_mean_duration()
    return global_mean, (global_mean * self.config.variance.global_fallback) ** 2
global_mean_duration
global_mean_duration() -> float

Get the global average duration across all known rules.

Used as a fallback when a specific rule has no historical data. Result is cached and invalidated when sample count or rule count changes.

Returns:

Type Description
float

Average duration in seconds, or config.default_global_mean if no data.

Source code in snakesee/estimation/estimator.py
def global_mean_duration(self) -> float:
    """
    Get the global average duration across all known rules.

    Used as a fallback when a specific rule has no historical data.
    Result is cached and invalidated when sample count or rule count changes.

    Returns:
        Average duration in seconds, or config.default_global_mean if no data.
    """
    # Check cache validity using sample count and rule count as version indicators
    current_sample_count = self._rule_registry.total_sample_count()
    current_rule_count = self._rule_registry.rule_count()
    if (
        self._global_mean_cache is not None
        and current_sample_count == self._global_mean_cache_sample_count
        and current_rule_count == self._global_mean_cache_rule_count
    ):
        return self._global_mean_cache

    # Recalculate and cache
    effective_stats = self.rule_stats
    all_durations = [d for stats in effective_stats.values() for d in stats.durations]
    self._global_mean_cache = (
        mean(all_durations) if all_durations else self.config.default_global_mean
    )
    self._global_mean_cache_sample_count = current_sample_count
    self._global_mean_cache_rule_count = current_rule_count
    return self._global_mean_cache
load_from_backend
load_from_backend(backend: PersistenceBackend, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from a persistence backend.

Supports both filesystem and SQLite backends via the PersistenceBackend protocol. Delegates to HistoricalDataLoader.load_from_backend().

Parameters:

Name Type Description Default
backend PersistenceBackend

Persistence backend to read metadata from.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None
Source code in snakesee/estimation/estimator.py
def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from a persistence backend.

    Supports both filesystem and SQLite backends via the PersistenceBackend
    protocol. Delegates to HistoricalDataLoader.load_from_backend().

    Args:
        backend: Persistence backend to read metadata from.
        progress_callback: Optional callback(current, total) for progress reporting.
    """
    loader = self._get_data_loader()
    loader.load_from_backend(backend, progress_callback)
    self.code_hash_to_rules = loader.code_hash_to_rules
load_from_events
load_from_events(events_file: Path) -> None

Load historical execution times from a snakesee events file.

Parses the .snakesee_events.jsonl file to extract job durations from job_finished events. Data is recorded directly into the RuleRegistry.

Parameters:

Name Type Description Default
events_file Path

Path to .snakesee_events.jsonl file.

required
Source code in snakesee/estimation/estimator.py
def load_from_events(self, events_file: Path) -> None:
    """
    Load historical execution times from a snakesee events file.

    Parses the .snakesee_events.jsonl file to extract job durations from
    job_finished events. Data is recorded directly into the RuleRegistry.

    Args:
        events_file: Path to .snakesee_events.jsonl file.
    """
    loader = self._get_data_loader()
    has_wildcards = loader.load_from_events(events_file)

    # Auto-enable wildcard conditioning if we have wildcard data,
    # but only if user didn't explicitly set it to False
    if has_wildcards and not self._wildcard_conditioning_explicit:
        self.use_wildcard_conditioning = True
load_from_metadata
load_from_metadata(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from .snakemake/metadata/.

Uses a single-pass parser for efficiency - reads each metadata file only once to collect timing stats, code hashes, and wildcard stats. Data is recorded directly into the RuleRegistry.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None
Source code in snakesee/estimation/estimator.py
def load_from_metadata(
    self,
    metadata_dir: Path,
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """
    Load historical execution times from .snakemake/metadata/.

    Uses a single-pass parser for efficiency - reads each metadata file
    only once to collect timing stats, code hashes, and wildcard stats.
    Data is recorded directly into the RuleRegistry.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.
    """
    loader = self._get_data_loader()
    loader.load_from_metadata(metadata_dir, progress_callback)
    self.code_hash_to_rules = loader.code_hash_to_rules
set_provided_cores
set_provided_cores(cores: int) -> None

Set the Snakemake -j/--cores value parsed from the log.

When available, this provides a definitive upper bound for parallelism rather than relying solely on peak observed thread sums.

Parameters:

Name Type Description Default
cores int

Resolved core count (integer) from Provided cores: N log line. Snakemake resolves --cores all to the machine's CPU count before logging, so this is always an integer.

required
Source code in snakesee/estimation/estimator.py
def set_provided_cores(self, cores: int) -> None:
    """Set the Snakemake ``-j``/``--cores`` value parsed from the log.

    When available, this provides a definitive upper bound for
    parallelism rather than relying solely on peak observed thread sums.

    Args:
        cores: Resolved core count (integer) from ``Provided cores: N``
            log line.  Snakemake resolves ``--cores all`` to the machine's
            CPU count before logging, so this is always an integer.
    """
    self._provided_cores = cores if cores >= 1 else None

Functions

levenshtein_distance cached

levenshtein_distance(s1: str, s2: str) -> int

Calculate the Levenshtein distance between two strings.

Results are cached for efficiency when comparing the same rule names multiple times during estimation.

Parameters:

Name Type Description Default
s1 str

First string.

required
s2 str

Second string.

required

Returns:

Type Description
int

The minimum number of edits (insertions, deletions, substitutions)

int

needed to transform s1 into s2.

Source code in snakesee/estimation/rule_matcher.py
@functools.lru_cache(maxsize=256)
def levenshtein_distance(s1: str, s2: str) -> int:
    """Calculate the Levenshtein distance between two strings.

    Results are cached for efficiency when comparing the same rule names
    multiple times during estimation.

    Args:
        s1: First string.
        s2: Second string.

    Returns:
        The minimum number of edits (insertions, deletions, substitutions)
        needed to transform s1 into s2.
    """
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)

    if len(s2) == 0:
        return len(s1)

    previous_row: list[int] = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return previous_row[-1]

Modules

data_loader

Historical data loading for time estimation.

Classes

HistoricalDataLoader

Loads timing data from metadata and events files.

Provides methods to load historical execution data from: - .snakemake/metadata/ directory (from previous Snakemake runs) - .snakesee_events.jsonl file (from snakesee monitoring)

Source code in snakesee/estimation/data_loader.py
class HistoricalDataLoader:
    """Loads timing data from metadata and events files.

    Provides methods to load historical execution data from:
    - .snakemake/metadata/ directory (from previous Snakemake runs)
    - .snakesee_events.jsonl file (from snakesee monitoring)
    """

    def __init__(
        self,
        registry: "RuleRegistry",
        use_wildcard_conditioning: bool = False,
    ) -> None:
        """Initialize the loader.

        Args:
            registry: RuleRegistry to load data into.
            use_wildcard_conditioning: Whether to record wildcard-specific stats.
        """
        self._registry = registry
        self.use_wildcard_conditioning = use_wildcard_conditioning
        self.code_hash_to_rules: dict[str, set[str]] = {}

    def load_from_metadata(
        self,
        metadata_dir: Path,
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from .snakemake/metadata/.

        Uses a single-pass parser for efficiency - reads each metadata file
        only once to collect timing stats, code hashes, and wildcard stats.

        Args:
            metadata_dir: Path to .snakemake/metadata/ directory.
            progress_callback: Optional callback(current, total) for progress.
        """
        self._consume_metadata_records(parse_metadata_files_full(metadata_dir, progress_callback))

    def load_from_backend(
        self,
        backend: "PersistenceBackend",
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from a persistence backend.

        Works with both filesystem and SQLite backends via the
        PersistenceBackend protocol.

        Args:
            backend: Persistence backend to read from.
            progress_callback: Optional callback(current, total) for progress.
        """
        self._consume_metadata_records(backend.iterate_metadata(progress_callback))

    def _consume_metadata_records(
        self,
        records: "Iterator[MetadataRecord]",
    ) -> None:
        """Process metadata records into the registry and code hash map.

        Args:
            records: Iterator of MetadataRecord instances.
        """
        hash_to_rules: dict[str, set[str]] = {}

        for record in records:
            duration = record.duration
            end_time = record.end_time

            if duration is not None and end_time is not None:
                wildcards = record.wildcards if self.use_wildcard_conditioning else None
                self._registry.record_completion(
                    rule=record.rule,
                    duration=duration,
                    timestamp=end_time,
                    wildcards=wildcards,
                    input_size=record.input_size,
                )

            if record.code_hash:
                if record.code_hash not in hash_to_rules:
                    hash_to_rules[record.code_hash] = set()
                hash_to_rules[record.code_hash].add(record.rule)

        self.code_hash_to_rules = hash_to_rules

    def load_from_events(self, events_file: Path) -> bool:
        """Load historical execution times from a snakesee events file.

        Streams the events file line by line for memory efficiency.

        Args:
            events_file: Path to .snakesee_events.jsonl file.

        Returns:
            True if any wildcard data was found.
        """
        if not events_file.exists():
            return False

        has_wildcards = False

        try:
            with open(events_file, "r", encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue

                    # Skip excessively long lines to prevent memory issues
                    if len(line) > MAX_EVENTS_LINE_LENGTH:
                        logger.debug(
                            "Skipping oversized line in events file: %d bytes (max %d)",
                            len(line),
                            MAX_EVENTS_LINE_LENGTH,
                        )
                        continue

                    try:
                        event = orjson.loads(line)
                    except orjson.JSONDecodeError:
                        continue

                    if event.get("event_type") != "job_finished":
                        continue

                    duration = event.get("duration")
                    timestamp = event.get("timestamp")
                    rule_name = event.get("rule_name")
                    wildcards = event.get("wildcards")
                    threads = event.get("threads")

                    if duration is None or timestamp is None or rule_name is None:
                        continue

                    wc_dict = wildcards if isinstance(wildcards, dict) else None
                    threads_int = None
                    if threads is not None:
                        try:
                            candidate = int(threads)
                        except (TypeError, ValueError):
                            logger.debug(
                                "Ignoring invalid thread count in events file: %r",
                                threads,
                            )
                        else:
                            if candidate > 0:
                                threads_int = candidate
                    self._registry.record_completion(
                        rule=rule_name,
                        duration=duration,
                        timestamp=timestamp,
                        wildcards=wc_dict,
                        threads=threads_int,
                    )

                    if wc_dict:
                        has_wildcards = True

        except OSError as e:
            logger.warning("Error reading events file %s: %s", events_file, e)
            return False

        return has_wildcards
Functions
__init__
__init__(registry: RuleRegistry, use_wildcard_conditioning: bool = False) -> None

Initialize the loader.

Parameters:

Name Type Description Default
registry RuleRegistry

RuleRegistry to load data into.

required
use_wildcard_conditioning bool

Whether to record wildcard-specific stats.

False
Source code in snakesee/estimation/data_loader.py
def __init__(
    self,
    registry: "RuleRegistry",
    use_wildcard_conditioning: bool = False,
) -> None:
    """Initialize the loader.

    Args:
        registry: RuleRegistry to load data into.
        use_wildcard_conditioning: Whether to record wildcard-specific stats.
    """
    self._registry = registry
    self.use_wildcard_conditioning = use_wildcard_conditioning
    self.code_hash_to_rules: dict[str, set[str]] = {}
load_from_backend
load_from_backend(backend: PersistenceBackend, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from a persistence backend.

Works with both filesystem and SQLite backends via the PersistenceBackend protocol.

Parameters:

Name Type Description Default
backend PersistenceBackend

Persistence backend to read from.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None
Source code in snakesee/estimation/data_loader.py
def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from a persistence backend.

    Works with both filesystem and SQLite backends via the
    PersistenceBackend protocol.

    Args:
        backend: Persistence backend to read from.
        progress_callback: Optional callback(current, total) for progress.
    """
    self._consume_metadata_records(backend.iterate_metadata(progress_callback))
load_from_events
load_from_events(events_file: Path) -> bool

Load historical execution times from a snakesee events file.

Streams the events file line by line for memory efficiency.

Parameters:

Name Type Description Default
events_file Path

Path to .snakesee_events.jsonl file.

required

Returns:

Type Description
bool

True if any wildcard data was found.

Source code in snakesee/estimation/data_loader.py
def load_from_events(self, events_file: Path) -> bool:
    """Load historical execution times from a snakesee events file.

    Streams the events file line by line for memory efficiency.

    Args:
        events_file: Path to .snakesee_events.jsonl file.

    Returns:
        True if any wildcard data was found.
    """
    if not events_file.exists():
        return False

    has_wildcards = False

    try:
        with open(events_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue

                # Skip excessively long lines to prevent memory issues
                if len(line) > MAX_EVENTS_LINE_LENGTH:
                    logger.debug(
                        "Skipping oversized line in events file: %d bytes (max %d)",
                        len(line),
                        MAX_EVENTS_LINE_LENGTH,
                    )
                    continue

                try:
                    event = orjson.loads(line)
                except orjson.JSONDecodeError:
                    continue

                if event.get("event_type") != "job_finished":
                    continue

                duration = event.get("duration")
                timestamp = event.get("timestamp")
                rule_name = event.get("rule_name")
                wildcards = event.get("wildcards")
                threads = event.get("threads")

                if duration is None or timestamp is None or rule_name is None:
                    continue

                wc_dict = wildcards if isinstance(wildcards, dict) else None
                threads_int = None
                if threads is not None:
                    try:
                        candidate = int(threads)
                    except (TypeError, ValueError):
                        logger.debug(
                            "Ignoring invalid thread count in events file: %r",
                            threads,
                        )
                    else:
                        if candidate > 0:
                            threads_int = candidate
                self._registry.record_completion(
                    rule=rule_name,
                    duration=duration,
                    timestamp=timestamp,
                    wildcards=wc_dict,
                    threads=threads_int,
                )

                if wc_dict:
                    has_wildcards = True

    except OSError as e:
        logger.warning("Error reading events file %s: %s", events_file, e)
        return False

    return has_wildcards
load_from_metadata
load_from_metadata(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from .snakemake/metadata/.

Uses a single-pass parser for efficiency - reads each metadata file only once to collect timing stats, code hashes, and wildcard stats.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None
Source code in snakesee/estimation/data_loader.py
def load_from_metadata(
    self,
    metadata_dir: Path,
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from .snakemake/metadata/.

    Uses a single-pass parser for efficiency - reads each metadata file
    only once to collect timing stats, code hashes, and wildcard stats.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress.
    """
    self._consume_metadata_records(parse_metadata_files_full(metadata_dir, progress_callback))

Functions

estimator

Time estimation for Snakemake workflow progress.

Note: The variance/confidence calculations in this module could potentially be consolidated with snakesee.variance.VarianceCalculator for DRY-ness. Both modules handle similar variance heuristics and confidence calculations. A future refactor could delegate these calculations to shared helpers in VarianceCalculator.

Classes

TimeEstimator

Estimates remaining workflow time from historical data.

Uses per-rule timing statistics from previous workflow runs to estimate how long the remaining jobs will take. Falls back to simple linear estimation when historical data is unavailable.

Attributes:

Name Type Description
rule_stats dict[str, RuleTimingStats]

Dictionary mapping rule names to their timing statistics.

thread_stats dict[str, ThreadTimingStats]

Dictionary mapping rule names to thread-conditioned timing stats.

wildcard_stats dict[str, dict[str, WildcardTimingStats]]

Nested dict of wildcard-conditioned timing stats.

use_wildcard_conditioning

Whether to use wildcard-specific estimates.

config

Centralized estimation configuration.

Source code in snakesee/estimation/estimator.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
class TimeEstimator:
    """
    Estimates remaining workflow time from historical data.

    Uses per-rule timing statistics from previous workflow runs to estimate
    how long the remaining jobs will take. Falls back to simple linear
    estimation when historical data is unavailable.

    Attributes:
        rule_stats: Dictionary mapping rule names to their timing statistics.
        thread_stats: Dictionary mapping rule names to thread-conditioned timing stats.
        wildcard_stats: Nested dict of wildcard-conditioned timing stats.
        use_wildcard_conditioning: Whether to use wildcard-specific estimates.
        config: Centralized estimation configuration.
    """

    def __init__(
        self,
        use_wildcard_conditioning: bool = False,
        half_life_days: float | None = None,
        weighting_strategy: WeightingStrategy | None = None,
        half_life_logs: int | None = None,
        config: EstimationConfig | None = None,
        rule_registry: "RuleRegistry | None" = None,
    ) -> None:
        """
        Initialize the estimator.

        Args:
            use_wildcard_conditioning: Whether to use wildcard-specific timing.
            half_life_days: Half-life in days for time-based weighting.
                           After this many days, a run's weight is halved.
                           Only used when weighting_strategy="time".
            weighting_strategy: Strategy for weighting historical data.
                              "time" - decay based on wall-clock time (good for stable pipelines)
                              "index" - decay based on run count (good for active development)
            half_life_logs: Half-life in log count for index-based weighting.
                           After this many runs, a run's weight is halved.
                           Only used when weighting_strategy="index".
            config: Centralized estimation configuration. If not provided, uses
                   DEFAULT_CONFIG with any explicit parameters overriding it.
            rule_registry: RuleRegistry for centralized statistics. If not provided,
                          creates an internal registry.
        """
        from snakesee.state.rule_registry import RuleRegistry

        # Use provided config or default
        self.config = config if config is not None else DEFAULT_CONFIG

        # Centralized registry - create internal one if not provided
        self._rule_registry: RuleRegistry = rule_registry or RuleRegistry(config=self.config)

        # Cache for global_mean_duration (invalidated when sample count or rule count changes)
        self._global_mean_cache: float | None = None
        self._global_mean_cache_sample_count: int = 0
        self._global_mean_cache_rule_count: int = 0

        self.current_rules: set[str] | None = None  # Rules in current workflow (for filtering)
        self.code_hash_to_rules: dict[str, set[str]] = {}  # For renamed rule detection
        self.expected_job_counts: dict[str, int] | None = None  # Expected counts from Job stats
        self.use_wildcard_conditioning = use_wildcard_conditioning
        self._wildcard_conditioning_explicit = use_wildcard_conditioning  # Track if explicitly set

        # Use explicit params if provided, otherwise use config values
        self.weighting_strategy: WeightingStrategy = (
            weighting_strategy if weighting_strategy is not None else self.config.weighting_strategy
        )
        self.half_life_days = (
            half_life_days if half_life_days is not None else self.config.half_life_days
        )
        self.half_life_logs = (
            half_life_logs if half_life_logs is not None else self.config.half_life_logs
        )

        self._max_observed_thread_sum: float = 0.0
        self._provided_cores: int | None = None

        # Helper components
        self._rule_matcher = RuleMatchingEngine(max_distance=self.config.fuzzy_match_max_distance)
        self._pending_inferrer = PendingRuleInferrer()
        self._data_loader: HistoricalDataLoader | None = None

    @property
    def rule_stats(self) -> dict[str, RuleTimingStats]:
        """Get rule stats dict from the registry.

        Returns a dict view for backward compatibility with code that reads rule_stats.
        """
        return self._rule_registry.to_rule_stats_dict()

    @rule_stats.setter
    def rule_stats(self, value: dict[str, RuleTimingStats]) -> None:
        """Set rule stats by loading into the registry.

        Supports backward compatibility with code that sets rule_stats directly.
        """
        self._rule_registry.load_from_rule_stats(value)
        # Invalidate cache when rule_stats is set directly
        self._global_mean_cache = None

    @property
    def thread_stats(self) -> dict[str, ThreadTimingStats]:
        """Get thread stats dict from the registry."""
        return self._rule_registry.to_thread_stats_dict()

    @property
    def wildcard_stats(self) -> dict[str, dict[str, WildcardTimingStats]]:
        """Get wildcard stats dict from the registry."""
        return self._rule_registry.to_wildcard_stats_dict()

    def _get_data_loader(self) -> HistoricalDataLoader:
        """Get or create the data loader."""
        if self._data_loader is None:
            self._data_loader = HistoricalDataLoader(
                registry=self._rule_registry,
                use_wildcard_conditioning=self.use_wildcard_conditioning,
            )
        return self._data_loader

    def load_from_metadata(
        self,
        metadata_dir: Path,
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """
        Load historical execution times from .snakemake/metadata/.

        Uses a single-pass parser for efficiency - reads each metadata file
        only once to collect timing stats, code hashes, and wildcard stats.
        Data is recorded directly into the RuleRegistry.

        Args:
            metadata_dir: Path to .snakemake/metadata/ directory.
            progress_callback: Optional callback(current, total) for progress reporting.
        """
        loader = self._get_data_loader()
        loader.load_from_metadata(metadata_dir, progress_callback)
        self.code_hash_to_rules = loader.code_hash_to_rules

    def load_from_backend(
        self,
        backend: "PersistenceBackend",
        progress_callback: "ProgressCallback | None" = None,
    ) -> None:
        """Load historical execution times from a persistence backend.

        Supports both filesystem and SQLite backends via the PersistenceBackend
        protocol. Delegates to HistoricalDataLoader.load_from_backend().

        Args:
            backend: Persistence backend to read metadata from.
            progress_callback: Optional callback(current, total) for progress reporting.
        """
        loader = self._get_data_loader()
        loader.load_from_backend(backend, progress_callback)
        self.code_hash_to_rules = loader.code_hash_to_rules

    def load_from_events(self, events_file: Path) -> None:
        """
        Load historical execution times from a snakesee events file.

        Parses the .snakesee_events.jsonl file to extract job durations from
        job_finished events. Data is recorded directly into the RuleRegistry.

        Args:
            events_file: Path to .snakesee_events.jsonl file.
        """
        loader = self._get_data_loader()
        has_wildcards = loader.load_from_events(events_file)

        # Auto-enable wildcard conditioning if we have wildcard data,
        # but only if user didn't explicitly set it to False
        if has_wildcards and not self._wildcard_conditioning_explicit:
            self.use_wildcard_conditioning = True

    def _find_similar_rule(
        self, rule: str, max_distance: int | None = None
    ) -> tuple[str, RuleTimingStats] | None:
        """
        Find the most similar known rule using code hash and Levenshtein distance.

        First checks if any known rule shares the same code hash (renamed rule).
        Then falls back to Levenshtein distance for similar names.

        Args:
            rule: The unknown rule name to match.
            max_distance: Maximum edit distance to consider a match.
                          Defaults to config.fuzzy_match_max_distance.

        Returns:
            Tuple of (matched_rule_name, stats) if a similar rule is found,
            None otherwise.
        """
        effective_stats = self.rule_stats
        if not effective_stats:
            return None

        return self._rule_matcher.find_best_match(
            rule=rule,
            code_hash_to_rules=self.code_hash_to_rules,
            rule_stats=effective_stats,
            max_distance=max_distance,
        )

    def _get_weighted_mean(self, stats: RuleTimingStats) -> float:
        """Get weighted mean using configured strategy."""
        return stats.weighted_mean(
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _get_size_scaled_estimate(
        self, stats: RuleTimingStats, input_size: int
    ) -> tuple[float, float]:
        """Get size-scaled estimate using configured strategy."""
        return stats.size_scaled_estimate(
            input_size=input_size,
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _get_recency_factor(self, stats: RuleTimingStats) -> float:
        """Get recency factor using configured strategy."""
        return stats.recency_factor(
            half_life_days=self.half_life_days,
            strategy=self.weighting_strategy,
            half_life_logs=self.half_life_logs,
        )

    def _try_combination_estimate(
        self, rule: str, wildcards: dict[str, str] | None, threads: int | None
    ) -> tuple[float, float] | None:
        """Try to get estimate from full wildcard+threads combination stats.

        This provides the most precise estimate when we have historical data
        for the exact same combination of wildcards and threads.

        Uses a two-tier approach:
        1. First try with standard MIN_SAMPLES threshold (3+) for high confidence
        2. Fall back to any historical data (1+ samples) with higher variance

        Args:
            rule: The rule name.
            wildcards: Wildcard values for the job.
            threads: Thread count for the job.

        Returns:
            Tuple of (expected_duration, variance) if combination stats available, None otherwise.
        """
        # First try with standard threshold for high-confidence estimate
        combo_stats = self._rule_registry.get_combination_stats(rule, wildcards, threads)

        if combo_stats is None:
            # Fall back to any historical data (even 1-2 samples)
            # This is better than using the rule average which may be completely wrong
            combo_stats = self._rule_registry.get_combination_stats(
                rule, wildcards, threads, min_samples=1
            )
            if combo_stats is None:
                return None

            # Use higher variance for low-sample estimates
            combo_mean = self._get_weighted_mean(combo_stats)
            # With 1-2 samples, use 50% variance multiplier (wider bounds)
            combo_var = (combo_mean * 0.5) ** 2
            return combo_mean, combo_var

        combo_mean = self._get_weighted_mean(combo_stats)
        # Tight variance for high-confidence combination match (3+ samples)
        combo_var = (
            combo_stats.std_dev**2
            if combo_stats.count > 1
            else (combo_mean * self.config.variance.exact_wildcard_match) ** 2
        )
        return combo_mean, combo_var

    def _try_thread_estimate(self, rule: str, threads: int) -> tuple[float, float] | None:
        """Try to get estimate from thread-specific stats.

        Args:
            rule: The rule name.
            threads: Thread count for the job.

        Returns:
            Tuple of (expected_duration, variance) if thread stats available, None otherwise.
        """
        effective_thread_stats = self.thread_stats
        if rule not in effective_thread_stats:
            return None

        thread_stats, matched_threads = effective_thread_stats[rule].get_best_match(threads)
        if thread_stats is None:
            return None

        thread_mean = self._get_weighted_mean(thread_stats)
        var_mult = self.config.variance

        # Tighter variance for exact thread match, wider for aggregate fallback
        if matched_threads is not None:
            thread_var = (
                thread_stats.std_dev**2
                if thread_stats.count > 1
                else (thread_mean * var_mult.exact_thread_match) ** 2
            )
        else:
            # Aggregate fallback - use standard variance
            thread_var = (
                thread_stats.std_dev**2
                if thread_stats.count > 1
                else (thread_mean * var_mult.aggregate_fallback) ** 2
            )
        return thread_mean, thread_var

    def _try_wildcard_estimate(
        self, rule: str, wildcards: dict[str, str]
    ) -> tuple[float, float] | None:
        """Try to get estimate from wildcard-specific stats.

        Args:
            rule: The rule name.
            wildcards: Wildcard values for the job.

        Returns:
            Tuple of (expected_duration, variance) if wildcard stats available, None otherwise.
        """
        effective_wildcard_stats = self.wildcard_stats
        if rule not in effective_wildcard_stats:
            return None

        rule_wc_stats = effective_wildcard_stats[rule]

        # Find the most predictive wildcard key for this rule
        best_key = WildcardTimingStats.get_most_predictive_key(rule_wc_stats)

        if best_key is None or best_key not in wildcards:
            return None

        wc_value = wildcards[best_key]
        wts = rule_wc_stats[best_key]
        value_stats = wts.get_stats_for_value(wc_value)

        if value_stats is None:
            return None

        # Use wildcard-specific statistics
        rule_mean = self._get_weighted_mean(value_stats)
        rule_var = (
            value_stats.std_dev**2
            if value_stats.count > 1
            else (rule_mean * self.config.variance.exact_wildcard_match) ** 2
        )
        return rule_mean, rule_var

    def _try_rule_estimate(self, rule: str, input_size: int | None) -> tuple[float, float] | None:
        """Try to get estimate from rule-level stats.

        Args:
            rule: The rule name.
            input_size: Optional input file size in bytes for size-scaled estimates.

        Returns:
            Tuple of (expected_duration, variance) if rule stats available, None otherwise.
        """
        effective_stats = self.rule_stats
        if rule not in effective_stats:
            return None

        stats = effective_stats[rule]

        # Try size-scaled estimate if input size is available
        if input_size is not None and input_size > 0:
            scaled_est, confidence = self._get_size_scaled_estimate(stats, input_size)
            size_conf_threshold = self.config.confidence_thresholds.size_scaling_min
            if confidence > size_conf_threshold:
                # Reduce variance for size-scaled estimates
                rule_var = (
                    stats.std_dev**2 * (1 - confidence * 0.5)
                    if stats.count > 1
                    else (scaled_est * self.config.variance.size_scaled) ** 2
                )
                return scaled_est, rule_var

        # Standard rule-level estimate
        rule_mean = self._get_weighted_mean(stats)
        rule_var = (
            stats.std_dev**2
            if stats.count > 1
            else (rule_mean * self.config.variance.rule_fallback) ** 2
        )
        return rule_mean, rule_var

    def _try_fuzzy_match_estimate(self, rule: str) -> tuple[float, float] | None:
        """Try to get estimate from fuzzy-matched similar rules.

        Args:
            rule: The unknown rule name to match.

        Returns:
            Tuple of (expected_duration, variance) if similar rule found, None otherwise.
        """
        similar = self._find_similar_rule(rule)
        if similar is None:
            return None

        _matched_rule, stats = similar
        rule_mean = self._get_weighted_mean(stats)
        # Wider variance for fuzzy matches due to uncertainty
        rule_var = (
            stats.std_dev**2
            if stats.count > 1
            else (rule_mean * self.config.variance.fuzzy_match) ** 2
        )
        return rule_mean, rule_var

    def get_estimate_for_job(
        self,
        rule: str,
        wildcards: dict[str, str] | None = None,
        input_size: int | None = None,
        threads: int | None = None,
    ) -> tuple[float, float]:
        """
        Get expected duration and variance for a specific job.

        Uses a cascade of estimation strategies in priority order:
        1. Full combination stats (wildcards + threads together)
        2. Thread-specific stats
        3. Wildcard-specific stats (if enabled)
        4. Rule-level stats (with optional size scaling)
        5. Fuzzy matching for renamed/similar rules
        6. Global mean fallback

        Args:
            rule: The rule name.
            wildcards: Optional wildcard values for the job.
            input_size: Optional input file size in bytes for size-scaled estimates.
            threads: Optional thread count for thread-specific estimates.

        Returns:
            Tuple of (expected_duration, variance).  The duration is always
            wall-clock seconds, even when thread-specific stats are used.
            Callers that need thread-seconds should multiply by the job's
            thread count: ``duration * threads = thread_seconds``.
        """
        # Strategy 1: Full combination stats (wildcards + threads together)
        # This is the most precise when we have data for the exact combination
        if self.use_wildcard_conditioning and (wildcards or (threads is not None)):
            result = self._try_combination_estimate(rule, wildcards, threads)
            if result is not None:
                return result

        # Strategy 2: Thread-specific stats
        if threads is not None:
            result = self._try_thread_estimate(rule, threads)
            if result is not None:
                return result

        # Strategy 3: Wildcard-specific stats (if enabled)
        if self.use_wildcard_conditioning and wildcards:
            result = self._try_wildcard_estimate(rule, wildcards)
            if result is not None:
                return result

        # Strategy 4: Rule-level stats (with optional size scaling)
        result = self._try_rule_estimate(rule, input_size)
        if result is not None:
            return result

        # Strategy 5: Fuzzy matching for renamed/similar rules
        result = self._try_fuzzy_match_estimate(rule)
        if result is not None:
            return result

        # Strategy 6: Global mean fallback
        global_mean = self.global_mean_duration()
        return global_mean, (global_mean * self.config.variance.global_fallback) ** 2

    def global_mean_duration(self) -> float:
        """
        Get the global average duration across all known rules.

        Used as a fallback when a specific rule has no historical data.
        Result is cached and invalidated when sample count or rule count changes.

        Returns:
            Average duration in seconds, or config.default_global_mean if no data.
        """
        # Check cache validity using sample count and rule count as version indicators
        current_sample_count = self._rule_registry.total_sample_count()
        current_rule_count = self._rule_registry.rule_count()
        if (
            self._global_mean_cache is not None
            and current_sample_count == self._global_mean_cache_sample_count
            and current_rule_count == self._global_mean_cache_rule_count
        ):
            return self._global_mean_cache

        # Recalculate and cache
        effective_stats = self.rule_stats
        all_durations = [d for stats in effective_stats.values() for d in stats.durations]
        self._global_mean_cache = (
            mean(all_durations) if all_durations else self.config.default_global_mean
        )
        self._global_mean_cache_sample_count = current_sample_count
        self._global_mean_cache_rule_count = current_rule_count
        return self._global_mean_cache

    def estimate_remaining(
        self,
        progress: WorkflowProgress,
    ) -> TimeEstimate:
        """
        Estimate remaining time for a workflow.

        Uses one of several estimation strategies depending on available data:
        1. "weighted" - Uses per-rule historical timing with exponential weighting
        2. "simple" - Falls back to average time per completed step

        Args:
            progress: Current workflow progress state.

        Returns:
            TimeEstimate with expected time, confidence bounds, and method.
        """

        # Update peak observed thread sum from currently running jobs.
        # Only use explicitly declared threads (not historical defaults) so
        # the peak stays grounded in runtime data.
        if progress.running_jobs:
            current = sum(
                float(job.threads) if job.threads is not None else 1.0
                for job in progress.running_jobs
            )
            self._max_observed_thread_sum = max(self._max_observed_thread_sum, current)

        # Handle edge case: no jobs to do
        if progress.total_jobs == 0:
            return TimeEstimate(
                seconds_remaining=0.0,
                lower_bound=0.0,
                upper_bound=0.0,
                confidence=1.0,
                method="complete",
            )

        # Handle edge case: workflow complete
        if progress.completed_jobs >= progress.total_jobs:
            return TimeEstimate(
                seconds_remaining=0.0,
                lower_bound=0.0,
                upper_bound=0.0,
                confidence=1.0,
                method="complete",
            )

        # Handle edge case: nothing completed yet
        if progress.completed_jobs == 0:
            return self._estimate_no_progress(progress)

        # Try weighted estimation with historical data
        if self.rule_stats:
            return self._estimate_weighted(progress)

        # Fall back to simple estimation
        return self._estimate_simple(progress)

    def _estimate_no_progress(self, progress: WorkflowProgress) -> TimeEstimate:
        """
        Estimate when no jobs have completed yet.

        Uses thread-seconds / inferred-cores model for parallelism adjustment.

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate with very low confidence.
        """
        global_mean = self.global_mean_duration()

        # Compute total thread-seconds from known jobs when available.
        # When we have a job list we can use per-job thread counts;
        # otherwise we assume 1 thread per job (no thread info available).
        has_job_list = bool(progress.pending_jobs_list or progress.running_jobs)
        if has_job_list:
            total_thread_seconds = sum(
                global_mean * self._job_threads(job)
                for job in chain(progress.pending_jobs_list, progress.running_jobs)
            )
        else:
            total_thread_seconds = global_mean * progress.total_jobs

        cores = self.estimated_cores
        estimate = total_thread_seconds / cores

        return TimeEstimate(
            seconds_remaining=estimate,
            lower_bound=estimate * self.config.bootstrap_lower_multiplier,
            upper_bound=estimate * self.config.bootstrap_upper_multiplier,
            confidence=self.config.confidence_thresholds.bootstrap_confidence,
            method="bootstrap",
            inferred_cores=cores,
        )

    def _estimate_simple(self, progress: WorkflowProgress) -> TimeEstimate:
        """
        Simple linear estimation using thread-seconds when possible.

        When pending/running job lists are available, computes an observed
        thread-second rate from completed work (elapsed × cores) and
        estimates remaining thread-seconds from pending job threads.  This
        handles mixed-thread workloads better than plain job-count
        extrapolation.

        When no job lists are available, falls back to the classic
        elapsed/completed × remaining linear extrapolation.

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate using simple extrapolation.
        """
        elapsed = progress.elapsed_seconds
        if elapsed is None or elapsed <= 0:
            return self._estimate_no_progress(progress)

        if progress.completed_jobs <= 0:
            return self._estimate_no_progress(progress)

        cores = self.estimated_cores

        # When we have pending/running job lists, use thread-seconds model
        # (same pattern as _estimate_weighted and _estimate_no_progress):
        # compute total remaining thread-seconds, then divide by cores.
        has_job_list = bool(progress.pending_jobs_list or progress.running_jobs)
        if has_job_list:
            # Observed wall-clock rate per job → thread-seconds per job
            avg_wall_per_job = elapsed / progress.completed_jobs
            avg_thread_seconds_per_job = avg_wall_per_job * cores

            # Sum remaining thread-seconds across pending + running jobs
            total_thread_seconds = sum(
                avg_thread_seconds_per_job * self._job_threads(job)
                for job in chain(progress.pending_jobs_list, progress.running_jobs)
            )
            # Subtract elapsed thread-seconds for running jobs
            for job in progress.running_jobs:
                job_elapsed = job.elapsed or 0.0
                t = self._job_threads(job)
                total_thread_seconds = max(0.0, total_thread_seconds - job_elapsed * t)

            estimate = total_thread_seconds / cores
        else:
            # Classic linear extrapolation (no thread info available)
            avg_time_per_step = elapsed / progress.completed_jobs
            remaining_steps = progress.total_jobs - progress.completed_jobs
            estimate = avg_time_per_step * remaining_steps

        # Confidence grows with more completed jobs
        confidence = min(
            self.config.simple_estimate_confidence_cap,
            progress.completed_jobs / self.config.simple_estimate_jobs_divisor,
        )

        # Wider bounds for fewer samples
        uncertainty = max(0.3, 1.0 - (progress.completed_jobs / progress.total_jobs))

        return TimeEstimate(
            seconds_remaining=estimate,
            lower_bound=estimate * (1 - uncertainty),
            upper_bound=estimate * (1 + uncertainty * 2),
            confidence=confidence,
            method="simple",
            inferred_cores=cores,
        )

    def _job_threads(self, job: JobInfo) -> float:
        """Resolve effective thread count for a job.

        Uses the job's declared threads if available, otherwise falls back
        to the weighted-average thread count from historical data for the rule.
        """
        if job.threads is not None:
            return float(job.threads)
        return self._rule_registry.typical_threads(job.rule)

    def set_provided_cores(self, cores: int) -> None:
        """Set the Snakemake ``-j``/``--cores`` value parsed from the log.

        When available, this provides a definitive upper bound for
        parallelism rather than relying solely on peak observed thread sums.

        Args:
            cores: Resolved core count (integer) from ``Provided cores: N``
                log line.  Snakemake resolves ``--cores all`` to the machine's
                CPU count before logging, so this is always an integer.
        """
        self._provided_cores = cores if cores >= 1 else None

    @property
    def estimated_cores(self) -> float:
        """Inferred core count used to convert thread-seconds to wall-clock time.

        Priority:
        1. ``Provided cores: N`` parsed from the Snakemake log (definitive).
        2. Peak observed sum of running-job threads (approximation of ``-j``).
        3. Fallback to 1.0 when no data is available.

        Returns:
            Estimated core count (>= 1.0).
        """
        if self._provided_cores is not None:
            return max(1.0, float(self._provided_cores))
        return max(1.0, self._max_observed_thread_sum)

    def _calculate_confidence_scores(
        self,
        rule_completed: dict[str, int],
        effective_stats: dict[str, RuleTimingStats],
    ) -> tuple[float, float, float, float]:
        """Calculate confidence component scores.

        Args:
            rule_completed: Dict of rule name to completion count.
            effective_stats: Dict of rule name to timing stats.

        Returns:
            Tuple of (sample_confidence, avg_recency, avg_consistency, data_coverage).
        """
        data_coverage = len(rule_completed) / max(1, len(effective_stats))
        total_samples = sum(s.count for s in effective_stats.values())
        sample_confidence = min(1.0, total_samples / self.config.min_samples_for_confidence)

        # Calculate average recency and consistency across rules with data
        recency_scores: list[float] = []
        consistency_scores: list[float] = []
        for rule in rule_completed:
            if rule in effective_stats:
                stats = effective_stats[rule]
                recency_scores.append(self._get_recency_factor(stats))
                consistency_scores.append(stats.recent_consistency())

        avg_recency = sum(recency_scores) / len(recency_scores) if recency_scores else 0.5
        avg_consistency = (
            sum(consistency_scores) / len(consistency_scores) if consistency_scores else 0.5
        )

        return sample_confidence, avg_recency, avg_consistency, data_coverage

    def _estimate_weighted(
        self,
        progress: WorkflowProgress,
    ) -> TimeEstimate:
        """Weighted estimation using per-rule historical timing.

        Algorithm Overview:
        ------------------
        This method implements an Exponentially Weighted Moving Average (EWMA)
        approach for time estimation, which is well-suited for workflows that
        evolve over time (e.g., code changes, data size variations).

        Key Design Decisions:

        1. **Exponential Weighting**: Recent executions are weighted more heavily
           than older ones. This is based on the observation that:
           - Pipeline code changes frequently during development
           - Recent runs better reflect current performance characteristics
           - Older timing data may reflect outdated code or different conditions

           The decay is controlled by half-life parameters:
           - `half_life_logs`: After N runs, a run's weight is halved (index-based)
           - `half_life_days`: After N days, a run's weight is halved (time-based)

        2. **Median Input Size for Size Scaling**: When scaling estimates by input
           file size, we use the median historical input size as the reference
           point rather than the mean. This provides robustness to outliers
           (e.g., test files that are much smaller than production files).

        3. **Thread-Aware Core Estimation**: Rather than using a naive sqrt(parallelism)
           formula, we compute total thread-seconds for remaining work and divide by
           the inferred number of available cores. Cores are estimated from the peak
           observed sum of running job threads (approximating snakemake's -j value).

        4. **Confidence Scoring**: Confidence is a weighted combination of:
           - Sample size (more historical data = more confidence)
           - Recency (fresher data = more confidence)
           - Consistency (lower variance = more confidence)
           - Data coverage (more rules with data = more confidence)

        Args:
            progress: Current workflow progress.

        Returns:
            TimeEstimate using weighted historical data.

        References:
            - EWMA: Standard statistical technique for time series smoothing
            - Half-life decay: Common in recommendation systems and caching
        """
        # Get rule completion counts from recent completions
        rule_completed: dict[str, int] = {}
        for job in progress.recent_completions:
            rule_completed[job.rule] = rule_completed.get(job.rule, 0) + 1

        # Count running jobs by rule
        running_by_rule: dict[str, int] = {}
        for job in progress.running_jobs:
            running_by_rule[job.rule] = running_by_rule.get(job.rule, 0) + 1

        # Only augment with historical counts if we don't have expected_job_counts
        # (to avoid skewing proportional inference with historical execution counts)
        effective_stats = self.rule_stats
        if not self.expected_job_counts:
            for rule, stats in effective_stats.items():
                if rule not in rule_completed:
                    rule_completed[rule] = stats.count

        # Estimate pending rule distribution
        # If expected_job_counts is set, _infer_pending_rules will use exact calculation.
        # Use WorkflowProgress.pending_jobs so failed and incomplete jobs are not double-counted.
        pending = progress.pending_jobs
        pending_rules = self._infer_pending_rules(
            rule_completed, pending, self.current_rules, running_by_rule
        )

        # Calculate expected remaining thread-seconds and variance
        total_thread_seconds = 0.0
        total_variance = 0.0
        global_mean = self.global_mean_duration()

        # Use wildcard-conditioned estimates for pending jobs if we have a COMPLETE job list
        # If pending_jobs_list is incomplete, fall back to rule-count-based estimation
        pending_list_len = len(progress.pending_jobs_list) if progress.pending_jobs_list else 0
        use_pending_list = pending_list_len > 0 and pending_list_len >= pending

        if use_pending_list:
            # We have actual pending jobs with wildcards - use per-job estimates
            for job in progress.pending_jobs_list:
                expected, variance = self.get_estimate_for_job(
                    rule=job.rule,
                    wildcards=job.wildcards,
                    input_size=job.input_size,
                    threads=job.threads,
                )
                t = self._job_threads(job)
                total_thread_seconds += expected * t
                # Variance in seconds² → thread-seconds²: Var(t·T) = T²·Var(t)
                total_variance += variance * t * t
        else:
            # Fall back to rule-count-based estimation
            for rule, count in pending_rules.items():
                if rule in effective_stats:
                    stats = effective_stats[rule]
                    rule_mean = self._get_weighted_mean(stats)
                    rule_var = (
                        stats.std_dev**2
                        if stats.count > 1
                        else (rule_mean * self.config.variance.rule_fallback) ** 2
                    )
                else:
                    # Unknown rule: use global mean with higher uncertainty
                    rule_mean = global_mean
                    rule_var = (global_mean * self.config.variance.global_fallback) ** 2

                threads = self._rule_registry.typical_threads(rule)
                total_thread_seconds += count * rule_mean * threads
                total_variance += count * rule_var * threads * threads

        # Add time for running jobs - use wildcard-specific estimates when available
        for job in progress.running_jobs:
            # Use get_estimate_for_job which handles wildcard conditioning
            expected, variance = self.get_estimate_for_job(
                rule=job.rule,
                wildcards=job.wildcards,
                input_size=job.input_size,
                threads=job.threads,
            )

            # Use actual elapsed time if available
            elapsed = job.elapsed or 0.0
            remaining = max(0, expected - elapsed)
            t = self._job_threads(job)
            total_thread_seconds += remaining * t
            total_variance += variance * t * t

        # Convert thread-seconds to wall-clock time using inferred cores
        cores = self.estimated_cores
        effective_time = total_thread_seconds / cores

        # Calculate confidence bounds (variance scales with cores^2)
        fallback_std = effective_time * self.config.variance.rule_fallback
        std_dev = (math.sqrt(total_variance) / cores) if total_variance > 0 else fallback_std
        lower = max(0, effective_time - 2 * std_dev)
        upper = effective_time + 2 * std_dev

        # Calculate confidence from multiple factors
        sample_conf, avg_recency, avg_consistency, data_coverage = (
            self._calculate_confidence_scores(rule_completed, effective_stats)
        )
        weights = self.config.confidence_weights
        base_confidence = (
            weights.sample_size * sample_conf
            + weights.recency * avg_recency
            + weights.consistency * avg_consistency
            + weights.data_coverage * data_coverage
        )
        confidence = min(self.config.confidence_thresholds.max_confidence, base_confidence)

        return TimeEstimate(
            seconds_remaining=effective_time,
            lower_bound=lower,
            upper_bound=upper,
            confidence=confidence,
            method="weighted",
            inferred_cores=cores,
        )

    def _infer_pending_rules(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        current_rules: set[str] | None = None,
        running_by_rule: dict[str, int] | None = None,
    ) -> dict[str, int]:
        """
        Infer the composition of pending rules.

        If expected_job_counts is available (from Job stats table), uses exact
        calculation: pending = expected - completed - running.
        Otherwise falls back to proportional inference.

        Args:
            completed_by_rule: Count of completed jobs per rule.
            pending_count: Total number of pending jobs.
            current_rules: Optional set of rules that exist in the current workflow.
                If provided, only rules in this set will be included in inference.
                This filters out deleted rules from historical data.
            running_by_rule: Optional count of running jobs per rule.

        Returns:
            Estimated count of pending jobs per rule.
        """
        return self._pending_inferrer.infer(
            completed_by_rule=completed_by_rule,
            pending_count=pending_count,
            expected_job_counts=self.expected_job_counts,
            current_rules=current_rules,
            running_by_rule=running_by_rule,
        )
Attributes
estimated_cores property
estimated_cores: float

Inferred core count used to convert thread-seconds to wall-clock time.

Priority: 1. Provided cores: N parsed from the Snakemake log (definitive). 2. Peak observed sum of running-job threads (approximation of -j). 3. Fallback to 1.0 when no data is available.

Returns:

Type Description
float

Estimated core count (>= 1.0).

rule_stats property writable
rule_stats: dict[str, RuleTimingStats]

Get rule stats dict from the registry.

Returns a dict view for backward compatibility with code that reads rule_stats.

thread_stats property
thread_stats: dict[str, ThreadTimingStats]

Get thread stats dict from the registry.

wildcard_stats property
wildcard_stats: dict[str, dict[str, WildcardTimingStats]]

Get wildcard stats dict from the registry.

Functions
__init__
__init__(use_wildcard_conditioning: bool = False, half_life_days: float | None = None, weighting_strategy: WeightingStrategy | None = None, half_life_logs: int | None = None, config: EstimationConfig | None = None, rule_registry: RuleRegistry | None = None) -> None

Initialize the estimator.

Parameters:

Name Type Description Default
use_wildcard_conditioning bool

Whether to use wildcard-specific timing.

False
half_life_days float | None

Half-life in days for time-based weighting. After this many days, a run's weight is halved. Only used when weighting_strategy="time".

None
weighting_strategy WeightingStrategy | None

Strategy for weighting historical data. "time" - decay based on wall-clock time (good for stable pipelines) "index" - decay based on run count (good for active development)

None
half_life_logs int | None

Half-life in log count for index-based weighting. After this many runs, a run's weight is halved. Only used when weighting_strategy="index".

None
config EstimationConfig | None

Centralized estimation configuration. If not provided, uses DEFAULT_CONFIG with any explicit parameters overriding it.

None
rule_registry RuleRegistry | None

RuleRegistry for centralized statistics. If not provided, creates an internal registry.

None
Source code in snakesee/estimation/estimator.py
def __init__(
    self,
    use_wildcard_conditioning: bool = False,
    half_life_days: float | None = None,
    weighting_strategy: WeightingStrategy | None = None,
    half_life_logs: int | None = None,
    config: EstimationConfig | None = None,
    rule_registry: "RuleRegistry | None" = None,
) -> None:
    """
    Initialize the estimator.

    Args:
        use_wildcard_conditioning: Whether to use wildcard-specific timing.
        half_life_days: Half-life in days for time-based weighting.
                       After this many days, a run's weight is halved.
                       Only used when weighting_strategy="time".
        weighting_strategy: Strategy for weighting historical data.
                          "time" - decay based on wall-clock time (good for stable pipelines)
                          "index" - decay based on run count (good for active development)
        half_life_logs: Half-life in log count for index-based weighting.
                       After this many runs, a run's weight is halved.
                       Only used when weighting_strategy="index".
        config: Centralized estimation configuration. If not provided, uses
               DEFAULT_CONFIG with any explicit parameters overriding it.
        rule_registry: RuleRegistry for centralized statistics. If not provided,
                      creates an internal registry.
    """
    from snakesee.state.rule_registry import RuleRegistry

    # Use provided config or default
    self.config = config if config is not None else DEFAULT_CONFIG

    # Centralized registry - create internal one if not provided
    self._rule_registry: RuleRegistry = rule_registry or RuleRegistry(config=self.config)

    # Cache for global_mean_duration (invalidated when sample count or rule count changes)
    self._global_mean_cache: float | None = None
    self._global_mean_cache_sample_count: int = 0
    self._global_mean_cache_rule_count: int = 0

    self.current_rules: set[str] | None = None  # Rules in current workflow (for filtering)
    self.code_hash_to_rules: dict[str, set[str]] = {}  # For renamed rule detection
    self.expected_job_counts: dict[str, int] | None = None  # Expected counts from Job stats
    self.use_wildcard_conditioning = use_wildcard_conditioning
    self._wildcard_conditioning_explicit = use_wildcard_conditioning  # Track if explicitly set

    # Use explicit params if provided, otherwise use config values
    self.weighting_strategy: WeightingStrategy = (
        weighting_strategy if weighting_strategy is not None else self.config.weighting_strategy
    )
    self.half_life_days = (
        half_life_days if half_life_days is not None else self.config.half_life_days
    )
    self.half_life_logs = (
        half_life_logs if half_life_logs is not None else self.config.half_life_logs
    )

    self._max_observed_thread_sum: float = 0.0
    self._provided_cores: int | None = None

    # Helper components
    self._rule_matcher = RuleMatchingEngine(max_distance=self.config.fuzzy_match_max_distance)
    self._pending_inferrer = PendingRuleInferrer()
    self._data_loader: HistoricalDataLoader | None = None
estimate_remaining
estimate_remaining(progress: WorkflowProgress) -> TimeEstimate

Estimate remaining time for a workflow.

Uses one of several estimation strategies depending on available data: 1. "weighted" - Uses per-rule historical timing with exponential weighting 2. "simple" - Falls back to average time per completed step

Parameters:

Name Type Description Default
progress WorkflowProgress

Current workflow progress state.

required

Returns:

Type Description
TimeEstimate

TimeEstimate with expected time, confidence bounds, and method.

Source code in snakesee/estimation/estimator.py
def estimate_remaining(
    self,
    progress: WorkflowProgress,
) -> TimeEstimate:
    """
    Estimate remaining time for a workflow.

    Uses one of several estimation strategies depending on available data:
    1. "weighted" - Uses per-rule historical timing with exponential weighting
    2. "simple" - Falls back to average time per completed step

    Args:
        progress: Current workflow progress state.

    Returns:
        TimeEstimate with expected time, confidence bounds, and method.
    """

    # Update peak observed thread sum from currently running jobs.
    # Only use explicitly declared threads (not historical defaults) so
    # the peak stays grounded in runtime data.
    if progress.running_jobs:
        current = sum(
            float(job.threads) if job.threads is not None else 1.0
            for job in progress.running_jobs
        )
        self._max_observed_thread_sum = max(self._max_observed_thread_sum, current)

    # Handle edge case: no jobs to do
    if progress.total_jobs == 0:
        return TimeEstimate(
            seconds_remaining=0.0,
            lower_bound=0.0,
            upper_bound=0.0,
            confidence=1.0,
            method="complete",
        )

    # Handle edge case: workflow complete
    if progress.completed_jobs >= progress.total_jobs:
        return TimeEstimate(
            seconds_remaining=0.0,
            lower_bound=0.0,
            upper_bound=0.0,
            confidence=1.0,
            method="complete",
        )

    # Handle edge case: nothing completed yet
    if progress.completed_jobs == 0:
        return self._estimate_no_progress(progress)

    # Try weighted estimation with historical data
    if self.rule_stats:
        return self._estimate_weighted(progress)

    # Fall back to simple estimation
    return self._estimate_simple(progress)
get_estimate_for_job
get_estimate_for_job(rule: str, wildcards: dict[str, str] | None = None, input_size: int | None = None, threads: int | None = None) -> tuple[float, float]

Get expected duration and variance for a specific job.

Uses a cascade of estimation strategies in priority order: 1. Full combination stats (wildcards + threads together) 2. Thread-specific stats 3. Wildcard-specific stats (if enabled) 4. Rule-level stats (with optional size scaling) 5. Fuzzy matching for renamed/similar rules 6. Global mean fallback

Parameters:

Name Type Description Default
rule str

The rule name.

required
wildcards dict[str, str] | None

Optional wildcard values for the job.

None
input_size int | None

Optional input file size in bytes for size-scaled estimates.

None
threads int | None

Optional thread count for thread-specific estimates.

None

Returns:

Type Description
float

Tuple of (expected_duration, variance). The duration is always

float

wall-clock seconds, even when thread-specific stats are used.

tuple[float, float]

Callers that need thread-seconds should multiply by the job's

tuple[float, float]

thread count: duration * threads = thread_seconds.

Source code in snakesee/estimation/estimator.py
def get_estimate_for_job(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    input_size: int | None = None,
    threads: int | None = None,
) -> tuple[float, float]:
    """
    Get expected duration and variance for a specific job.

    Uses a cascade of estimation strategies in priority order:
    1. Full combination stats (wildcards + threads together)
    2. Thread-specific stats
    3. Wildcard-specific stats (if enabled)
    4. Rule-level stats (with optional size scaling)
    5. Fuzzy matching for renamed/similar rules
    6. Global mean fallback

    Args:
        rule: The rule name.
        wildcards: Optional wildcard values for the job.
        input_size: Optional input file size in bytes for size-scaled estimates.
        threads: Optional thread count for thread-specific estimates.

    Returns:
        Tuple of (expected_duration, variance).  The duration is always
        wall-clock seconds, even when thread-specific stats are used.
        Callers that need thread-seconds should multiply by the job's
        thread count: ``duration * threads = thread_seconds``.
    """
    # Strategy 1: Full combination stats (wildcards + threads together)
    # This is the most precise when we have data for the exact combination
    if self.use_wildcard_conditioning and (wildcards or (threads is not None)):
        result = self._try_combination_estimate(rule, wildcards, threads)
        if result is not None:
            return result

    # Strategy 2: Thread-specific stats
    if threads is not None:
        result = self._try_thread_estimate(rule, threads)
        if result is not None:
            return result

    # Strategy 3: Wildcard-specific stats (if enabled)
    if self.use_wildcard_conditioning and wildcards:
        result = self._try_wildcard_estimate(rule, wildcards)
        if result is not None:
            return result

    # Strategy 4: Rule-level stats (with optional size scaling)
    result = self._try_rule_estimate(rule, input_size)
    if result is not None:
        return result

    # Strategy 5: Fuzzy matching for renamed/similar rules
    result = self._try_fuzzy_match_estimate(rule)
    if result is not None:
        return result

    # Strategy 6: Global mean fallback
    global_mean = self.global_mean_duration()
    return global_mean, (global_mean * self.config.variance.global_fallback) ** 2
global_mean_duration
global_mean_duration() -> float

Get the global average duration across all known rules.

Used as a fallback when a specific rule has no historical data. Result is cached and invalidated when sample count or rule count changes.

Returns:

Type Description
float

Average duration in seconds, or config.default_global_mean if no data.

Source code in snakesee/estimation/estimator.py
def global_mean_duration(self) -> float:
    """
    Get the global average duration across all known rules.

    Used as a fallback when a specific rule has no historical data.
    Result is cached and invalidated when sample count or rule count changes.

    Returns:
        Average duration in seconds, or config.default_global_mean if no data.
    """
    # Check cache validity using sample count and rule count as version indicators
    current_sample_count = self._rule_registry.total_sample_count()
    current_rule_count = self._rule_registry.rule_count()
    if (
        self._global_mean_cache is not None
        and current_sample_count == self._global_mean_cache_sample_count
        and current_rule_count == self._global_mean_cache_rule_count
    ):
        return self._global_mean_cache

    # Recalculate and cache
    effective_stats = self.rule_stats
    all_durations = [d for stats in effective_stats.values() for d in stats.durations]
    self._global_mean_cache = (
        mean(all_durations) if all_durations else self.config.default_global_mean
    )
    self._global_mean_cache_sample_count = current_sample_count
    self._global_mean_cache_rule_count = current_rule_count
    return self._global_mean_cache
load_from_backend
load_from_backend(backend: PersistenceBackend, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from a persistence backend.

Supports both filesystem and SQLite backends via the PersistenceBackend protocol. Delegates to HistoricalDataLoader.load_from_backend().

Parameters:

Name Type Description Default
backend PersistenceBackend

Persistence backend to read metadata from.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None
Source code in snakesee/estimation/estimator.py
def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from a persistence backend.

    Supports both filesystem and SQLite backends via the PersistenceBackend
    protocol. Delegates to HistoricalDataLoader.load_from_backend().

    Args:
        backend: Persistence backend to read metadata from.
        progress_callback: Optional callback(current, total) for progress reporting.
    """
    loader = self._get_data_loader()
    loader.load_from_backend(backend, progress_callback)
    self.code_hash_to_rules = loader.code_hash_to_rules
load_from_events
load_from_events(events_file: Path) -> None

Load historical execution times from a snakesee events file.

Parses the .snakesee_events.jsonl file to extract job durations from job_finished events. Data is recorded directly into the RuleRegistry.

Parameters:

Name Type Description Default
events_file Path

Path to .snakesee_events.jsonl file.

required
Source code in snakesee/estimation/estimator.py
def load_from_events(self, events_file: Path) -> None:
    """
    Load historical execution times from a snakesee events file.

    Parses the .snakesee_events.jsonl file to extract job durations from
    job_finished events. Data is recorded directly into the RuleRegistry.

    Args:
        events_file: Path to .snakesee_events.jsonl file.
    """
    loader = self._get_data_loader()
    has_wildcards = loader.load_from_events(events_file)

    # Auto-enable wildcard conditioning if we have wildcard data,
    # but only if user didn't explicitly set it to False
    if has_wildcards and not self._wildcard_conditioning_explicit:
        self.use_wildcard_conditioning = True
load_from_metadata
load_from_metadata(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> None

Load historical execution times from .snakemake/metadata/.

Uses a single-pass parser for efficiency - reads each metadata file only once to collect timing stats, code hashes, and wildcard stats. Data is recorded directly into the RuleRegistry.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None
Source code in snakesee/estimation/estimator.py
def load_from_metadata(
    self,
    metadata_dir: Path,
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """
    Load historical execution times from .snakemake/metadata/.

    Uses a single-pass parser for efficiency - reads each metadata file
    only once to collect timing stats, code hashes, and wildcard stats.
    Data is recorded directly into the RuleRegistry.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.
    """
    loader = self._get_data_loader()
    loader.load_from_metadata(metadata_dir, progress_callback)
    self.code_hash_to_rules = loader.code_hash_to_rules
set_provided_cores
set_provided_cores(cores: int) -> None

Set the Snakemake -j/--cores value parsed from the log.

When available, this provides a definitive upper bound for parallelism rather than relying solely on peak observed thread sums.

Parameters:

Name Type Description Default
cores int

Resolved core count (integer) from Provided cores: N log line. Snakemake resolves --cores all to the machine's CPU count before logging, so this is always an integer.

required
Source code in snakesee/estimation/estimator.py
def set_provided_cores(self, cores: int) -> None:
    """Set the Snakemake ``-j``/``--cores`` value parsed from the log.

    When available, this provides a definitive upper bound for
    parallelism rather than relying solely on peak observed thread sums.

    Args:
        cores: Resolved core count (integer) from ``Provided cores: N``
            log line.  Snakemake resolves ``--cores all`` to the machine's
            CPU count before logging, so this is always an integer.
    """
    self._provided_cores = cores if cores >= 1 else None

Functions

pending_inferrer

Inference of pending rule distribution for time estimation.

Classes

PendingRuleInferrer

Infers the distribution of pending jobs by rule.

When we know the total pending count but not the breakdown by rule, this class infers the distribution based on: 1. Expected job counts (from Snakemake's Job stats table) if available 2. Proportional inference from completed job distribution otherwise

Source code in snakesee/estimation/pending_inferrer.py
class PendingRuleInferrer:
    """Infers the distribution of pending jobs by rule.

    When we know the total pending count but not the breakdown by rule,
    this class infers the distribution based on:
    1. Expected job counts (from Snakemake's Job stats table) if available
    2. Proportional inference from completed job distribution otherwise
    """

    def infer(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        expected_job_counts: dict[str, int] | None = None,
        current_rules: set[str] | None = None,
        running_by_rule: dict[str, int] | None = None,
    ) -> dict[str, int]:
        """Infer the distribution of pending jobs by rule.

        Args:
            completed_by_rule: Count of completed jobs per rule.
            pending_count: Total number of pending jobs.
            expected_job_counts: Expected counts from Job stats table (most accurate).
            current_rules: Set of rules in current workflow (filters deleted rules).
            running_by_rule: Count of running jobs per rule.

        Returns:
            Estimated count of pending jobs per rule.
        """
        if pending_count <= 0:
            return {}

        running_by_rule = running_by_rule or {}

        # Use exact calculation if we have expected job counts
        if expected_job_counts:
            return self._exact_calculation(
                expected_job_counts,
                completed_by_rule,
                running_by_rule,
            )

        # Fall back to proportional inference
        return self._proportional_inference(
            completed_by_rule,
            pending_count,
            current_rules,
        )

    def _exact_calculation(
        self,
        expected_job_counts: dict[str, int],
        completed_by_rule: dict[str, int],
        running_by_rule: dict[str, int],
    ) -> dict[str, int]:
        """Calculate pending using expected - completed - running."""
        pending_rules: dict[str, int] = {}

        for rule, expected in expected_job_counts.items():
            completed = completed_by_rule.get(rule, 0)
            running = running_by_rule.get(rule, 0)
            remaining = expected - completed - running
            if remaining > 0:
                pending_rules[rule] = remaining

        return pending_rules

    def _proportional_inference(
        self,
        completed_by_rule: dict[str, int],
        pending_count: int,
        current_rules: set[str] | None,
    ) -> dict[str, int]:
        """Infer pending distribution proportionally to completed jobs.

        Note: Due to rounding, the sum of returned values may not exactly
        equal pending_count. This is expected and the estimation handles
        this gracefully.
        """
        if not completed_by_rule:
            return {}

        # Filter out deleted rules if current_rules is provided
        if current_rules is not None:
            completed_by_rule = {r: c for r, c in completed_by_rule.items() if r in current_rules}

        total_completed = sum(completed_by_rule.values())
        if total_completed == 0:
            return {}

        pending_rules: dict[str, int] = {}
        for rule, count in completed_by_rule.items():
            proportion = count / total_completed
            estimated = round(pending_count * proportion)
            if estimated > 0:
                pending_rules[rule] = estimated

        return pending_rules
Functions
infer
infer(completed_by_rule: dict[str, int], pending_count: int, expected_job_counts: dict[str, int] | None = None, current_rules: set[str] | None = None, running_by_rule: dict[str, int] | None = None) -> dict[str, int]

Infer the distribution of pending jobs by rule.

Parameters:

Name Type Description Default
completed_by_rule dict[str, int]

Count of completed jobs per rule.

required
pending_count int

Total number of pending jobs.

required
expected_job_counts dict[str, int] | None

Expected counts from Job stats table (most accurate).

None
current_rules set[str] | None

Set of rules in current workflow (filters deleted rules).

None
running_by_rule dict[str, int] | None

Count of running jobs per rule.

None

Returns:

Type Description
dict[str, int]

Estimated count of pending jobs per rule.

Source code in snakesee/estimation/pending_inferrer.py
def infer(
    self,
    completed_by_rule: dict[str, int],
    pending_count: int,
    expected_job_counts: dict[str, int] | None = None,
    current_rules: set[str] | None = None,
    running_by_rule: dict[str, int] | None = None,
) -> dict[str, int]:
    """Infer the distribution of pending jobs by rule.

    Args:
        completed_by_rule: Count of completed jobs per rule.
        pending_count: Total number of pending jobs.
        expected_job_counts: Expected counts from Job stats table (most accurate).
        current_rules: Set of rules in current workflow (filters deleted rules).
        running_by_rule: Count of running jobs per rule.

    Returns:
        Estimated count of pending jobs per rule.
    """
    if pending_count <= 0:
        return {}

    running_by_rule = running_by_rule or {}

    # Use exact calculation if we have expected job counts
    if expected_job_counts:
        return self._exact_calculation(
            expected_job_counts,
            completed_by_rule,
            running_by_rule,
        )

    # Fall back to proportional inference
    return self._proportional_inference(
        completed_by_rule,
        pending_count,
        current_rules,
    )

rule_matcher

Rule matching utilities for fuzzy matching and code hash lookup.

Classes

RuleMatchingEngine

Matches rules by name similarity and code hash.

Used to find timing data for renamed rules or similar rules when exact matches aren't available.

Source code in snakesee/estimation/rule_matcher.py
class RuleMatchingEngine:
    """Matches rules by name similarity and code hash.

    Used to find timing data for renamed rules or similar rules
    when exact matches aren't available.
    """

    def __init__(self, max_distance: int = 3) -> None:
        """Initialize the matcher.

        Args:
            max_distance: Maximum Levenshtein distance for fuzzy matches.
        """
        self.max_distance = max_distance

    def find_by_code_hash(
        self,
        rule: str,
        code_hash_to_rules: dict[str, set[str]],
        known_rules: set[str],
    ) -> str | None:
        """Find a rule with matching code hash.

        When multiple rules share the same code hash and are in known_rules,
        returns the lexicographically smallest rule name for deterministic behavior.

        Args:
            rule: Rule name to look up.
            code_hash_to_rules: Mapping of code hashes to rule sets.
            known_rules: Set of rules with available stats.

        Returns:
            Name of matching rule (lexicographically smallest if multiple),
            or None if not found.
        """
        for _hash, rules in code_hash_to_rules.items():
            if rule in rules:
                # Find all candidate rules that match criteria
                candidates = {r for r in rules if r != rule and r in known_rules}
                if candidates:
                    # Return lexicographically smallest for deterministic selection
                    return min(candidates)
        return None

    def find_similar(
        self,
        rule: str,
        known_rules: set[str],
        max_distance: int | None = None,
    ) -> tuple[str, int] | None:
        """Find similar rule by Levenshtein distance.

        When multiple rules have the same distance, returns the lexicographically
        smallest one for deterministic behavior.

        Args:
            rule: Rule name to match.
            known_rules: Set of rules to search.
            max_distance: Maximum distance (uses instance default if None).

        Returns:
            Tuple of (matched_rule, distance), or None if no match.
        """
        if max_distance is None:
            max_distance = self.max_distance

        best_match: str | None = None
        best_distance = max_distance + 1

        for known_rule in known_rules:
            distance = levenshtein_distance(rule, known_rule)
            # Prefer lower distance, then lexicographically smaller name for ties
            if distance < best_distance or (
                distance == best_distance and best_match is not None and known_rule < best_match
            ):
                best_distance = distance
                best_match = known_rule

        if best_match is not None and best_distance <= max_distance:
            return best_match, best_distance

        return None

    def find_best_match(
        self,
        rule: str,
        code_hash_to_rules: dict[str, set[str]],
        rule_stats: dict[str, RuleTimingStats],
        max_distance: int | None = None,
    ) -> tuple[str, RuleTimingStats] | None:
        """Find the best matching rule using code hash then fuzzy matching.

        Args:
            rule: Rule name to match.
            code_hash_to_rules: Mapping of code hashes to rule sets.
            rule_stats: Available rule statistics.
            max_distance: Maximum Levenshtein distance.

        Returns:
            Tuple of (matched_rule, stats), or None if no match.
        """
        known_rules = set(rule_stats.keys())

        # Try code hash first (exact code = renamed rule)
        hash_match = self.find_by_code_hash(rule, code_hash_to_rules, known_rules)
        if hash_match is not None:
            return hash_match, rule_stats[hash_match]

        # Fall back to fuzzy name matching
        similar = self.find_similar(rule, known_rules, max_distance)
        if similar is not None:
            matched_rule, _distance = similar
            return matched_rule, rule_stats[matched_rule]

        return None
Functions
__init__
__init__(max_distance: int = 3) -> None

Initialize the matcher.

Parameters:

Name Type Description Default
max_distance int

Maximum Levenshtein distance for fuzzy matches.

3
Source code in snakesee/estimation/rule_matcher.py
def __init__(self, max_distance: int = 3) -> None:
    """Initialize the matcher.

    Args:
        max_distance: Maximum Levenshtein distance for fuzzy matches.
    """
    self.max_distance = max_distance
find_best_match
find_best_match(rule: str, code_hash_to_rules: dict[str, set[str]], rule_stats: dict[str, RuleTimingStats], max_distance: int | None = None) -> tuple[str, RuleTimingStats] | None

Find the best matching rule using code hash then fuzzy matching.

Parameters:

Name Type Description Default
rule str

Rule name to match.

required
code_hash_to_rules dict[str, set[str]]

Mapping of code hashes to rule sets.

required
rule_stats dict[str, RuleTimingStats]

Available rule statistics.

required
max_distance int | None

Maximum Levenshtein distance.

None

Returns:

Type Description
tuple[str, RuleTimingStats] | None

Tuple of (matched_rule, stats), or None if no match.

Source code in snakesee/estimation/rule_matcher.py
def find_best_match(
    self,
    rule: str,
    code_hash_to_rules: dict[str, set[str]],
    rule_stats: dict[str, RuleTimingStats],
    max_distance: int | None = None,
) -> tuple[str, RuleTimingStats] | None:
    """Find the best matching rule using code hash then fuzzy matching.

    Args:
        rule: Rule name to match.
        code_hash_to_rules: Mapping of code hashes to rule sets.
        rule_stats: Available rule statistics.
        max_distance: Maximum Levenshtein distance.

    Returns:
        Tuple of (matched_rule, stats), or None if no match.
    """
    known_rules = set(rule_stats.keys())

    # Try code hash first (exact code = renamed rule)
    hash_match = self.find_by_code_hash(rule, code_hash_to_rules, known_rules)
    if hash_match is not None:
        return hash_match, rule_stats[hash_match]

    # Fall back to fuzzy name matching
    similar = self.find_similar(rule, known_rules, max_distance)
    if similar is not None:
        matched_rule, _distance = similar
        return matched_rule, rule_stats[matched_rule]

    return None
find_by_code_hash
find_by_code_hash(rule: str, code_hash_to_rules: dict[str, set[str]], known_rules: set[str]) -> str | None

Find a rule with matching code hash.

When multiple rules share the same code hash and are in known_rules, returns the lexicographically smallest rule name for deterministic behavior.

Parameters:

Name Type Description Default
rule str

Rule name to look up.

required
code_hash_to_rules dict[str, set[str]]

Mapping of code hashes to rule sets.

required
known_rules set[str]

Set of rules with available stats.

required

Returns:

Type Description
str | None

Name of matching rule (lexicographically smallest if multiple),

str | None

or None if not found.

Source code in snakesee/estimation/rule_matcher.py
def find_by_code_hash(
    self,
    rule: str,
    code_hash_to_rules: dict[str, set[str]],
    known_rules: set[str],
) -> str | None:
    """Find a rule with matching code hash.

    When multiple rules share the same code hash and are in known_rules,
    returns the lexicographically smallest rule name for deterministic behavior.

    Args:
        rule: Rule name to look up.
        code_hash_to_rules: Mapping of code hashes to rule sets.
        known_rules: Set of rules with available stats.

    Returns:
        Name of matching rule (lexicographically smallest if multiple),
        or None if not found.
    """
    for _hash, rules in code_hash_to_rules.items():
        if rule in rules:
            # Find all candidate rules that match criteria
            candidates = {r for r in rules if r != rule and r in known_rules}
            if candidates:
                # Return lexicographically smallest for deterministic selection
                return min(candidates)
    return None
find_similar
find_similar(rule: str, known_rules: set[str], max_distance: int | None = None) -> tuple[str, int] | None

Find similar rule by Levenshtein distance.

When multiple rules have the same distance, returns the lexicographically smallest one for deterministic behavior.

Parameters:

Name Type Description Default
rule str

Rule name to match.

required
known_rules set[str]

Set of rules to search.

required
max_distance int | None

Maximum distance (uses instance default if None).

None

Returns:

Type Description
tuple[str, int] | None

Tuple of (matched_rule, distance), or None if no match.

Source code in snakesee/estimation/rule_matcher.py
def find_similar(
    self,
    rule: str,
    known_rules: set[str],
    max_distance: int | None = None,
) -> tuple[str, int] | None:
    """Find similar rule by Levenshtein distance.

    When multiple rules have the same distance, returns the lexicographically
    smallest one for deterministic behavior.

    Args:
        rule: Rule name to match.
        known_rules: Set of rules to search.
        max_distance: Maximum distance (uses instance default if None).

    Returns:
        Tuple of (matched_rule, distance), or None if no match.
    """
    if max_distance is None:
        max_distance = self.max_distance

    best_match: str | None = None
    best_distance = max_distance + 1

    for known_rule in known_rules:
        distance = levenshtein_distance(rule, known_rule)
        # Prefer lower distance, then lexicographically smaller name for ties
        if distance < best_distance or (
            distance == best_distance and best_match is not None and known_rule < best_match
        ):
            best_distance = distance
            best_match = known_rule

    if best_match is not None and best_distance <= max_distance:
        return best_match, best_distance

    return None

Functions

levenshtein_distance cached
levenshtein_distance(s1: str, s2: str) -> int

Calculate the Levenshtein distance between two strings.

Results are cached for efficiency when comparing the same rule names multiple times during estimation.

Parameters:

Name Type Description Default
s1 str

First string.

required
s2 str

Second string.

required

Returns:

Type Description
int

The minimum number of edits (insertions, deletions, substitutions)

int

needed to transform s1 into s2.

Source code in snakesee/estimation/rule_matcher.py
@functools.lru_cache(maxsize=256)
def levenshtein_distance(s1: str, s2: str) -> int:
    """Calculate the Levenshtein distance between two strings.

    Results are cached for efficiency when comparing the same rule names
    multiple times during estimation.

    Args:
        s1: First string.
        s2: Second string.

    Returns:
        The minimum number of edits (insertions, deletions, substitutions)
        needed to transform s1 into s2.
    """
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)

    if len(s2) == 0:
        return len(s1)

    previous_row: list[int] = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return previous_row[-1]