Skip to content

parser

Parser package for Snakemake log file parsing.

This package provides components for parsing Snakemake log files:

  • IncrementalLogReader: Streaming reader with position tracking
  • LogFilePosition: File position and rotation tracking
  • LogLineParser: Line-by-line parsing with context
  • JobLifecycleTracker: Job start/finish tracking
  • FailureTracker: Failure deduplication

The parser module is split into focused components:

- metadata.py: Metadata file parsing (MetadataRecord, parse_metadata_files)
- stats.py: Timing statistics collection (collect_rule_timing_stats)
- utils.py: Utility functions (_parse_wildcards, calculate_input_size)
- core.py: Job parsing and workflow state assembly

Classes

FailureTracker

Tracks job failures with deduplication.

Prevents duplicate failure entries for the same rule/jobid combination.

Source code in snakesee/parser/failure_tracker.py
class FailureTracker:
    """Collect job failures, ignoring repeats of the same (rule, jobid) pair.

    A failure is identified by its rule name together with its job id, so a
    failure reported more than once is stored only once. When the job id is
    unknown (None), all such failures of one rule share a single key.
    """

    __slots__ = ("_failed_jobs", "_seen_failures")

    def __init__(self) -> None:
        """Create a tracker with no recorded failures."""
        self._failed_jobs: list[JobInfo] = []
        self._seen_failures: set[tuple[str, str | None]] = set()

    def record_failure(
        self,
        rule: str,
        jobid: str | None = None,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
        log_file: Path | None = None,
    ) -> bool:
        """Register a failed job unless the same failure was seen before.

        Args:
            rule: Name of the rule whose job failed.
            jobid: Job identifier, or None when it is not known.
            wildcards: Wildcard values of the failed job.
            threads: Number of threads the job used.
            log_file: Path of the job's log file.

        Returns:
            True when the failure was newly added, False when the same
            (rule, jobid) pair had already been recorded.
        """
        failure_key = (rule, jobid)
        already_seen = failure_key in self._seen_failures
        if not already_seen:
            self._seen_failures.add(failure_key)
            entry = JobInfo(
                rule=rule,
                job_id=jobid,
                wildcards=wildcards,
                threads=threads,
                log_file=log_file,
            )
            self._failed_jobs.append(entry)
        return not already_seen

    def get_failed_jobs(self) -> list[JobInfo]:
        """Return the recorded failures in the order they were added.

        Returns:
            A new list; modifying it does not affect the tracker.
        """
        return self._failed_jobs[:]

    def reset(self) -> None:
        """Forget every recorded failure and its deduplication key."""
        self._seen_failures.clear()
        self._failed_jobs.clear()

Functions

__init__
__init__() -> None

Initialize the failure tracker.

Source code in snakesee/parser/failure_tracker.py
def __init__(self) -> None:
    """Set up empty failure storage: a dedup key set and an ordered list."""
    # Keys are (rule, jobid) pairs; jobid may be None when unknown.
    self._seen_failures: set[tuple[str, str | None]] = set()
    self._failed_jobs: list[JobInfo] = []
get_failed_jobs
get_failed_jobs() -> list[JobInfo]

Get list of failed jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for failed jobs.

Source code in snakesee/parser/failure_tracker.py
def get_failed_jobs(self) -> list[JobInfo]:
    """Return a copy of the failures recorded so far.

    Returns:
        A new list of JobInfo objects, in the order they were recorded.
    """
    return [*self._failed_jobs]
record_failure
record_failure(rule: str, jobid: str | None = None, wildcards: dict[str, str] | None = None, threads: int | None = None, log_file: Path | None = None) -> bool

Record a job failure.

Parameters:

Name Type Description Default
rule str

Name of the failed rule.

required
jobid str | None

Job identifier (if known).

None
wildcards dict[str, str] | None

Wildcard values.

None
threads int | None

Thread count.

None
log_file Path | None

Path to the log file.

None

Returns:

Type Description
bool

True if this is a new failure, False if duplicate.

Source code in snakesee/parser/failure_tracker.py
def record_failure(
    self,
    rule: str,
    jobid: str | None = None,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
    log_file: Path | None = None,
) -> bool:
    """Register a failed job unless the same failure was seen before.

    Args:
        rule: Name of the rule whose job failed.
        jobid: Job identifier, or None when it is not known.
        wildcards: Wildcard values of the failed job.
        threads: Number of threads the job used.
        log_file: Path of the job's log file.

    Returns:
        True when the failure was newly added, False when the same
        (rule, jobid) pair had already been recorded.
    """
    failure_key = (rule, jobid)
    already_seen = failure_key in self._seen_failures
    if not already_seen:
        self._seen_failures.add(failure_key)
        entry = JobInfo(
            rule=rule,
            job_id=jobid,
            wildcards=wildcards,
            threads=threads,
            log_file=log_file,
        )
        self._failed_jobs.append(entry)
    return not already_seen
reset
reset() -> None

Clear all failure tracking state.

Source code in snakesee/parser/failure_tracker.py
def reset(self) -> None:
    """Drop all recorded failures together with their deduplication keys."""
    for container in (self._failed_jobs, self._seen_failures):
        container.clear()

IncrementalLogReader

Streaming reader for Snakemake log files with position tracking.

Reads log lines incrementally, tracking the current file position to only parse new content on subsequent calls. Maintains cumulative state for running jobs, completed jobs, failed jobs, and progress.

Handles log file rotation by detecting inode changes or file truncation.

This is a coordinator class that delegates to specialized components:

- LogFilePosition: File position tracking and rotation detection
- LogLineParser: Log line parsing with context tracking
- JobLifecycleTracker: Job start/finish tracking
- FailureTracker: Failure deduplication

Attributes:

Name Type Description
log_path

Path to the log file being monitored.

Source code in snakesee/parser/log_reader.py
class IncrementalLogReader:
    """Streaming reader for Snakemake log files with position tracking.

    Reads log lines incrementally, tracking the current file position to only
    parse new content on subsequent calls. Maintains cumulative state for
    running jobs, completed jobs, failed jobs, and progress.

    Only newline-terminated lines are consumed. A trailing fragment that the
    writer has not finished yet is left unread and picked up, whole, on a
    later call, so a single log line is never parsed as two pieces.

    Handles log file rotation by detecting inode changes or file truncation.

    This is a coordinator class that delegates to specialized components:
    - LogFilePosition: File position tracking and rotation detection
    - LogLineParser: Log line parsing with context tracking
    - JobLifecycleTracker: Job start/finish tracking
    - FailureTracker: Failure deduplication

    Attributes:
        log_path: Path to the log file being monitored.
    """

    def __init__(self, log_path: Path) -> None:
        """Initialize the incremental log reader.

        Args:
            log_path: Path to the Snakemake log file.
        """
        self.log_path = log_path
        self._lock = threading.RLock()

        # Delegate components
        self._position = LogFilePosition(log_path)
        self._parser = LogLineParser()
        self._jobs = JobLifecycleTracker()
        self._failures = FailureTracker()

        # Progress state, taken from the most recent PROGRESS event.
        self._completed: int = 0
        self._total: int = 0

    def reset(self) -> None:
        """Reset reader to start of file and clear all state.

        Call this when switching to a different log file or to re-read
        from the beginning.
        """
        with self._lock:
            self._reset_unlocked()

    def set_log_path(self, log_path: Path) -> None:
        """Change the log file being monitored.

        Resets all state if the path changes.

        Args:
            log_path: New log file path.
        """
        with self._lock:
            if log_path != self.log_path:
                self.log_path = log_path
                self._position = LogFilePosition(log_path)
                self._reset_unlocked()

    def _reset_unlocked(self) -> None:
        """Reset state without acquiring lock (caller must hold lock)."""
        self._position.reset()
        self._parser.reset()
        self._jobs.reset()
        self._failures.reset()
        self._completed = 0
        self._total = 0

    def read_new_lines(self) -> int:
        """Read and parse new lines from the log file.

        Updates internal state based on new log content. This method
        should be called periodically to process new log entries.

        The file is read in binary mode, so the stored offset is a byte
        position. Each complete line is decoded as UTF-8, with invalid bytes
        replaced by U+FFFD and a CRLF terminator normalized to LF, before it
        is parsed. A final line with no newline yet is not consumed; it is
        re-read on a later call once the writer has finished it. Unlike text
        mode, a lone CR is not treated as a line break.

        Returns:
            Number of new lines processed.
        """
        if not self.log_path.exists():
            return 0

        with self._lock:
            if self._position.check_rotation():
                # File was rotated: the position tracker has already rewound
                # itself, so drop everything derived from the old content.
                self._parser.reset()
                self._jobs.reset()
                self._failures.reset()
                self._completed = 0
                self._total = 0

            lines_processed = 0
            try:
                with open(self.log_path, "rb") as f:
                    # Clamp offset to file bounds
                    file_size = f.seek(0, 2)
                    self._position.clamp_to_size(file_size)
                    offset = self._position.offset
                    f.seek(offset)

                    for raw in f:
                        if not raw.endswith(b"\n"):
                            # Incomplete last line (still being written):
                            # leave it in place for the next call.
                            break
                        self._process_line(self._decode_line(raw))
                        offset += len(raw)
                        # Advance per line so an exception part-way through
                        # does not cause already-parsed lines to be re-parsed.
                        self._position.offset = offset
                        lines_processed += 1

                    # Flush any pending error at end of file
                    if pending := self._parser.flush_pending_error():
                        self._process_event(pending)
            except FileNotFoundError:
                pass
            except PermissionError as e:
                logger.warning("Permission denied reading log file %s: %s", self.log_path, e)
            except OSError as e:
                logger.warning("Error reading log file %s: %s", self.log_path, e)

            return lines_processed

    @staticmethod
    def _decode_line(raw: bytes) -> str:
        """Decode one newline-terminated raw line into text.

        Invalid UTF-8 becomes U+FFFD and a trailing CRLF is normalized to LF,
        matching what a text-mode read with ``errors="replace"`` yields for
        such a line.
        """
        if raw.endswith(b"\r\n"):
            raw = raw[:-2] + b"\n"
        return raw.decode("utf-8", errors="replace")

    @staticmethod
    def _normalize_jobid(value: object) -> str | None:
        """Return *value* as a string job id, or None if it is not one.

        Accepts ``str`` and non-bool ``int`` values, mirroring the ``str()``
        coercion applied to job ids in the JOBID and JOB_FINISHED branches.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, int) and not isinstance(value, bool):
            return str(value)
        return None

    def _process_line(self, line: str) -> None:
        """Process a parsed line and update state.

        Args:
            line: Raw log line to process.
        """
        events = self._parser.parse_line(line)
        for event in events:
            self._process_event(event)

    def _process_event(self, event: ParseEvent) -> None:
        """Process a single parsed event and update state.

        Args:
            event: Parsed event to process.
        """
        if event.event_type == ParseEventType.PROGRESS:
            completed = event.data["completed"]
            total = event.data["total"]
            if isinstance(completed, int) and isinstance(total, int):
                self._completed = completed
                self._total = total

        elif event.event_type == ParseEventType.JOBID:
            rule = event.data.get("rule")
            if rule is not None:
                jobid = str(event.data["jobid"])
                if not self._jobs.is_job_started(jobid):
                    timestamp = event.data.get("timestamp")
                    wildcards = event.data.get("wildcards")
                    threads = event.data.get("threads")
                    self._jobs.start_job(
                        jobid=jobid,
                        rule=str(rule),
                        start_time=float(timestamp)
                        if isinstance(timestamp, (int, float))
                        else None,
                        wildcards=wildcards if isinstance(wildcards, dict) else None,
                        threads=threads if isinstance(threads, int) else None,
                    )
                log_path = event.data.get("log_path")
                if isinstance(log_path, str):
                    self._jobs.set_job_log(jobid, log_path)

        elif event.event_type == ParseEventType.WILDCARDS:
            wc_jobid = self._normalize_jobid(event.data.get("jobid"))
            wc_wildcards = event.data.get("wildcards")
            if wc_jobid is not None and isinstance(wc_wildcards, dict):
                self._jobs.update_job(wc_jobid, wildcards=wc_wildcards)

        elif event.event_type == ParseEventType.THREADS:
            th_jobid = self._normalize_jobid(event.data.get("jobid"))
            th_threads = event.data.get("threads")
            if th_jobid is not None and isinstance(th_threads, int):
                self._jobs.update_job(th_jobid, threads=th_threads)

        elif event.event_type == ParseEventType.LOG_PATH:
            lp_jobid = self._normalize_jobid(event.data.get("jobid"))
            lp_path = event.data.get("log_path")
            if lp_jobid is not None and isinstance(lp_path, str):
                self._jobs.set_job_log(lp_jobid, lp_path)

        elif event.event_type == ParseEventType.JOB_FINISHED:
            fin_jobid = str(event.data["jobid"])
            fin_timestamp = event.data.get("timestamp")
            fin_end = float(fin_timestamp) if isinstance(fin_timestamp, (int, float)) else None
            self._jobs.finish_job(fin_jobid, end_time=fin_end)

        elif event.event_type == ParseEventType.ERROR:
            err_rule = str(event.data["rule"])
            err_jobid = event.data.get("jobid")
            err_log = event.data.get("log_path")
            err_wildcards = event.data.get("wildcards")
            err_threads = event.data.get("threads")
            self._failures.record_failure(
                rule=err_rule,
                jobid=str(err_jobid) if err_jobid is not None else None,
                wildcards=err_wildcards if isinstance(err_wildcards, dict) else None,
                threads=err_threads if isinstance(err_threads, int) else None,
                log_file=Path(err_log) if isinstance(err_log, str) else None,
            )

    @property
    def progress(self) -> tuple[int, int]:
        """Get current workflow progress.

        Returns:
            Tuple of (completed_count, total_count).
        """
        with self._lock:
            return self._completed, self._total

    @property
    def running_jobs(self) -> list[JobInfo]:
        """Get list of currently running jobs.

        Returns:
            List of JobInfo for jobs that started but haven't finished.
        """
        with self._lock:
            return self._jobs.get_running_jobs()

    @property
    def completed_jobs(self) -> list[JobInfo]:
        """Get list of completed jobs with timing info.

        Returns:
            List of JobInfo for completed jobs, sorted by end time (newest first).
        """
        with self._lock:
            return self._jobs.get_completed_jobs()

    @property
    def failed_jobs(self) -> list[JobInfo]:
        """Get list of failed jobs.

        Returns:
            List of JobInfo for jobs that encountered errors.
        """
        with self._lock:
            return self._failures.get_failed_jobs()

Attributes

completed_jobs property
completed_jobs: list[JobInfo]

Get list of completed jobs with timing info.

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs, sorted by end time (newest first).

failed_jobs property
failed_jobs: list[JobInfo]

Get list of failed jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that encountered errors.

progress property
progress: tuple[int, int]

Get current workflow progress.

Returns:

Type Description
tuple[int, int]

Tuple of (completed_count, total_count).

running_jobs property
running_jobs: list[JobInfo]

Get list of currently running jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that started but haven't finished.

Functions

__init__
__init__(log_path: Path) -> None

Initialize the incremental log reader.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required
Source code in snakesee/parser/log_reader.py
def __init__(self, log_path: Path) -> None:
    """Wire up the reader for *log_path* with fresh, empty state.

    Args:
        log_path: Snakemake log file to follow.
    """
    self.log_path = log_path
    self._lock = threading.RLock()

    # Collaborators that do the actual work; this object coordinates them.
    self._position = LogFilePosition(log_path)
    self._parser = LogLineParser()
    self._jobs = JobLifecycleTracker()
    self._failures = FailureTracker()

    # (completed, total) counters from the latest progress event.
    self._completed = self._total = 0
read_new_lines
read_new_lines() -> int

Read and parse new lines from the log file.

Updates internal state based on new log content. This method should be called periodically to process new log entries.

Returns:

Type Description
int

Number of new lines processed.

Source code in snakesee/parser/log_reader.py
def read_new_lines(self) -> int:
    """Read and parse new lines from the log file.

    Updates internal state based on new log content. This method
    should be called periodically to process new log entries.

    The file is read in binary mode, so the stored offset is a byte
    position. Each complete line is decoded as UTF-8, with invalid bytes
    replaced by U+FFFD and a CRLF terminator normalized to LF, before it is
    parsed. A final line with no newline yet is not consumed; it is re-read
    on a later call once the writer has finished it, so one log line is
    never parsed as two fragments. Unlike text mode, a lone CR is not
    treated as a line break.

    Returns:
        Number of new lines processed.
    """
    if not self.log_path.exists():
        return 0

    with self._lock:
        if self._position.check_rotation():
            # File was rotated: the position tracker has already rewound
            # itself, so drop everything derived from the old content.
            self._parser.reset()
            self._jobs.reset()
            self._failures.reset()
            self._completed = 0
            self._total = 0

        lines_processed = 0
        try:
            with open(self.log_path, "rb") as f:
                # Clamp offset to file bounds
                file_size = f.seek(0, 2)
                self._position.clamp_to_size(file_size)
                offset = self._position.offset
                f.seek(offset)

                for raw in f:
                    if not raw.endswith(b"\n"):
                        # Incomplete last line (still being written):
                        # leave it in place for the next call.
                        break
                    consumed = len(raw)
                    if raw.endswith(b"\r\n"):
                        raw = raw[:-2] + b"\n"
                    self._process_line(raw.decode("utf-8", errors="replace"))
                    offset += consumed
                    # Advance per line so an exception part-way through
                    # does not cause already-parsed lines to be re-parsed.
                    self._position.offset = offset
                    lines_processed += 1

                # Flush any pending error at end of file
                if pending := self._parser.flush_pending_error():
                    self._process_event(pending)
        except FileNotFoundError:
            pass
        except PermissionError as e:
            logger.warning("Permission denied reading log file %s: %s", self.log_path, e)
        except OSError as e:
            logger.warning("Error reading log file %s: %s", self.log_path, e)

        return lines_processed
reset
reset() -> None

Reset reader to start of file and clear all state.

Call this when switching to a different log file or to re-read from the beginning.

Source code in snakesee/parser/log_reader.py
def reset(self) -> None:
    """Rewind to the beginning of the log and discard everything parsed.

    Use this before following a different file, or to parse the current
    file again from scratch.
    """
    self._lock.acquire()
    try:
        self._reset_unlocked()
    finally:
        self._lock.release()
set_log_path
set_log_path(log_path: Path) -> None

Change the log file being monitored.

Resets all state if the path changes.

Parameters:

Name Type Description Default
log_path Path

New log file path.

required
Source code in snakesee/parser/log_reader.py
def set_log_path(self, log_path: Path) -> None:
    """Point the reader at *log_path*.

    Switching to a different path creates a new position tracker and
    discards all parsed state. Passing the current path does nothing.

    Args:
        log_path: Log file to follow from now on.
    """
    with self._lock:
        if log_path == self.log_path:
            return
        self.log_path = log_path
        self._position = LogFilePosition(log_path)
        self._reset_unlocked()

JobLifecycleTracker

Tracks jobs from start to completion.

Maintains state for started jobs, finished job IDs, and completed jobs with full timing information.

Source code in snakesee/parser/job_tracker.py
class JobLifecycleTracker:
    """Follow jobs from their start event to their finish event.

    Holds metadata for jobs that have started, the ids of every job that has
    finished, a list of completed jobs with timing, and per-job log paths.
    """

    __slots__ = (
        "_completed_jobs",
        "_finished_jobids",
        "_job_logs",
        "_started_jobs",
    )

    def __init__(self) -> None:
        """Start with no jobs tracked."""
        self._started_jobs: dict[str, StartedJobData] = {}
        self._finished_jobids: set[str] = set()
        self._completed_jobs: list[JobInfo] = []
        self._job_logs: dict[str, str] = {}

    def _log_file_for(self, jobid: str) -> Path | None:
        """Return the recorded log path of *jobid* as a Path (None if unset or empty)."""
        raw_path = self._job_logs.get(jobid)
        return Path(raw_path) if raw_path else None

    def start_job(
        self,
        jobid: str,
        rule: str,
        start_time: float | None = None,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
    ) -> None:
        """Record that a job has started; later calls for the same id are ignored.

        Args:
            jobid: Unique job identifier.
            rule: Name of the rule.
            start_time: Unix timestamp when job started.
            wildcards: Wildcard values for this job.
            threads: Number of threads allocated.
        """
        if jobid in self._started_jobs:
            return
        self._started_jobs[jobid] = StartedJobData(
            rule=rule,
            start_time=start_time,
            wildcards=wildcards,
            threads=threads,
        )

    def update_job(
        self,
        jobid: str,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
    ) -> None:
        """Fill in metadata of a started job; unknown ids and None values are skipped.

        Args:
            jobid: Job identifier.
            wildcards: Wildcard values to store.
            threads: Thread count to store.
        """
        if jobid not in self._started_jobs:
            return
        entry = self._started_jobs[jobid]
        if wildcards is not None:
            entry["wildcards"] = wildcards
        if threads is not None:
            entry["threads"] = threads

    def set_job_log(self, jobid: str, log_path: str) -> None:
        """Remember the log file path of a job, replacing any earlier value.

        Args:
            jobid: Job identifier.
            log_path: Path to the log file.
        """
        self._job_logs.update({jobid: log_path})

    def get_job_log(self, jobid: str) -> str | None:
        """Look up the log file path recorded for a job.

        Args:
            jobid: Job identifier.

        Returns:
            The stored path, or None if nothing was recorded.
        """
        try:
            return self._job_logs[jobid]
        except KeyError:
            return None

    def finish_job(self, jobid: str, end_time: float | None = None) -> JobInfo | None:
        """Mark a job as finished and, if it was started, move it to completed.

        The id is always remembered as finished, even for jobs never started.

        Args:
            jobid: Job identifier.
            end_time: Unix timestamp when job finished.

        Returns:
            JobInfo for the completed job, or None if the job had not started.
        """
        self._finished_jobids.add(jobid)
        if jobid not in self._started_jobs:
            return None

        job_data = self._started_jobs[jobid]
        completed = JobInfo(
            rule=job_data["rule"],
            job_id=jobid,
            start_time=job_data["start_time"],
            end_time=end_time,
            wildcards=job_data["wildcards"],
            threads=job_data["threads"],
            log_file=self._log_file_for(jobid),
        )
        self._completed_jobs.append(completed)
        # The finished job no longer needs its start data or log-path entry.
        del self._started_jobs[jobid]
        self._job_logs.pop(jobid, None)
        return completed

    def get_running_jobs(self) -> list[JobInfo]:
        """Build JobInfo objects for started jobs whose id is not marked finished.

        Returns:
            List of JobInfo for running jobs, in start order.
        """
        return [
            JobInfo(
                rule=data["rule"],
                job_id=jobid,
                start_time=data["start_time"],
                wildcards=data["wildcards"],
                threads=data["threads"],
                log_file=self._log_file_for(jobid),
            )
            for jobid, data in self._started_jobs.items()
            if jobid not in self._finished_jobids
        ]

    def get_completed_jobs(self) -> list[JobInfo]:
        """Return completed jobs ordered by end time, newest first.

        Jobs without an end time sort as if it were 0, i.e. last.

        Returns:
            A new list of JobInfo for completed jobs.
        """
        ordered = list(self._completed_jobs)
        ordered.sort(key=lambda job: job.end_time or 0, reverse=True)
        return ordered

    def is_job_started(self, jobid: str) -> bool:
        """Tell whether a job is currently held as started.

        Finished jobs are removed from the started set, so this is False for them.

        Args:
            jobid: Job identifier.

        Returns:
            True if start data for the job is held.
        """
        started = self._started_jobs
        return jobid in started

    def reset(self) -> None:
        """Forget every job: started, finished, completed, and log paths."""
        for store in (
            self._started_jobs,
            self._finished_jobids,
            self._completed_jobs,
            self._job_logs,
        ):
            store.clear()

Functions

__init__
__init__() -> None

Initialize the job tracker.

Source code in snakesee/parser/job_tracker.py
def __init__(self) -> None:
    """Create the empty containers used to follow job lifecycles."""
    # Per-job log paths, keyed by job id.
    self._job_logs: dict[str, str] = dict()
    # Jobs finished so far, in the order their finish was recorded.
    self._completed_jobs: list[JobInfo] = list()
    # Ids of every job seen finishing.
    self._finished_jobids: set[str] = set()
    # Start metadata of jobs not yet finished, keyed by job id.
    self._started_jobs: dict[str, StartedJobData] = dict()
finish_job
finish_job(jobid: str, end_time: float | None = None) -> JobInfo | None

Record a job finishing.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
end_time float | None

Unix timestamp when job finished.

None

Returns:

Type Description
JobInfo | None

JobInfo for the completed job, or None if job was not tracked.

Source code in snakesee/parser/job_tracker.py
def finish_job(self, jobid: str, end_time: float | None = None) -> JobInfo | None:
    """Mark a job as finished and, if it was started, move it to completed.

    The id is always remembered as finished, even for jobs never started.

    Args:
        jobid: Job identifier.
        end_time: Unix timestamp when job finished.

    Returns:
        JobInfo for the completed job, or None if the job had not started.
    """
    self._finished_jobids.add(jobid)
    if jobid not in self._started_jobs:
        return None

    job_data = self._started_jobs[jobid]
    recorded_log = self._job_logs.get(jobid)
    completed = JobInfo(
        rule=job_data["rule"],
        job_id=jobid,
        start_time=job_data["start_time"],
        end_time=end_time,
        wildcards=job_data["wildcards"],
        threads=job_data["threads"],
        log_file=Path(recorded_log) if recorded_log else None,
    )
    self._completed_jobs.append(completed)
    # The finished job no longer needs its start data or log-path entry.
    del self._started_jobs[jobid]
    self._job_logs.pop(jobid, None)
    return completed
get_completed_jobs
get_completed_jobs() -> list[JobInfo]

Get list of completed jobs sorted by end time (newest first).

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs.

Source code in snakesee/parser/job_tracker.py
def get_completed_jobs(self) -> list[JobInfo]:
    """Return completed jobs ordered by end time, newest first.

    Jobs without an end time sort as if it were 0, i.e. last.

    Returns:
        A new list of JobInfo for completed jobs.
    """
    ordered = list(self._completed_jobs)
    ordered.sort(key=lambda job: job.end_time or 0, reverse=True)
    return ordered
get_job_log
get_job_log(jobid: str) -> str | None

Get the log file path for a job.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required

Returns:

Type Description
str | None

Log file path or None if not set.

Source code in snakesee/parser/job_tracker.py
def get_job_log(self, jobid: str) -> str | None:
    """Get the log file path for a job.

    Args:
        jobid: Job identifier.

    Returns:
        Log file path or None if not set.
    """
    return self._job_logs.get(jobid)
get_running_jobs
get_running_jobs() -> list[JobInfo]

Get list of jobs that started but haven't finished.

Returns:

Type Description
list[JobInfo]

List of JobInfo for running jobs.

Source code in snakesee/parser/job_tracker.py
def get_running_jobs(self) -> list[JobInfo]:
    """Build JobInfo objects for started jobs whose id is not marked finished.

    Returns:
        List of JobInfo for running jobs, in start order.
    """
    logs = self._job_logs
    return [
        JobInfo(
            rule=data["rule"],
            job_id=jobid,
            start_time=data["start_time"],
            wildcards=data["wildcards"],
            threads=data["threads"],
            # An empty or missing log path is reported as None.
            log_file=Path(logs[jobid]) if logs.get(jobid) else None,
        )
        for jobid, data in self._started_jobs.items()
        if jobid not in self._finished_jobids
    ]
is_job_started
is_job_started(jobid: str) -> bool

Check if a job has been started.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required

Returns:

Type Description
bool

True if the job has been started.

Source code in snakesee/parser/job_tracker.py
def is_job_started(self, jobid: str) -> bool:
    """Tell whether start data for a job is currently held.

    Args:
        jobid: Job identifier.

    Returns:
        True if the job id is present in the started-jobs mapping.
    """
    started = self._started_jobs
    return jobid in started
reset
reset() -> None

Clear all job tracking state.

Source code in snakesee/parser/job_tracker.py
def reset(self) -> None:
    """Forget every job: started, finished, completed, and log paths."""
    for store in (
        self._started_jobs,
        self._finished_jobids,
        self._completed_jobs,
        self._job_logs,
    ):
        store.clear()
set_job_log
set_job_log(jobid: str, log_path: str) -> None

Set the log file path for a job.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
log_path str

Path to the log file.

required
Source code in snakesee/parser/job_tracker.py
def set_job_log(self, jobid: str, log_path: str) -> None:
    """Remember the log file path of a job, replacing any earlier value.

    Args:
        jobid: Job identifier.
        log_path: Path to the log file.
    """
    self._job_logs.update({jobid: log_path})
start_job
start_job(jobid: str, rule: str, start_time: float | None = None, wildcards: dict[str, str] | None = None, threads: int | None = None) -> None

Record a job starting.

Parameters:

Name Type Description Default
jobid str

Unique job identifier.

required
rule str

Name of the rule.

required
start_time float | None

Unix timestamp when job started.

None
wildcards dict[str, str] | None

Wildcard values for this job.

None
threads int | None

Number of threads allocated.

None
Source code in snakesee/parser/job_tracker.py
def start_job(
    self,
    jobid: str,
    rule: str,
    start_time: float | None = None,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
) -> None:
    """Record that a job has started; later calls for the same id are ignored.

    Args:
        jobid: Unique job identifier.
        rule: Name of the rule.
        start_time: Unix timestamp when job started.
        wildcards: Wildcard values for this job.
        threads: Number of threads allocated.
    """
    if jobid in self._started_jobs:
        return
    self._started_jobs[jobid] = StartedJobData(
        rule=rule,
        start_time=start_time,
        wildcards=wildcards,
        threads=threads,
    )
update_job
update_job(jobid: str, wildcards: dict[str, str] | None = None, threads: int | None = None) -> None

Update an existing job's metadata.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
wildcards dict[str, str] | None

Wildcard values to update.

None
threads int | None

Thread count to update.

None
Source code in snakesee/parser/job_tracker.py
def update_job(
    self,
    jobid: str,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
) -> None:
    """Fill in metadata of a started job; unknown ids and None values are skipped.

    Args:
        jobid: Job identifier.
        wildcards: Wildcard values to store.
        threads: Thread count to store.
    """
    if jobid not in self._started_jobs:
        return
    entry = self._started_jobs[jobid]
    if wildcards is not None:
        entry["wildcards"] = wildcards
    if threads is not None:
        entry["threads"] = threads

LogFilePosition

Tracks file position and detects log rotation/truncation.

Handles common scenarios like log file rotation (inode change) and file truncation (size decrease).

Attributes:

Name Type Description
path

Path to the file being tracked.

Source code in snakesee/parser/file_position.py
class LogFilePosition:
    """Tracks file position and detects log rotation/truncation.

    Handles common scenarios like log file rotation (inode change)
    and file truncation (size decrease).

    Attributes:
        path: Path to the file being tracked.
    """

    __slots__ = ("path", "_offset", "_inode")

    def __init__(self, path: Path) -> None:
        """Initialize position tracker.

        Args:
            path: Path to the file to track.
        """
        self.path = path
        # Byte offset of the next read.
        self._offset: int = 0
        # Inode seen on the last check; None before the first check or
        # after an explicit reset().
        self._inode: int | None = None

    @property
    def offset(self) -> int:
        """Current read position in the file."""
        return self._offset

    @offset.setter
    def offset(self, value: int) -> None:
        """Set the current read position."""
        self._offset = value

    def check_rotation(self) -> bool:
        """Check if the file has been rotated or truncated.

        Detects rotation by checking for inode change or file truncation
        (current size less than last known position). When a rotation is
        detected the position is reset to 0 and the inode of the file now
        at ``path`` is remembered. A further rotation before the next check
        is therefore still detected.

        A missing or unreadable file is reported as "not rotated".

        Returns:
            True if the file was rotated and position was reset.
        """
        try:
            stat = self.path.stat()
        except OSError:
            # Covers FileNotFoundError (an OSError subclass) as well as
            # permission and other stat failures.
            return False

        current_inode = stat.st_ino
        rotated = self._inode is not None and (
            current_inode != self._inode or stat.st_size < self._offset
        )
        if rotated:
            self.reset()
        # Always remember the inode we just observed, including right after
        # a reset, so the next call compares against the file being read.
        self._inode = current_inode
        return rotated

    def reset(self) -> None:
        """Reset position to start of file."""
        self._offset = 0
        self._inode = None

    def clamp_to_size(self, file_size: int) -> None:
        """Clamp offset to file bounds.

        Args:
            file_size: Current file size; negative values are treated as 0.
        """
        self._offset = min(self._offset, max(file_size, 0))

Attributes

offset property writable
offset: int

Current read position in the file.

Functions

__init__
__init__(path: Path) -> None

Initialize position tracker.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `Path` | Path to the file to track. | *required* |
Source code in snakesee/parser/file_position.py
def __init__(self, path: Path) -> None:
    """Create a tracker positioned at the very start of ``path``.

    No filesystem access happens here; the inode stays unknown until the
    file is first stat'ed by ``check_rotation``.

    Args:
        path: Path to the file to track.
    """
    self._inode: int | None = None  # unknown until the first stat
    self._offset: int = 0  # begin reading at byte 0
    self.path = path
check_rotation
check_rotation() -> bool

Check if the file has been rotated or truncated.

Detects rotation by checking for inode change or file truncation (current size less than last known position).

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the file was rotated and position was reset. |

Source code in snakesee/parser/file_position.py
def check_rotation(self) -> bool:
    """Check if the file has been rotated or truncated.

    Detects rotation by checking for inode change or file truncation
    (current size less than last known position). When a rotation is
    detected the position is reset to 0 and the inode of the file now
    at ``path`` is remembered. A further rotation before the next check
    is therefore still detected.

    A missing or unreadable file is reported as "not rotated".

    Returns:
        True if the file was rotated and position was reset.
    """
    try:
        stat = self.path.stat()
    except OSError:
        # Covers FileNotFoundError (an OSError subclass) as well as
        # permission and other stat failures.
        return False

    current_inode = stat.st_ino
    rotated = self._inode is not None and (
        current_inode != self._inode or stat.st_size < self._offset
    )
    if rotated:
        self.reset()
    # Always remember the inode we just observed, including right after
    # a reset, so the next call compares against the file being read.
    self._inode = current_inode
    return rotated
clamp_to_size
clamp_to_size(file_size: int) -> None

Clamp offset to file bounds.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_size` | `int` | Current file size (must be non-negative). | *required* |
Source code in snakesee/parser/file_position.py
def clamp_to_size(self, file_size: int) -> None:
    """Pull the read offset back so it never lies past the end of the file.

    Args:
        file_size: Current file size; a negative value is treated as 0.
    """
    upper_bound = max(file_size, 0)
    self._offset = min(self._offset, upper_bound)
reset
reset() -> None

Reset position to start of file.

Source code in snakesee/parser/file_position.py
def reset(self) -> None:
    """Rewind to byte 0 and forget the remembered inode."""
    self._inode = None
    self._offset = 0

LogLineParser dataclass

Parses individual Snakemake log lines.

Maintains parsing context across lines to handle multi-line log entries where information spans multiple lines.

Source code in snakesee/parser/line_parser.py
@dataclass
class LogLineParser:
    """Parses individual Snakemake log lines.

    Maintains parsing context across lines to handle multi-line
    log entries where information spans multiple lines.

    Timestamp and rule-start lines may appear at column 0 or, inside
    group/pipe job blocks, indented. Both forms go through the shared
    helpers ``_handle_timestamp`` and ``_handle_rule_start`` so the two
    code paths cannot drift apart.
    """

    context: ParsingContext = field(default_factory=ParsingContext)

    # Line prefixes that open a rule/checkpoint block. Deliberately left
    # unannotated so @dataclass treats it as a class constant, not a field.
    _RULE_PREFIXES = ("rule ", "localrule ", "checkpoint ", "localcheckpoint ")

    def parse_line(self, line: str) -> list[ParseEvent]:
        """Parse a single log line and return structured events.

        Uses fast-path prefix checks to skip expensive regex operations
        for lines that can't possibly match.

        Updates internal context as needed for multi-line entries.
        May return multiple events when a pending error needs to be flushed.

        Args:
            line: Log line to parse.

        Returns:
            List of ParseEvents (may be empty, one, or two events).
        """
        line = line.rstrip("\n\r")
        events: list[ParseEvent] = []

        # Fast path: empty lines
        if not line:
            return events

        first_char = line[0]

        # Timestamp lines start with '[' - this ends error blocks
        if first_char == "[":
            return self._handle_timestamp(line, events)

        # Indented lines start with space/tab.  In group/pipe job blocks,
        # rule starts and timestamps are indented by 4 spaces.
        if first_char in (" ", "\t"):
            return self._parse_indented_or_group_line(line, events)

        # Rule/checkpoint start - this ends error blocks
        # Matches: "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
        if first_char in ("r", "l", "c") and line.startswith(self._RULE_PREFIXES):
            return self._handle_rule_start(line, events)

        # Finished job: "Finished job X" or "Finished jobid: X"
        if first_char == "F" and line.startswith("Finished "):
            if match := FINISHED_JOB_PATTERN.search(line):
                jobid = match.group(1)
                events.append(
                    ParseEvent(
                        ParseEventType.JOB_FINISHED,
                        {"jobid": jobid, "timestamp": self.context.timestamp},
                    )
                )
            return events

        # Error detection: "Error in rule X:" - starts a pending error block
        if first_char == "E" and line.startswith("Error in rule "):
            if match := ERROR_IN_RULE_PATTERN.search(line):
                # Flush any previous pending error before starting new one
                if pending := self.context.get_pending_error():
                    events.append(pending)
                rule = match.group(1)
                # Start pending error - we'll capture jobid/log from subsequent lines
                self.context.start_error_block(rule)
            return events

        # Progress line: "X of Y steps (Z%) done" - check with substring first
        if "steps" in line and "done" in line:
            if match := PROGRESS_PATTERN.search(line):
                completed = int(match.group(1))
                total = int(match.group(2))
                events.append(
                    ParseEvent(ParseEventType.PROGRESS, {"completed": completed, "total": total})
                )

        return events

    def flush_pending_error(self) -> ParseEvent | None:
        """Flush any pending error event.

        Call this at end of file or when needing to ensure all errors are emitted.

        Returns:
            ParseEvent for the pending error, or None if no pending error.
        """
        return self.context.get_pending_error()

    def _handle_timestamp(self, text: str, events: list[ParseEvent]) -> list[ParseEvent]:
        """Emit a TIMESTAMP event if ``text`` is a Snakemake timestamp line.

        A matching timestamp closes any open error block, so the pending
        ERROR event (if any) is appended before the TIMESTAMP event.

        Args:
            text: Line content with any leading indentation already removed.
            events: Mutable list to append events to.

        Returns:
            The events list (same object passed in).
        """
        if match := TIMESTAMP_PATTERN.match(text):
            if pending := self.context.get_pending_error():
                events.append(pending)
            timestamp = _parse_timestamp(match.group(1))
            self.context.timestamp = timestamp
            events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": timestamp}))
        return events

    def _handle_rule_start(self, text: str, events: list[ParseEvent]) -> list[ParseEvent]:
        """Emit a RULE_START event if ``text`` opens a rule/checkpoint block.

        A new rule block closes any open error block, so the pending ERROR
        event (if any) is appended first. The per-rule context is then reset.

        Args:
            text: Line content with any leading indentation already removed.
            events: Mutable list to append events to.

        Returns:
            The events list (same object passed in).
        """
        if match := RULE_START_PATTERN.match(text):
            if pending := self.context.get_pending_error():
                events.append(pending)
            rule = match.group(1)
            self.context.reset_for_new_rule(rule)
            events.append(ParseEvent(ParseEventType.RULE_START, {"rule": rule}))
        return events

    def _parse_indented_or_group_line(
        self, line: str, events: list[ParseEvent]
    ) -> list[ParseEvent]:
        """Parse indented lines: group-block elements or property lines.

        In group/pipe job blocks, rule starts and timestamps are indented by
        4 spaces.  Property lines are indented by 4 (normal) or 8 (group) spaces.

        Args:
            line: Indented log line starting with space/tab.
            events: Mutable list to append events to.

        Returns:
            The events list (same object passed in).
        """
        stripped = line.lstrip()
        if not stripped:
            return events

        first_stripped = stripped[0]

        # Indented timestamp: "    [Mon Jan  6 10:00:00 2026]"
        if first_stripped == "[":
            return self._handle_timestamp(stripped, events)

        # Indented rule start: "    rule X:", "    localrule X:",
        # "    checkpoint X:", or "    localcheckpoint X:"
        if first_stripped in ("r", "l", "c") and stripped.startswith(self._RULE_PREFIXES):
            return self._handle_rule_start(stripped, events)

        # Property lines (wildcards, threads, log, jobid)
        event = self._parse_indented_line(line)
        if event is not None:
            events.append(event)
        return events

    def _parse_indented_line(self, line: str) -> ParseEvent | None:
        """Parse indented property lines (wildcards, threads, log, jobid).

        Args:
            line: Indented log line starting with space/tab.

        Returns:
            ParseEvent if line contains a recognized property, None otherwise.
        """
        stripped = line.lstrip()

        # Check each property type by prefix (faster than regex)
        if stripped.startswith("wildcards:"):
            value = stripped.removeprefix("wildcards:").strip()
            wildcards = _parse_wildcards(value)
            self.context.wildcards = wildcards
            # Also update error context if in error block
            if self.context.has_pending_error():
                self.context.error_wildcards = wildcards
            return ParseEvent(
                ParseEventType.WILDCARDS,
                {"wildcards": wildcards, "jobid": self.context.jobid},
            )

        if stripped.startswith("threads:"):
            value = stripped.removeprefix("threads:").strip()
            threads = _parse_positive_int(value, "threads")
            if threads is not None:
                self.context.threads = threads
                # Also update error context if in error block
                if self.context.has_pending_error():
                    self.context.error_threads = threads
                return ParseEvent(
                    ParseEventType.THREADS,
                    {"threads": threads, "jobid": self.context.jobid},
                )
            return None

        if stripped.startswith("log:"):
            log_path = stripped.removeprefix("log:").strip()
            self.context.log_path = log_path
            # Also update error context if in error block
            if self.context.has_pending_error():
                self.context.error_log_path = log_path
            return ParseEvent(
                ParseEventType.LOG_PATH,
                {"log_path": log_path, "jobid": self.context.jobid},
            )

        if stripped.startswith("jobid:"):
            jobid = stripped.removeprefix("jobid:").strip()
            self.context.jobid = jobid
            # Also update error context if in error block
            if self.context.has_pending_error():
                self.context.error_jobid = jobid
            # NOTE(review): inside an "Error in rule X:" block this still emits a
            # JOBID event whose "rule" is self.context.rule (the most recently
            # started rule), which may differ from the errored rule. Confirm that
            # consumers tolerate this.
            return ParseEvent(
                ParseEventType.JOBID,
                {
                    "jobid": jobid,
                    "rule": self.context.rule,
                    "wildcards": self.context.wildcards,
                    "threads": self.context.threads,
                    "timestamp": self.context.timestamp,
                    "log_path": self.context.log_path,
                },
            )

        return None

    def reset(self) -> None:
        """Reset parsing context."""
        self.context = ParsingContext()

Functions

flush_pending_error
flush_pending_error() -> ParseEvent | None

Flush any pending error event.

Call this at end of file or when needing to ensure all errors are emitted.

Returns:

| Type | Description |
|------|-------------|
| `ParseEvent \| None` | ParseEvent for the pending error, or None if no pending error. |

Source code in snakesee/parser/line_parser.py
def flush_pending_error(self) -> ParseEvent | None:
    """Drain the error block the parser is still holding open, if any.

    Intended for end of input, or any point where every error seen so far
    must be emitted even though no closing line has arrived yet.

    Returns:
        ParseEvent for the pending error, or None if no pending error.
    """
    leftover = self.context.get_pending_error()
    return leftover
parse_line
parse_line(line: str) -> list[ParseEvent]

Parse a single log line and return structured events.

Uses fast-path prefix checks to skip expensive regex operations for lines that can't possibly match.

Updates internal context as needed for multi-line entries. May return multiple events when a pending error needs to be flushed.

Parameters:

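| Name | Type | Description | Default |
|------|------|-------------|---------|
| `line` | `str` | Log line to parse. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `list[ParseEvent]` | List of ParseEvents (may be empty, one, or two events). |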

Source code in snakesee/parser/line_parser.py
def parse_line(self, line: str) -> list[ParseEvent]:
    """Parse a single log line and return structured events.

    Uses fast-path prefix checks to skip expensive regex operations
    for lines that can't possibly match.

    Updates internal context as needed for multi-line entries.
    May return multiple events when a pending error needs to be flushed.

    Args:
        line: Log line to parse.

    Returns:
        List of ParseEvents (may be empty, one, or two events).
    """
    line = line.rstrip("\n\r")
    events: list[ParseEvent] = []

    # Fast path: empty lines
    if not line:
        return events

    first_char = line[0]

    # Timestamp lines start with '[' - this ends error blocks
    if first_char == "[":
        if match := TIMESTAMP_PATTERN.match(line):
            # Flush any pending error before emitting timestamp
            if pending := self.context.get_pending_error():
                events.append(pending)
            timestamp = _parse_timestamp(match.group(1))
            self.context.timestamp = timestamp
            events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": timestamp}))
        return events

    # Indented lines start with space/tab.  In group/pipe job blocks,
    # rule starts and timestamps are indented by 4 spaces.
    if first_char in (" ", "\t"):
        return self._parse_indented_or_group_line(line, events)

    # Rule/checkpoint start - this ends error blocks
    # Matches: "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
    if first_char in ("r", "l", "c") and (
        line.startswith(("rule ", "localrule ", "checkpoint ", "localcheckpoint "))
    ):
        if match := RULE_START_PATTERN.match(line):
            # Flush any pending error before emitting rule start
            if pending := self.context.get_pending_error():
                events.append(pending)
            rule = match.group(1)
            self.context.reset_for_new_rule(rule)
            events.append(ParseEvent(ParseEventType.RULE_START, {"rule": rule}))
        return events

    # Finished job: "Finished job X" or "Finished jobid: X"
    if first_char == "F" and line.startswith("Finished "):
        if match := FINISHED_JOB_PATTERN.search(line):
            jobid = match.group(1)
            events.append(
                ParseEvent(
                    ParseEventType.JOB_FINISHED,
                    {"jobid": jobid, "timestamp": self.context.timestamp},
                )
            )
        return events

    # Error detection: "Error in rule X:" - starts a pending error block
    if first_char == "E" and line.startswith("Error in rule "):
        if match := ERROR_IN_RULE_PATTERN.search(line):
            # Flush any previous pending error before starting new one
            if pending := self.context.get_pending_error():
                events.append(pending)
            rule = match.group(1)
            # Start pending error - we'll capture jobid/log from subsequent lines
            self.context.start_error_block(rule)
        return events

    # Progress line: "X of Y steps (Z%) done" - check with substring first
    if "steps" in line and "done" in line:
        if match := PROGRESS_PATTERN.search(line):
            completed = int(match.group(1))
            total = int(match.group(2))
            events.append(
                ParseEvent(ParseEventType.PROGRESS, {"completed": completed, "total": total})
            )

    return events
reset
reset() -> None

Reset parsing context.

Source code in snakesee/parser/line_parser.py
def reset(self) -> None:
    """Discard all accumulated parsing state by installing a fresh context."""
    fresh_context = ParsingContext()
    self.context = fresh_context

MetadataRecord dataclass

Single metadata file parsed data for efficient single-pass collection.

Contains all fields needed by various collection functions so we only read each metadata file once.

Source code in snakesee/parser/metadata.py
@dataclass(frozen=True, slots=True)
class MetadataRecord:
    """Single metadata file parsed data for efficient single-pass collection.

    Contains all fields needed by various collection functions so we only
    read each metadata file once.
    """

    rule: str
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] | None = None
    input_size: int | None = None
    code_hash: str | None = None

    @property
    def duration(self) -> float | None:
        """Calculate duration from start and end times."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_job_info(self) -> JobInfo:
        """Convert to JobInfo for compatibility with existing code."""
        return JobInfo(
            rule=self.rule,
            start_time=self.start_time,
            end_time=self.end_time,
            wildcards=self.wildcards,
            input_size=self.input_size,
        )

Attributes

duration property
duration: float | None

Calculate duration from start and end times.

Functions

to_job_info
to_job_info() -> JobInfo

Convert to JobInfo for compatibility with existing code.

Source code in snakesee/parser/metadata.py
def to_job_info(self) -> JobInfo:
    """Build the equivalent ``JobInfo`` (``code_hash`` is not carried over)."""
    return JobInfo(
        rule=self.rule,
        wildcards=self.wildcards,
        start_time=self.start_time,
        end_time=self.end_time,
        input_size=self.input_size,
    )

ParseEvent

Bases: NamedTuple

Result of parsing a log line.

Source code in snakesee/parser/line_parser.py
class ParseEvent(NamedTuple):
    """Result of parsing a log line.

    ``event_type`` says what kind of line was recognized. ``data`` carries
    the event-specific payload, e.g. ``{"timestamp": ...}`` for TIMESTAMP or
    ``{"rule": ...}`` for RULE_START.
    """

    # Kind of event; determines which keys appear in ``data``.
    event_type: ParseEventType
    # Event payload keyed by field name.
    data: dict[str, object]

ParseEventType

Bases: Enum

Types of events that can be parsed from a log line.

Source code in snakesee/parser/line_parser.py
class ParseEventType(Enum):
    """Types of events that can be parsed from a log line.

    The comment on each member lists the ``data`` keys the line parser
    attaches to events of that type.
    """

    # data: timestamp
    TIMESTAMP = "timestamp"
    # data: completed, total
    PROGRESS = "progress"
    # data: rule
    RULE_START = "rule_start"
    # data: wildcards, jobid
    WILDCARDS = "wildcards"
    # data: threads, jobid
    THREADS = "threads"
    # data: log_path, jobid
    LOG_PATH = "log_path"
    # data: jobid, rule, wildcards, threads, timestamp, log_path
    JOBID = "jobid"
    # data: jobid, timestamp
    JOB_FINISHED = "job_finished"
    # data: rule, jobid, wildcards, threads, log_path
    ERROR = "error"

ParsingContext dataclass

Current parsing state for multi-line log entries.

Snakemake logs use multi-line blocks where context from earlier lines (rule, wildcards, etc.) applies to later lines (jobid).

Source code in snakesee/parser/line_parser.py
@dataclass
class ParsingContext:
    """Mutable state carried from one Snakemake log line to the next.

    A Snakemake job block spreads its information over several lines: the
    rule name comes first, and wildcards, threads, log and jobid follow.
    This object remembers what has been seen so far. It also buffers an
    "Error in rule X:" block until the block is closed, because that block's
    jobid and log lines arrive after its header.
    """

    # State of the most recently opened rule block.
    rule: str | None = None
    jobid: str | None = None
    wildcards: dict[str, str] | None = None
    threads: int | None = None
    timestamp: float | None = None
    log_path: str | None = None

    # Pending error state. ERROR emission is deferred until the error block is
    # complete, because jobid and log come AFTER "Error in rule X:".
    pending_error_rule: str | None = None
    error_jobid: str | None = None
    error_wildcards: dict[str, str] | None = None
    error_threads: int | None = None
    error_log_path: str | None = None

    def reset_for_new_rule(self, rule: str) -> None:
        """Switch to a new rule block, dropping the previous block's job details.

        ``timestamp`` and any pending error are left untouched.

        Args:
            rule: Name of the new rule.
        """
        self.rule = rule
        self.jobid = self.wildcards = self.threads = self.log_path = None

    def start_error_block(self, rule: str) -> None:
        """Begin buffering an error block for ``rule``.

        If ``rule`` is also the rule currently in context, the error details
        are seeded from that context; otherwise they start out empty.

        Args:
            rule: Name of the rule that errored.
        """
        if self.rule != rule:
            self._clear_error_state()
        else:
            self.error_jobid = self.jobid
            self.error_wildcards = self.wildcards
            self.error_threads = self.threads
            self.error_log_path = self.log_path
        self.pending_error_rule = rule

    def get_pending_error(self) -> ParseEvent | None:
        """Return the buffered error as an ERROR event and clear the buffer.

        Returns:
            ParseEvent for the error, or None if no pending error.
        """
        if not self.has_pending_error():
            return None

        # Error blocks append "(check log file(s) for error details)" to the
        # log path; keep only the path itself.
        marker = " (check log file"
        log_path = self.error_log_path
        if log_path and marker in log_path:
            log_path = log_path.partition(marker)[0].strip()

        event = ParseEvent(
            ParseEventType.ERROR,
            {
                "rule": self.pending_error_rule,
                "jobid": self.error_jobid,
                "wildcards": self.error_wildcards,
                "threads": self.error_threads,
                "log_path": log_path,
            },
        )
        self._clear_error_state()
        return event

    def has_pending_error(self) -> bool:
        """True while an error block is buffered and not yet emitted."""
        return self.pending_error_rule is not None

    def _clear_error_state(self) -> None:
        """Forget the buffered error block entirely."""
        self.pending_error_rule = None
        self.error_jobid = None
        self.error_wildcards = None
        self.error_threads = None
        self.error_log_path = None

Functions

get_pending_error
get_pending_error() -> ParseEvent | None

Get the pending error event and clear it.

Returns:

| Type | Description |
|------|-------------|
| `ParseEvent \| None` | ParseEvent for the error, or None if no pending error. |

Source code in snakesee/parser/line_parser.py
def get_pending_error(self) -> ParseEvent | None:
    """Return the buffered error as an ERROR event and clear the buffer.

    Returns:
        ParseEvent for the error, or None if no pending error.
    """
    errored_rule = self.pending_error_rule
    if errored_rule is None:
        return None

    # Error blocks append "(check log file(s) for error details)" to the
    # log path; keep only the path itself.
    suffix_marker = " (check log file"
    cleaned_log = self.error_log_path
    if cleaned_log and suffix_marker in cleaned_log:
        cleaned_log = cleaned_log.partition(suffix_marker)[0].strip()

    payload: dict[str, object] = {
        "rule": errored_rule,
        "jobid": self.error_jobid,
        "wildcards": self.error_wildcards,
        "threads": self.error_threads,
        "log_path": cleaned_log,
    }
    # Consume the buffered error so it is reported exactly once.
    for attr in (
        "pending_error_rule",
        "error_jobid",
        "error_wildcards",
        "error_threads",
        "error_log_path",
    ):
        setattr(self, attr, None)
    return ParseEvent(ParseEventType.ERROR, payload)
has_pending_error
has_pending_error() -> bool

Check if there's a pending error to emit.

Source code in snakesee/parser/line_parser.py
def has_pending_error(self) -> bool:
    """True while an error block is buffered and not yet emitted."""
    buffered_rule = self.pending_error_rule
    return buffered_rule is not None
reset_for_new_rule
reset_for_new_rule(rule: str) -> None

Reset context when entering a new rule block.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `rule` | `str` | Name of the new rule. | *required* |
Source code in snakesee/parser/line_parser.py
def reset_for_new_rule(self, rule: str) -> None:
    """Switch to a new rule block, dropping the previous block's job details.

    ``timestamp`` and any pending error are left untouched.

    Args:
        rule: Name of the new rule.
    """
    self.rule = rule
    for stale in ("jobid", "wildcards", "threads", "log_path"):
        setattr(self, stale, None)
start_error_block
start_error_block(rule: str) -> None

Start tracking a pending error block.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `rule` | `str` | Name of the rule that errored. | *required* |
Source code in snakesee/parser/line_parser.py
def start_error_block(self, rule: str) -> None:
    """Begin buffering an error block for ``rule``.

    If ``rule`` is also the rule currently in context, the error details
    are seeded from that context; otherwise they start out empty.

    Args:
        rule: Name of the rule that errored.
    """
    self.pending_error_rule = rule
    same_rule = self.rule == rule
    self.error_jobid = self.jobid if same_rule else None
    self.error_wildcards = self.wildcards if same_rule else None
    self.error_threads = self.threads if same_rule else None
    self.error_log_path = self.log_path if same_rule else None

StartedJobData

Bases: TypedDict

Data for a job that has started but not finished.

Source code in snakesee/parser/job_tracker.py
class StartedJobData(TypedDict):
    """Data for a job that has started but not finished.

    Created by ``start_job``. The ``wildcards`` and ``threads`` entries may
    later be overwritten by ``update_job``.
    """

    # Name of the rule the job belongs to.
    rule: str
    # Start time as passed to start_job (documented there as a Unix timestamp); None if unknown.
    start_time: float | None
    # Wildcard name -> value mapping; None if not (yet) known.
    wildcards: dict[str, str] | None
    # Thread count allocated to the job; None if not (yet) known.
    threads: int | None

Functions

calculate_input_size

calculate_input_size(file_paths: list[Path]) -> int | None

Calculate total size of input files.

Parameters:

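| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_paths` | `list[Path]` | List of input file paths. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `int \| None` | Total size in bytes, or None if the list is empty or any file doesn't exist or can't be accessed. |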

Source code in snakesee/parser/utils.py
def calculate_input_size(file_paths: list[Path]) -> int | None:
    """
    Calculate total size of input files.

    Args:
        file_paths: List of input file paths.

    Returns:
        Total size in bytes, or None if any file doesn't exist.
    """
    total_size = 0
    for path in file_paths:
        try:
            total_size += path.stat().st_size
        except OSError:
            return None  # File doesn't exist or can't be accessed
    return total_size if file_paths else None

collect_rule_code_hashes

collect_rule_code_hashes(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, set[str]]

Collect code hashes for each rule from metadata files.

This enables detection of renamed rules by matching their shell code. If two rules have the same code hash, they are likely the same rule that was renamed.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `metadata_dir` | `Path` | Path to `.snakemake/metadata/` directory. | *required* |
| `progress_callback` | `ProgressCallback \| None` | Optional callback(current, total) for progress reporting. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, set[str]]` | Dictionary mapping code_hash -> set of rule names that use that code. |

Source code in snakesee/parser/metadata.py
def collect_rule_code_hashes(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, set[str]]:
    """
    Group rule names by a fingerprint of their recorded code.

    Rules whose code hashes identically are most likely one rule that was
    renamed, so this mapping lets callers link renamed rules together.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping code_hash -> set of rule names that use that code.
        The hash is the first 16 hex characters of SHA-256 over the code with
        all whitespace runs collapsed to single spaces.
    """
    rules_by_hash: dict[str, set[str]] = {}
    if not metadata_dir.exists():
        return rules_by_hash

    records = iterate_metadata_files(
        metadata_dir,
        progress_callback,
        sort_by_mtime=False,  # visit order is irrelevant for hashing
        use_cache=False,  # the "code" field is not part of the cache
    )
    for _path, record in records:
        rule_name = record.get("rule")
        source = record.get("code")
        if not (rule_name and source):
            continue
        # Collapse whitespace so pure reformatting does not change the hash.
        canonical = " ".join(source.split())
        digest = hashlib.sha256(canonical.encode()).hexdigest()[:16]
        rules_by_hash.setdefault(digest, set()).add(rule_name)

    return rules_by_hash

collect_rule_timing_stats

collect_rule_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, RuleTimingStats]

Collect historical timing statistics per rule from metadata.

Aggregates all completed job timings by rule name, sorted chronologically by end time. Includes timestamps for time-based weighted estimation. Input sizes are included when available from job metadata.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `metadata_dir` | `Path` | Path to `.snakemake/metadata/` directory. | *required* |
| `progress_callback` | `ProgressCallback \| None` | Optional callback(current, total) for progress reporting. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, RuleTimingStats]` | Dictionary mapping rule names to their timing statistics. |

Source code in snakesee/parser/stats.py
def collect_rule_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, RuleTimingStats]:
    """
    Collect historical timing statistics per rule from metadata.

    Every metadata record that has both a duration and an end time is grouped
    under its rule name. Within each rule the samples are ordered by end time
    (oldest first), and parallel lists of durations, end timestamps and input
    sizes are handed to ``RuleTimingStats``. Input sizes may be ``None`` when
    the metadata record did not carry one.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping rule names to their timing statistics.
    """
    # rule name -> [(duration, end_time, input_size), ...]
    samples_by_rule: dict[str, list[tuple[float, float, int | None]]] = {}

    for record in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        duration = record.duration
        finished_at = record.end_time
        # Records without complete timing information cannot contribute.
        if duration is None or finished_at is None:
            continue
        samples_by_rule.setdefault(record.rule, []).append(
            (duration, finished_at, record.input_size)
        )

    result: dict[str, RuleTimingStats] = {}
    for rule_name, samples in samples_by_rule.items():
        # Stable sort on end time gives chronological order.
        ordered = sorted(samples, key=lambda sample: sample[1])
        result[rule_name] = RuleTimingStats(
            rule=rule_name,
            durations=[sample[0] for sample in ordered],
            timestamps=[sample[1] for sample in ordered],
            input_sizes=[sample[2] for sample in ordered],
        )

    return result

collect_wildcard_timing_stats

collect_wildcard_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, dict[str, WildcardTimingStats]]

Collect timing statistics per rule, conditioned on wildcards.

Groups execution times by (rule, wildcard_key, wildcard_value) for rules that have wildcards in their metadata.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Returns:

Type Description
dict[str, dict[str, WildcardTimingStats]]

Nested dictionary: rule -> wildcard_key -> WildcardTimingStats

Source code in snakesee/parser/stats.py
def collect_wildcard_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, dict[str, WildcardTimingStats]]:
    """Collect timing statistics per rule, conditioned on wildcards.

    Only metadata records that have a duration, an end time and a non-empty
    wildcard mapping are considered. Each such record contributes one
    ``(duration, end_time)`` sample under every one of its wildcard
    ``key=value`` pairs. The grouped samples for each (rule, wildcard key)
    are then turned into a ``WildcardTimingStats`` by
    ``_build_wildcard_stats_for_key``.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Nested dictionary: rule -> wildcard_key -> WildcardTimingStats
    """
    # rule -> wildcard key -> wildcard value -> [(duration, end_time), ...]
    grouped: dict[str, dict[str, dict[str, list[tuple[float, float]]]]] = {}

    for record in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        duration = record.duration
        finished_at = record.end_time
        if duration is None or finished_at is None or not record.wildcards:
            continue

        per_key = grouped.setdefault(record.rule, {})
        for wc_name, wc_val in record.wildcards.items():
            per_key.setdefault(wc_name, {}).setdefault(wc_val, []).append(
                (duration, finished_at)
            )

    return {
        rule_name: {
            wc_name: _build_wildcard_stats_for_key(rule_name, wc_name, samples_by_value)
            for wc_name, samples_by_value in per_key.items()
        }
        for rule_name, per_key in grouped.items()
    }

estimate_input_size_from_output

estimate_input_size_from_output(output_path: Path, workflow_dir: Path) -> int | None

Try to estimate input size by looking for related input files.

This is a heuristic that works for common bioinformatics patterns where output files are derived from inputs with predictable naming conventions.

Examples:

  • sample.sorted.bam -> sample.bam
  • sample.fastq.gz -> looks for sample.fq.gz, sample.fastq.gz
  • sample.vcf.gz -> sample.bam

Parameters:

Name Type Description Default
output_path Path

Path to the output file.

required
workflow_dir Path

Workflow root directory.

required

Returns:

Type Description
int | None

Estimated input size in bytes, or None if not determinable.

Source code in snakesee/parser/utils.py
def estimate_input_size_from_output(
    output_path: Path,
    workflow_dir: Path,
) -> int | None:
    """
    Try to estimate input size by looking for related input files.

    This is a heuristic that works for common bioinformatics patterns where
    output files are derived from inputs with predictable naming conventions.

    Examples:
        - sample.sorted.bam -> sample.bam
        - sample.fastq.gz -> looks for sample.fq.gz, sample.fastq.gz
        - sample.vcf.gz -> sample.bam

    Args:
        output_path: Path to the output file.
        workflow_dir: Workflow root directory.

    Returns:
        Estimated input size in bytes, or None if not determinable.
    """
    # Common input file patterns relative to output
    suffixes_to_strip = [
        ".sorted.bam",
        ".sorted",
        ".trimmed",
        ".filtered",
        ".dedup",
        ".aligned",
    ]

    name = output_path.name

    # Try stripping common suffixes to find input
    for suffix in suffixes_to_strip:
        if name.endswith(suffix):
            input_name = name[: -len(suffix)]
            # Try common extensions
            for ext in [".bam", ".fastq.gz", ".fq.gz", ".fa.gz", ".fasta.gz"]:
                candidate = workflow_dir / (input_name + ext)
                if candidate.exists():
                    try:
                        return candidate.stat().st_size
                    except OSError:
                        continue

    # No input found
    return None

is_workflow_running

is_workflow_running(snakemake_dir: Path, stale_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS, backend: PersistenceBackend | None = None) -> bool

Check if a workflow is currently running.

Uses multiple signals:

1. Lock files exist in .snakemake/locks/
2. Incomplete markers exist in .snakemake/incomplete/ (jobs in progress)
3. The log file was recently modified (within stale_threshold seconds)

If locks AND incomplete markers both exist, the workflow is definitely running (incomplete markers are created when jobs start and removed when they finish). Log freshness is only used as a fallback when there are no incomplete markers.

Parameters:

Name Type Description Default
snakemake_dir Path

Path to the .snakemake directory.

required
stale_threshold float

Seconds since last log modification before considering the workflow stale/dead. Default 1800 seconds (30 minutes).

STALE_WORKFLOW_THRESHOLD_SECONDS
backend PersistenceBackend | None

Optional persistence backend. If provided, uses it for lock and incomplete checks instead of filesystem.

None

Returns:

Type Description
bool

True if workflow appears to be actively running, False otherwise.

Source code in snakesee/parser/core.py
def is_workflow_running(
    snakemake_dir: Path,
    stale_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS,
    backend: PersistenceBackend | None = None,
) -> bool:
    """
    Check if a workflow is currently running.

    The decision uses three signals, in order:

    1. No lock is held -> not running.
    2. A lock is held and at least one incomplete-job marker exists -> running.
    3. Otherwise fall back to log freshness. The workflow counts as running
       if the newest log was modified less than ``stale_threshold`` seconds
       ago, if no log exists yet, or if the log cannot be stat'ed.

    With a ``backend``, signals 1 and 2 come from ``backend.has_locks()`` and
    ``backend.has_incomplete_jobs()``. Without one, they come from the
    ``locks/`` and ``incomplete/`` subdirectories of ``snakemake_dir``, and an
    unreadable ``locks/`` directory is treated as "no lock".

    Args:
        snakemake_dir: Path to the .snakemake directory.
        stale_threshold: Seconds since last log modification before considering
            the workflow stale/dead. Default 1800 seconds (30 minutes).
        backend: Optional persistence backend. If provided, uses it for
            lock and incomplete checks instead of filesystem.

    Returns:
        True if workflow appears to be actively running, False otherwise.
    """
    from snakesee.state.clock import get_clock
    from snakesee.state.paths import WorkflowPaths

    def log_recently_touched() -> bool:
        """Fallback signal: is the newest log younger than stale_threshold?"""
        # snakemake_dir is .snakemake, so its parent is the workflow directory.
        newest_log = WorkflowPaths(snakemake_dir.parent).find_latest_log()
        if newest_log is None:
            # Locks without a log file: most likely still starting up.
            return True
        try:
            modified_at = newest_log.stat().st_mtime
            return get_clock().now() - modified_at < stale_threshold
        except OSError:
            # Cannot stat the log; err on the side of "running".
            return True

    if backend is not None:
        if not backend.has_locks():
            return False
        if backend.has_incomplete_jobs():
            return True
        return log_recently_touched()

    locks_dir = snakemake_dir / "locks"
    if not locks_dir.exists():
        return False
    try:
        locked = any(locks_dir.iterdir())
    except OSError:
        return False
    if not locked:
        return False

    # Incomplete markers are created when jobs start and removed when they
    # finish, so locks plus markers means a job is definitely in flight.
    incomplete_dir = snakemake_dir / "incomplete"
    if incomplete_dir.exists():
        try:
            if any(incomplete_dir.iterdir()):
                return True
        except OSError:
            pass

    return log_recently_touched()

parse_all_jobs_from_log

parse_all_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse all scheduled jobs from a snakemake log file.

Extracts job information (rule, wildcards, threads, jobid) from the log by parsing job blocks. This captures ALL jobs that snakemake scheduled, including those not yet started.

This is useful for getting pending jobs with their wildcards when the snakesee logger plugin is not available.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for all scheduled jobs.

Source code in snakesee/parser/core.py
def parse_all_jobs_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse all scheduled jobs from a snakemake log file.

    Extracts job information (rule, wildcards, threads, jobid) from the log
    by parsing job blocks. This captures ALL jobs that snakemake scheduled,
    including those not yet started.

    This is useful for getting pending jobs with their wildcards when the
    snakesee logger plugin is not available.

    A job is recorded when its ``rule X:`` block ends, which happens at the
    next rule block or at the end of the log. The block must have produced a
    jobid. Only the first block seen for a given jobid is kept.

    Lines that follow an ``Error in rule X:`` header describe the failed job,
    not the rule block being tracked. Its ``jobid:``/``wildcards:``/``threads:``
    lines are therefore ignored until the next rule block starts; otherwise
    they would overwrite the tracked job's fields.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for all scheduled jobs, in first-seen order.
        Returns an empty list if the log cannot be read.
    """
    all_jobs: list[JobInfo] = []
    seen_jobids: set[str] = set()

    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    # True while inside an "Error in rule X:" block.
    in_error_block = False

    def flush_current() -> None:
        """Record the tracked rule block as a job if it has a not-yet-seen jobid."""
        if current_rule is None or current_jobid is None:
            return
        if current_jobid in seen_jobids:
            return
        seen_jobids.add(current_jobid)
        all_jobs.append(
            JobInfo(
                rule=current_rule,
                job_id=current_jobid,
                wildcards=current_wildcards,
                threads=current_threads,
            )
        )

    try:
        # errors="ignore" matches the other log readers in this module; a
        # stray non-UTF-8 byte must not abort parsing with UnicodeDecodeError.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return all_jobs

    for line in lines:
        # A new rule block ends the previous one (and any error block).
        if match := RULE_START_PATTERN.match(line.lstrip()):
            flush_current()
            current_rule = match.group(1)
            current_jobid = None
            current_wildcards = None
            current_threads = None
            in_error_block = False

        elif ERROR_IN_RULE_PATTERN.search(line):
            in_error_block = True

        elif in_error_block:
            # Fields of the failed job; not part of the tracked rule block.
            continue

        # Capture wildcards within rule block
        elif match := WILDCARDS_PATTERN.match(line):
            current_wildcards = _parse_wildcards(match.group(1))

        # Capture threads within rule block
        elif match := THREADS_PATTERN.match(line):
            current_threads = int(match.group(1))

        # Capture jobid within rule block
        elif match := JOBID_PATTERN.match(line):
            current_jobid = match.group(1)

    # Don't forget the last job
    flush_current()

    return all_jobs

parse_completed_jobs_from_log

parse_completed_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse completed jobs with timing from a snakemake log file.

Extracts job start/end times from log timestamps to reconstruct job durations. This is useful for historical logs where metadata may have been overwritten by later runs.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs with timing information.

Source code in snakesee/parser/core.py
def parse_completed_jobs_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse completed jobs with timing from a snakemake log file.

    Extracts job start/end times from log timestamps to reconstruct
    job durations. This is useful for historical logs where metadata
    may have been overwritten by later runs.

    Start and end times:
    - A job's start time is the most recent timestamp line seen before its
      ``jobid:`` line inside a ``rule X:`` block.
    - Its end time is the most recent timestamp seen before the matching
      finished-job line.

    Log paths from rule blocks are kept per jobid and attached to the
    completed job.

    Error blocks (``Error in rule X:`` up to the next timestamp or rule
    block) carry their own ``jobid:``/``wildcards:``/``threads:``/``log:``
    lines for the failed job. These are ignored so they cannot re-register a
    job under the wrong rule or start time.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for completed jobs with timing information.
        Returns an empty list if the log cannot be read.
    """
    completed_jobs: list[JobInfo] = []

    # Track started jobs: jobid -> (rule, start_time, wildcards, threads)
    started_jobs: dict[str, tuple[str, float, dict[str, str] | None, int | None]] = {}
    # Job logs: jobid -> log_path (separate lookup by unique jobid)
    job_logs: dict[str, str] = {}
    current_rule: str | None = None
    current_timestamp: float | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_jobid: str | None = None
    current_log_path: str | None = None
    # True between an "Error in rule X:" header and the next timestamp/rule line.
    in_error_block = False

    try:
        # errors="ignore" matches the other log readers in this module; a
        # stray non-UTF-8 byte must not abort parsing with UnicodeDecodeError.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return completed_jobs

    for line in lines:
        stripped = line.lstrip()

        # Check for timestamp; a timestamp also closes any error block
        if match := TIMESTAMP_PATTERN.match(stripped):
            current_timestamp = _parse_timestamp(match.group(1))
            in_error_block = False

        # Track current rule being executed
        elif match := RULE_START_PATTERN.match(stripped):
            current_rule = match.group(1)
            current_wildcards = None
            current_threads = None
            current_jobid = None
            current_log_path = None
            in_error_block = False

        elif ERROR_IN_RULE_PATTERN.search(line):
            in_error_block = True

        elif in_error_block and (
            WILDCARDS_PATTERN.match(line)
            or THREADS_PATTERN.match(line)
            or LOG_PATTERN.match(line)
            or JOBID_PATTERN.match(line)
        ):
            # Details of the failed job, not of the tracked rule block.
            continue

        # Capture wildcards within rule block
        elif match := WILDCARDS_PATTERN.match(line):
            current_wildcards = _parse_wildcards(match.group(1))

        # Capture threads within rule block
        elif match := THREADS_PATTERN.match(line):
            current_threads = int(match.group(1))
            # Update already-stored job if threads comes after jobid
            if current_jobid and current_jobid in started_jobs:
                rule, ts, wc, _ = started_jobs[current_jobid]
                started_jobs[current_jobid] = (rule, ts, wc, current_threads)

        # Capture log path within rule block - store by jobid
        elif match := LOG_PATTERN.match(line):
            current_log_path = match.group(1).strip()
            if current_jobid:
                job_logs[current_jobid] = current_log_path

        # Capture jobid within rule block
        elif match := JOBID_PATTERN.match(line):
            current_jobid = match.group(1)
            if current_rule is not None and current_timestamp is not None:
                started_jobs[current_jobid] = (
                    current_rule,
                    current_timestamp,
                    current_wildcards,
                    current_threads,
                )
            # Store log by jobid if we already captured it
            if current_log_path:
                job_logs[current_jobid] = current_log_path

        # Track finished jobs
        elif match := FINISHED_JOB_PATTERN.search(line):
            jobid = match.group(1)
            if jobid in started_jobs and current_timestamp is not None:
                rule, start_time, wildcards, threads = started_jobs[jobid]
                # Look up log by jobid - unique within this run
                job_log = job_logs.get(jobid)
                completed_jobs.append(
                    JobInfo(
                        rule=rule,
                        job_id=jobid,
                        start_time=start_time,
                        end_time=current_timestamp,
                        wildcards=wildcards,
                        threads=threads,
                        log_file=Path(job_log) if job_log else None,
                    )
                )

    return completed_jobs

parse_cores_from_log

parse_cores_from_log(log_path: Path) -> int | None

Parse the `Provided cores: N` line from a Snakemake log.

This line appears near the start of a run and reflects the `-j`/`--cores` value passed to Snakemake. Only the first occurrence is used.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
int | None

The core count, or None if the line was not found.

Source code in snakesee/parser/core.py
def parse_cores_from_log(log_path: Path) -> int | None:
    """
    Parse the ``Provided cores: N`` line from a Snakemake log.

    The whole log is read with undecodable bytes ignored. The first match of
    ``CORES_PATTERN`` is used, and its first capture group is converted to
    an ``int``. That line reflects the ``-j``/``--cores`` value passed to
    Snakemake.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        The core count, or ``None`` if the line was not found or the log
        could not be read.
    """
    from snakesee.parser.patterns import CORES_PATTERN

    try:
        log_text = log_path.read_text(errors="ignore")
    except OSError as exc:
        logger.info("Could not read log file %s: %s", log_path, exc)
        return None

    found = CORES_PATTERN.search(log_text)
    return None if found is None else int(found.group(1))

parse_failed_jobs_from_log

parse_failed_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse failed jobs from the snakemake log file.

Looks for "Error in rule X:" patterns and extracts the rule name and associated job ID when available. Useful for --keep-going workflows.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that failed.

Source code in snakesee/parser/core.py
def parse_failed_jobs_from_log(  # noqa: C901
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse failed jobs from the snakemake log file.

    Looks for "Error in rule X:" patterns and extracts the rule name
    and associated job ID when available. Useful for --keep-going workflows.

    Error block boundaries: a block starts at "Error in rule X:" and ends at
    the first of the following:
    - the next timestamp line;
    - the next rule block;
    - another error header;
    - the end of the log.

    Seeding and overriding fields:
    - If the rule block currently being tracked is for the same rule X, its
      jobid/wildcards/threads/log seed the error.
    - Field lines inside the error block then override those seeds.
    - If no log path was captured, the log is looked up by jobid from the
      rule blocks seen so far.

    Fields inside an error block are applied only to that error. They never
    overwrite the tracked rule block's context or the jobid -> log mapping,
    so interleaved parallel jobs keep their own logs. Each (rule, jobid) pair
    is reported at most once.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for jobs that failed. Returns an empty list if the
        log cannot be read.
    """
    failed_jobs: list[JobInfo] = []
    seen_failures: set[tuple[str, str | None]] = set()  # (rule, jobid) pairs

    # Track context: rule, jobid, wildcards, threads, and log for each job block
    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_log_path: str | None = None
    # Job logs: jobid -> log_path (filled from rule blocks only)
    job_logs: dict[str, str] = {}

    # Track pending error - we defer emission until the error block is fully parsed
    # because jobid and log come AFTER "Error in rule X:" in the error block
    pending_error_rule: str | None = None
    error_jobid: str | None = None
    error_wildcards: dict[str, str] | None = None
    error_threads: int | None = None
    error_log_path: str | None = None

    def emit_pending_error() -> None:
        """Emit the pending error (once per (rule, jobid)) and clear error state."""
        nonlocal pending_error_rule, error_jobid, error_wildcards, error_threads, error_log_path
        if pending_error_rule is not None:
            # Look up log by jobid if not captured directly in error block
            job_log = error_log_path or (job_logs.get(error_jobid) if error_jobid else None)
            key = (pending_error_rule, error_jobid)
            if key not in seen_failures:
                seen_failures.add(key)
                failed_jobs.append(
                    JobInfo(
                        rule=pending_error_rule,
                        job_id=error_jobid,
                        wildcards=error_wildcards,
                        threads=error_threads,
                        log_file=Path(job_log) if job_log else None,
                    )
                )
            # Reset pending error state
            pending_error_rule = None
            error_jobid = None
            error_wildcards = None
            error_threads = None
            error_log_path = None

    try:
        # errors="ignore" matches the other log readers in this module; a
        # stray non-UTF-8 byte must not abort parsing with UnicodeDecodeError.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return failed_jobs

    for line in lines:
        # Track current rule - this also ends any pending error block
        if match := RULE_START_PATTERN.match(line.lstrip()):
            emit_pending_error()
            current_rule = match.group(1)
            current_jobid = None
            current_wildcards = None
            current_threads = None
            current_log_path = None

        # Timestamp lines end error blocks
        elif TIMESTAMP_PATTERN.match(line.lstrip()):
            emit_pending_error()

        # Wildcards belong to the error block if one is open, else to the rule block
        elif match := WILDCARDS_PATTERN.match(line):
            wildcards = _parse_wildcards(match.group(1))
            if pending_error_rule is not None:
                error_wildcards = wildcards
            else:
                current_wildcards = wildcards

        # Threads belong to the error block if one is open, else to the rule block
        elif match := THREADS_PATTERN.match(line):
            threads = int(match.group(1))
            if pending_error_rule is not None:
                error_threads = threads
            else:
                current_threads = threads

        # Log path belongs to the error block if one is open, else to the rule block
        elif match := LOG_PATTERN.match(line):
            # Strip "(check log file(s) for error details)" suffix from error blocks
            log_value = match.group(1).strip()
            if " (check log file" in log_value:
                log_value = log_value.split(" (check log file")[0].strip()
            if pending_error_rule is not None:
                error_log_path = log_value
            else:
                current_log_path = log_value
                if current_jobid:
                    job_logs[current_jobid] = log_value

        # Jobid belongs to the error block if one is open, else to the rule block
        elif match := JOBID_PATTERN.match(line):
            jobid = match.group(1)
            if pending_error_rule is not None:
                error_jobid = jobid
            else:
                current_jobid = jobid
                # Store log by jobid if we already captured it
                if current_log_path:
                    job_logs[jobid] = current_log_path

        # Detect errors - start a new pending error block
        elif match := ERROR_IN_RULE_PATTERN.search(line):
            # Emit any previous pending error first
            emit_pending_error()
            rule = match.group(1)
            # Start new pending error - we'll capture jobid/wildcards/threads/log
            # from subsequent lines in the error block
            pending_error_rule = rule
            # Initialize with context from the rule block if the rule matches.
            # NOTE(review): when several blocks of the same rule are in flight,
            # the seed may belong to a different job; the error block's own
            # fields override it when present.
            if current_rule == rule:
                error_jobid = current_jobid
                error_wildcards = current_wildcards
                error_threads = current_threads
                error_log_path = current_log_path or (
                    job_logs.get(current_jobid) if current_jobid else None
                )
            else:
                error_jobid = None
                error_wildcards = None
                error_threads = None
                error_log_path = None

    # Emit any final pending error at end of file
    emit_pending_error()

    return failed_jobs

parse_incomplete_jobs

parse_incomplete_jobs(incomplete_dir: Path, min_start_time: float | None = None) -> Iterator[JobInfo]

Parse currently running jobs from incomplete markers.

Snakemake creates marker files in .snakemake/incomplete/ for jobs that are in progress. The marker filename is base64-encoded output file path. The file modification time indicates when the job started.

Note: This is a fallback method. Prefer parse_running_jobs_from_log() which provides rule names.

Parameters:

Name Type Description Default
incomplete_dir Path

Path to .snakemake/incomplete/ directory.

required
min_start_time float | None

If provided, only yield markers with mtime >= this time. Used to filter out stale markers from previous workflow runs.

None

Yields:

Type Description
JobInfo

JobInfo instances for each in-progress job.

Source code in snakesee/parser/core.py
def parse_incomplete_jobs(
    incomplete_dir: Path, min_start_time: float | None = None
) -> Iterator[JobInfo]:
    """
    Parse currently running jobs from incomplete markers.

    Snakemake creates marker files in .snakemake/incomplete/ for jobs that
    are in progress. Each marker is named after the URL-safe base64 encoding
    of the job's output file path. Encodings too long for a single file name
    are split into chunks: every chunk except the last becomes a directory
    whose name has an ``@`` prefix, and the last chunk is the file name. The
    full encoding is rebuilt from those parts before decoding.

    The marker file's modification time is used as the job's start time.
    Markers whose name cannot be decoded are still yielded, with
    ``output_file=None``. A ``migration_underway`` file is not a job marker
    and is skipped.

    Note: This is a fallback method. Prefer parse_running_jobs_from_log()
    which provides rule names.

    Args:
        incomplete_dir: Path to .snakemake/incomplete/ directory.
        min_start_time: If provided, only yield markers with mtime >= this time.
            Used to filter out stale markers from previous workflow runs.

    Yields:
        JobInfo instances for each in-progress job (rule is "unknown").
    """
    import base64

    if not incomplete_dir.exists():
        return

    for marker in incomplete_dir.rglob("*"):
        if not marker.is_file() or marker.name == "migration_underway":
            continue

        try:
            marker_mtime = marker.stat().st_mtime
        except OSError:
            # Marker vanished (job finished) between listing and stat.
            continue

        # Skip markers that are older than the current workflow run
        if min_start_time is not None and marker_mtime < min_start_time:
            continue

        # Reassemble the encoded id. The chunk directories are "@"-prefixed;
        # if some parent directory is not, fall back to the file name alone.
        parent_parts = marker.relative_to(incomplete_dir).parts[:-1]
        if all(part.startswith("@") for part in parent_parts):
            encoded = "".join(part[1:] for part in parent_parts) + marker.name
        else:
            encoded = marker.name

        output_file: Path | None = None
        try:
            # urlsafe_b64decode also accepts the standard "+" and "/" alphabet,
            # whereas b64decode silently drops "-" and "_" and corrupts the path.
            decoded = base64.urlsafe_b64decode(encoded).decode("utf-8")
            output_file = Path(decoded)
        except (ValueError, UnicodeDecodeError) as e:
            logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)

        # The marker's mtime is approximately when the job started
        yield JobInfo(
            rule="unknown",  # Cannot determine rule from marker filename
            start_time=marker_mtime,
            output_file=output_file,
        )

parse_job_stats_counts_from_log

parse_job_stats_counts_from_log(log_path: Path) -> dict[str, int]

Parse the 'Job stats' table from a Snakemake log to get rule -> job count mapping.

The job stats table appears at the start of a Snakemake run and lists all rules that will be executed along with their job counts. This function captures both the rule names AND their expected counts.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping rule names to their expected job counts.

dict[str, int]

Returns empty dict if the table cannot be found or parsed.

Source code in snakesee/parser/core.py
def parse_job_stats_counts_from_log(log_path: Path) -> dict[str, int]:
    """
    Parse the 'Job stats' table from a Snakemake log to get rule -> job count mapping.

    The job stats table appears at the start of a Snakemake run and lists all
    rules that will be executed along with their job counts. This function
    captures both the rule names AND their expected counts.

    Expected layout::

        Job stats:
        job        count
        -------  -------
        all            1
        align          4
        total          5

    Parsing stops at the ``total`` row (or at the first blank line after the
    data rows), so log lines that follow the table are never mistaken for
    data rows. Everything before the dashed separator (the ``job``/``count``
    header) is skipped, so a rule whose name happens to contain "job" and
    "count" (e.g. ``job_counter``) is still reported.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        Dictionary mapping rule names to their expected job counts.
        Returns empty dict if the table cannot be found or parsed.
    """
    counts: dict[str, int] = {}

    try:
        content = log_path.read_text(errors="ignore")
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return counts

    in_job_stats = False
    past_header = False

    for line in content.splitlines():
        stripped = line.strip()

        # Look for the start of the job stats table
        if stripped == "Job stats:":
            in_job_stats = True
            continue

        if not in_job_stats:
            continue

        if not stripped:
            # A blank line after the data rows ends the table
            if past_header and counts:
                break
            continue

        # The dashed separator marks the end of the header; any other dash
        # lines are ignored as well.
        if stripped.startswith("-"):
            past_header = True
            continue

        # Only the header row can appear before the separator; skip it without
        # inspecting its text so rule names are never confused with it.
        if not past_header:
            continue

        # Data row: "rule_name    count"
        parts = stripped.split()
        if len(parts) < 2:
            continue
        rule_name = parts[0]
        if rule_name == "total":
            # The summary row closes the table; stop before unrelated log lines
            break
        try:
            counts[rule_name] = int(parts[-1])
        except ValueError:
            # Count column is not an integer: not a data row
            continue

    return counts

parse_job_stats_from_log

parse_job_stats_from_log(log_path: Path) -> set[str]

Parse the 'Job stats' table from a Snakemake log to get the set of rules.

The job stats table appears at the start of a Snakemake run and lists all rules that will be executed along with their job counts.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
set[str]

Set of rule names from the job stats table. Returns an empty set if the table cannot be found or parsed.

Source code in snakesee/parser/core.py
def parse_job_stats_from_log(log_path: Path) -> set[str]:
    """
    Parse the 'Job stats' table from a Snakemake log to get the set of rules.

    The job stats table appears at the start of a Snakemake run and lists all
    rules that will be executed along with their job counts.

    Parsing stops at the ``total`` row (or at the first blank line after the
    data rows), so log lines following the table are never read as rules.
    Everything before the dashed separator (the ``job``/``count`` header) is
    skipped, so a rule such as ``job_counter`` is still reported.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        Set of rule names from the job stats table. Returns empty set if
        the table cannot be found or parsed.
    """
    rules: set[str] = set()

    try:
        content = log_path.read_text(errors="ignore")
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return rules

    in_job_stats = False
    past_header = False

    for line in content.splitlines():
        stripped = line.strip()

        # Look for the start of the job stats table
        if stripped == "Job stats:":
            in_job_stats = True
            continue

        if not in_job_stats:
            continue

        if not stripped:
            # A blank line after the data rows ends the table
            if past_header and rules:
                break
            continue

        # The dashed separator marks the end of the header; any other dash
        # lines are ignored as well.
        if stripped.startswith("-"):
            past_header = True
            continue

        # Only the header row can appear before the separator; skip it without
        # inspecting its text so rule names are never confused with it.
        if not past_header:
            continue

        # Data row: "rule_name    count"
        parts = stripped.split()
        if len(parts) < 2:
            continue
        rule_name = parts[0]
        if rule_name == "total":
            # The summary row closes the table; stop before unrelated log lines
            break
        rules.add(rule_name)

    return rules

parse_metadata_files

parse_metadata_files(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[JobInfo]

Parse completed job information from Snakemake metadata files.

Reads JSON metadata files from .snakemake/metadata/ to extract timing information for completed jobs, including input file sizes.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Yields:

Type Description
JobInfo

JobInfo instances for each completed job found.

Source code in snakesee/parser/metadata.py
def parse_metadata_files(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[JobInfo]:
    """
    Yield one JobInfo per completed job recorded in Snakemake metadata.

    Every metadata JSON file under ``.snakemake/metadata/`` is visited via
    ``iterate_metadata_files``. Records lacking any of ``rule``,
    ``starttime`` or ``endtime`` are skipped. Wildcards are copied with keys
    and values coerced to ``str``, and the input size is derived from the
    record's ``input`` entry.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        JobInfo instances for each completed job found.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule_name = record.get("rule")
        begin = record.get("starttime")
        finish = record.get("endtime")

        # Only fully-timed records describe a completed job
        if rule_name is None or begin is None or finish is None:
            continue

        raw_wildcards = record.get("wildcards")
        wildcards: dict[str, str] | None = (
            {str(key): str(value) for key, value in raw_wildcards.items()}
            if isinstance(raw_wildcards, dict)
            else None
        )

        yield JobInfo(
            rule=rule_name,
            start_time=begin,
            end_time=finish,
            wildcards=wildcards,
            input_size=calculate_metadata_input_size(record.get("input")),
        )

parse_metadata_files_full

parse_metadata_files_full(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Parse all metadata from Snakemake metadata files in a single pass.

This is more efficient than calling parse_metadata_files and collect_rule_code_hashes separately, as it reads each file only once.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Yields:

Type Description
MetadataRecord

MetadataRecord instances containing timing and code hash data.

Source code in snakesee/parser/metadata.py
def parse_metadata_files_full(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """
    Read every Snakemake metadata file once and yield all fields of interest.

    Unlike ``parse_metadata_files``, records are yielded even when timing is
    missing (only a missing ``rule`` causes a skip). The code hash is computed
    in the same pass, so there is no need to also call
    ``collect_rule_code_hashes``.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        MetadataRecord instances containing timing and code hash data.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule_name = record.get("rule")
        if rule_name is None:
            continue

        raw_wildcards = record.get("wildcards")
        wildcards: dict[str, str] | None = None
        if isinstance(raw_wildcards, dict):
            wildcards = {str(key): str(value) for key, value in raw_wildcards.items()}

        source = record.get("code")
        code_hash: str | None = None
        if source:
            # Collapse whitespace runs so formatting-only edits hash identically;
            # keep the first 16 hex chars of the SHA-256 digest.
            canonical = " ".join(source.split())
            code_hash = hashlib.sha256(canonical.encode()).hexdigest()[:16]

        yield MetadataRecord(
            rule=rule_name,
            start_time=record.get("starttime"),
            end_time=record.get("endtime"),
            wildcards=wildcards,
            input_size=calculate_metadata_input_size(record.get("input")),
            code_hash=code_hash,
        )

parse_progress_from_log

parse_progress_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> tuple[int, int]

Parse current progress from a snakemake log file.

Reads the log file and finds the most recent progress line.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
tuple[int, int]

Tuple of (completed_count, total_count). Returns (0, 0) if no progress found.

Source code in snakesee/parser/core.py
def parse_progress_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> tuple[int, int]:
    """
    Parse current progress from a snakemake log file.

    Reads the log file and finds the most recent progress line.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        Tuple of (completed_count, total_count). Returns (0, 0) if no progress found.
    """
    completed, total = 0, 0
    try:
        lines = _cached_lines if _cached_lines is not None else log_path.read_text().splitlines()
        for line in lines:
            if match := PROGRESS_PATTERN.search(line):
                completed = int(match.group(1))
                total = int(match.group(2))
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
    return completed, total

parse_rules_from_log

parse_rules_from_log(log_path: Path) -> dict[str, int]

Parse rule execution counts from a snakemake log file.

Counts how many times each rule appears to have completed based on log entries.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping rule names to completion counts.

Source code in snakesee/parser/core.py
def parse_rules_from_log(log_path: Path) -> dict[str, int]:
    """
    Parse rule execution counts from a snakemake log file.

    Counts how many times each rule appears to have completed based on log
    entries. Each finished jobid is attributed to the rule whose block
    declared that jobid. If the jobid was never seen, it falls back to the
    most recent rule block.

    Args:
        log_path: Path to the snakemake log file.

    Returns:
        Dictionary mapping rule names to completion counts. Empty if the file
        cannot be read.
    """
    rule_counts: dict[str, int] = {}
    current_rule: str | None = None
    job_rules: dict[str, str] = {}

    try:
        # errors="ignore" (as in the job-stats parsers): undecodable bytes
        # must not raise UnicodeDecodeError, which `except OSError` misses.
        lines = log_path.read_text(errors="ignore").splitlines()
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return rule_counts

    for line in lines:
        # Track current rule being executed
        if match := RULE_START_PATTERN.match(line.lstrip()):
            current_rule = match.group(1)
        # Map jobid to current rule
        elif (match := JOBID_PATTERN.match(line)) and current_rule is not None:
            job_rules[match.group(1)] = current_rule
        # Count finished jobs using jobid-to-rule mapping
        elif match := FINISHED_JOB_PATTERN.search(line):
            rule = job_rules.get(match.group(1), current_rule)
            if rule is not None:
                rule_counts[rule] = rule_counts.get(rule, 0) + 1

    return rule_counts

parse_running_jobs_from_log

parse_running_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse currently running jobs by analyzing the log file.

Tracks jobs that have started (rule + jobid) but not yet finished or failed.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that appear to be running.

Source code in snakesee/parser/core.py
def parse_running_jobs_from_log(  # noqa: C901
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse currently running jobs by analyzing the log file.

    Tracks jobs that have started (rule + jobid) but not yet finished or failed.
    Wildcards, threads and log path are attached to a job whether they appear
    before or after its ``jobid:`` line inside the rule block.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for jobs that appear to be running. Returns an empty
        list if the log file cannot be read.
    """
    # Track started jobs: jobid -> (rule, start_line_num, wildcards, threads)
    started_jobs: dict[str, tuple[str, int, dict[str, str] | None, int | None]] = {}
    # Job logs: jobid -> log_path (separate lookup by unique jobid)
    job_logs: dict[str, str] = {}
    finished_jobids: set[str] = set()
    failed_jobids: set[str] = set()

    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_log_path: str | None = None

    # Track pending error to capture jobid from error block
    pending_error_rule: str | None = None
    error_jobid: str | None = None

    def record_pending_error() -> None:
        """Record the failed jobid from a pending error block."""
        nonlocal pending_error_rule, error_jobid
        if pending_error_rule is not None and error_jobid is not None:
            failed_jobids.add(error_jobid)
        pending_error_rule = None
        error_jobid = None

    try:
        # errors="ignore": undecodable bytes must not raise UnicodeDecodeError,
        # which the `except OSError` below would not catch.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
        for line_num, line in enumerate(lines):
            # Track current rule being executed
            if match := RULE_START_PATTERN.match(line.lstrip()):
                record_pending_error()
                current_rule = match.group(1)
                current_jobid = None  # Reset jobid for new rule block
                current_wildcards = None
                current_threads = None
                current_log_path = None

            # Timestamp lines end error blocks
            elif TIMESTAMP_PATTERN.match(line.lstrip()):
                record_pending_error()

            # Capture wildcards within rule block
            elif match := WILDCARDS_PATTERN.match(line):
                current_wildcards = _parse_wildcards(match.group(1))
                # Update already-stored job if wildcards comes after jobid
                # (same situation as the threads handling below)
                if current_jobid and current_jobid in started_jobs:
                    rule, ln, _, thr = started_jobs[current_jobid]
                    started_jobs[current_jobid] = (rule, ln, current_wildcards, thr)

            # Capture threads within rule block
            elif match := THREADS_PATTERN.match(line):
                current_threads = int(match.group(1))
                # Update already-stored job if threads comes after jobid
                if current_jobid and current_jobid in started_jobs:
                    rule, ln, wc, _ = started_jobs[current_jobid]
                    started_jobs[current_jobid] = (rule, ln, wc, current_threads)

            # Capture log path within rule block - store by jobid
            elif match := LOG_PATTERN.match(line):
                current_log_path = match.group(1).strip()
                if current_jobid:
                    job_logs[current_jobid] = current_log_path

            # Capture jobid within rule block or error block
            elif match := JOBID_PATTERN.match(line):
                jobid = match.group(1)
                current_jobid = jobid
                if current_rule is not None and jobid not in started_jobs:
                    started_jobs[jobid] = (
                        current_rule,
                        line_num,
                        current_wildcards,
                        current_threads,
                    )
                # Store log by jobid if we already captured it
                if current_log_path:
                    job_logs[jobid] = current_log_path
                # If we're in an error block, capture this jobid as failed
                if pending_error_rule is not None:
                    error_jobid = jobid

            # Track finished jobs
            elif match := FINISHED_JOB_PATTERN.search(line):
                finished_jobids.add(match.group(1))

            # Track error blocks to capture failed jobids
            elif match := ERROR_IN_RULE_PATTERN.search(line):
                record_pending_error()
                pending_error_rule = match.group(1)
                # Initialize error_jobid from context if rule matches
                if current_rule == pending_error_rule:
                    error_jobid = current_jobid
                else:
                    error_jobid = None

        # Record any final pending error
        record_pending_error()

    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return []

    # Jobs that started but haven't finished or failed are running
    running: list[JobInfo] = []
    # Use try/except to avoid TOCTOU race between exists() and stat()
    log_mtime: float | None = None
    try:
        log_mtime = log_path.stat().st_mtime
    except OSError:
        pass

    excluded_jobids = finished_jobids | failed_jobids
    for jobid, (rule, _line_num, wildcards, threads) in started_jobs.items():
        if jobid not in excluded_jobids:
            # Look up log by jobid - unique within this run
            job_log = job_logs.get(jobid)
            running.append(
                JobInfo(
                    rule=rule,
                    job_id=jobid,
                    start_time=log_mtime,  # Log file mtime; approximate, could use timestamps
                    wildcards=wildcards,
                    threads=threads,
                    log_file=Path(job_log) if job_log else None,
                )
            )

    return running

parse_threads_from_log

parse_threads_from_log(log_path: Path) -> dict[str, int]

Parse a jobid -> threads mapping from a snakemake log file.

This is used to augment metadata completions with thread info, since metadata files don't store the threads directive.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping job_id to thread count.

Source code in snakesee/parser/core.py
def parse_threads_from_log(log_path: Path) -> dict[str, int]:
    """
    Parse a jobid -> threads mapping from a snakemake log file.

    This is used to augment metadata completions with thread info,
    since metadata files don't store the threads directive. The
    ``threads:`` line may appear before or after ``jobid:`` within a rule
    block. The first value recorded for a jobid is kept.

    Args:
        log_path: Path to the snakemake log file.

    Returns:
        Dictionary mapping job_id to thread count. Empty if the file cannot
        be read.
    """
    threads_map: dict[str, int] = {}

    try:
        # errors="ignore" (as in the job-stats parsers): undecodable bytes
        # must not raise UnicodeDecodeError, which `except OSError` misses.
        lines = log_path.read_text(errors="ignore").splitlines()
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return threads_map

    current_jobid: str | None = None
    current_threads: int | None = None

    for line in lines:
        # Track current rule (resets context)
        if RULE_START_PATTERN.match(line.lstrip()):
            current_jobid = None
            current_threads = None

        # Capture threads
        elif match := THREADS_PATTERN.match(line):
            current_threads = int(match.group(1))
            # Update already-stored job if threads comes after jobid
            if current_jobid and current_jobid not in threads_map:
                threads_map[current_jobid] = current_threads

        # Capture jobid
        elif match := JOBID_PATTERN.match(line):
            current_jobid = match.group(1)
            if current_threads is not None and current_jobid not in threads_map:
                threads_map[current_jobid] = current_threads

    return threads_map

parse_workflow_state

parse_workflow_state(workflow_dir: Path, log_file: Path | None = None, cutoff_time: float | None = None, log_reader: IncrementalLogReader | None = None) -> WorkflowProgress

Parse complete workflow state from .snakemake directory.

Combines information from log files, metadata, incomplete markers, and lock files to build a complete picture of workflow state.

Parameters:

Name Type Description Default
workflow_dir Path

Root directory containing .snakemake/.

required
log_file Path | None

Optional specific log file to parse. If None, uses the latest.

None
cutoff_time float | None

Optional upper bound for filtering completions (e.g., when the next log started). Used for "time machine" view of historical logs.

None
log_reader IncrementalLogReader | None

Optional incremental log reader for efficient polling. When provided, uses cached state instead of re-parsing the entire log file on each call.

None

Returns:

Type Description
WorkflowProgress

Current workflow state as a WorkflowProgress instance.

Source code in snakesee/parser/core.py
def parse_workflow_state(
    workflow_dir: Path,
    log_file: Path | None = None,
    cutoff_time: float | None = None,
    log_reader: IncrementalLogReader | None = None,
) -> WorkflowProgress:
    """
    Parse complete workflow state from .snakemake directory.

    Combines information from log files, metadata, incomplete markers,
    and lock files to build a complete picture of workflow state.

    Args:
        workflow_dir: Root directory containing .snakemake/.
        log_file: Optional specific log file to parse. If None, uses the latest.
        cutoff_time: Optional upper bound for filtering completions (e.g., when
                     the next log started). Used for "time machine" view of
                     historical logs.
        log_reader: Optional incremental log reader for efficient polling.
                    When provided, uses cached state instead of re-parsing
                    the entire log file on each call.

    Returns:
        Current workflow state as a WorkflowProgress instance.
    """
    from snakesee.state.paths import WorkflowPaths

    paths = WorkflowPaths(workflow_dir)

    # Auto-detect persistence backend (SQLite DB or filesystem)
    from snakesee.persistence import detect_backend

    backend = detect_backend(workflow_dir)

    snakemake_dir = paths.snakemake_dir

    # Use specified log file or find latest
    latest_log = paths.find_latest_log()
    log_path = log_file if log_file is not None else latest_log
    is_latest_log = log_file is None or log_file == latest_log

    # Lock files only matter for the latest log. The final status is derived
    # further down by _determine_final_workflow_status.
    workflow_is_running = is_latest_log and is_workflow_running(snakemake_dir, backend=backend)

    # Update incremental reader if provided and log path matches/changes
    if log_reader is not None and log_path is not None:
        log_reader.set_log_path(log_path)
        log_reader.read_new_lines()

    # Cache log file lines to avoid repeated reads when not using log_reader
    # This is a significant performance optimization for large log files
    cached_lines: list[str] | None = None
    if log_reader is None and log_path is not None:
        try:
            # errors="ignore": undecodable bytes must not raise UnicodeDecodeError,
            # which `except OSError` would not catch.
            cached_lines = log_path.read_text(errors="ignore").splitlines()
        except OSError:
            cached_lines = []

    # Parse progress from log file (or use reader state)
    if log_reader is not None and log_path is not None:
        completed, total = log_reader.progress
    else:
        completed, total = (
            (0, 0)
            if log_path is None
            else parse_progress_from_log(log_path, _cached_lines=cached_lines)
        )

    # Get workflow start time for filtering incomplete markers
    # Prefer the first timestamp in the log (when jobs actually started) over file ctime
    start_time: float | None = None
    if log_path is not None:
        start_time = _get_first_log_timestamp(log_path, _cached_lines=cached_lines)
        if start_time is None:
            # Fall back to file ctime if no timestamps in log yet
            try:
                start_time = log_path.stat().st_ctime
            except OSError:
                pass

    # Parse completed jobs for recent completions
    metadata_dir = snakemake_dir / "metadata"
    all_completions = list(parse_metadata_files(metadata_dir))

    # Filter completions to the relevant timeframe
    completions = _collect_filtered_completions(all_completions, log_path, cutoff_time, log_reader)

    # Augment completions with threads from log (metadata doesn't have threads)
    if log_path is not None and completions:
        completions = _augment_completions_with_threads(
            completions, log_path, _cached_lines=cached_lines
        )

    completions.sort(key=lambda j: j.end_time or 0, reverse=True)

    # Parse running jobs from log file (provides rule names)
    running: list[JobInfo] = []
    failed_list: list[JobInfo] = []
    if log_path is not None:
        if log_reader is not None:
            running = log_reader.running_jobs
            failed_list = log_reader.failed_jobs
        else:
            running = parse_running_jobs_from_log(log_path, _cached_lines=cached_lines)
            failed_list = parse_failed_jobs_from_log(log_path, _cached_lines=cached_lines)

    # Check for incomplete jobs (from DB or filesystem markers)
    if is_latest_log:
        incomplete_list = [
            JobInfo(
                rule=ij.rule or "unknown",
                start_time=ij.start_time,
                output_file=ij.output_file,
            )
            for ij in backend.iterate_incomplete_jobs(min_start_time=start_time)
        ]
    else:
        incomplete_list = []

    # Reconcile running, failed, and incomplete job lists
    running, incomplete_list = _reconcile_job_lists(
        running, failed_list, incomplete_list, workflow_is_running
    )

    # Determine final status based on running jobs, failures, and incomplete markers
    status = _determine_final_workflow_status(
        running=running,
        failed_list=failed_list,
        incomplete_list=incomplete_list,
        completed=completed,
        total=total,
        is_latest_log=is_latest_log,
        workflow_is_running=workflow_is_running,
    )

    return WorkflowProgress(
        workflow_dir=workflow_dir,
        status=status,
        total_jobs=total,
        completed_jobs=completed,
        failed_jobs=len(failed_list),
        failed_jobs_list=failed_list,
        incomplete_jobs_list=incomplete_list,
        running_jobs=running,
        recent_completions=completions,
        start_time=start_time,
        log_file=log_path,
    )

Modules

core

Parsers for Snakemake log files and job tracking.

This module contains log file parsing functions for tracking running, completed, and failed jobs. Metadata parsing has been moved to snakesee.parser.metadata and statistics collection to snakesee.parser.stats.

Classes

MetadataRecord dataclass

Single metadata file parsed data for efficient single-pass collection.

Contains all fields needed by various collection functions so we only read each metadata file once.

Source code in snakesee/parser/metadata.py
@dataclass(frozen=True, slots=True)
class MetadataRecord:
    """Everything snakesee extracts from one Snakemake metadata file.

    Bundles the fields consumed by the different collection functions
    (timing, wildcards, input size, code hash) so each metadata file only
    needs to be read once.
    """

    rule: str
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] | None = None
    input_size: int | None = None
    code_hash: str | None = None

    @property
    def duration(self) -> float | None:
        """Return ``end_time - start_time``, or None when either is missing."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_job_info(self) -> JobInfo:
        """Build a JobInfo for code that still expects that type.

        ``code_hash`` is not passed on.
        """
        return JobInfo(
            rule=self.rule,
            wildcards=self.wildcards,
            start_time=self.start_time,
            end_time=self.end_time,
            input_size=self.input_size,
        )
Attributes
duration property
duration: float | None

Calculate duration from start and end times.

Functions
to_job_info
to_job_info() -> JobInfo

Convert to JobInfo for compatibility with existing code.

Source code in snakesee/parser/metadata.py
def to_job_info(self) -> JobInfo:
    """Build a JobInfo for code that still expects that type.

    ``code_hash`` is not passed on.
    """
    return JobInfo(
        rule=self.rule,
        wildcards=self.wildcards,
        start_time=self.start_time,
        end_time=self.end_time,
        input_size=self.input_size,
    )

Functions

calculate_input_size
calculate_input_size(file_paths: list[Path]) -> int | None

Calculate total size of input files.

Parameters:

Name Type Description Default
file_paths list[Path]

List of input file paths.

required

Returns:

Type Description
int | None

Total size in bytes, or None if the list is empty or any file doesn't exist (cannot be stat'ed).

Source code in snakesee/parser/utils.py
def calculate_input_size(file_paths: list[Path]) -> int | None:
    """
    Calculate total size of input files.

    Args:
        file_paths: List of input file paths.

    Returns:
        Total size in bytes, or None if any file doesn't exist.
    """
    total_size = 0
    for path in file_paths:
        try:
            total_size += path.stat().st_size
        except OSError:
            return None  # File doesn't exist or can't be accessed
    return total_size if file_paths else None
collect_rule_code_hashes
collect_rule_code_hashes(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, set[str]]

Collect code hashes for each rule from metadata files.

This enables detection of renamed rules by matching their shell code. If two rules have the same code hash, they are likely the same rule that was renamed.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Returns:

Type Description
dict[str, set[str]]

Dictionary mapping code_hash -> set of rule names that use that code.

Source code in snakesee/parser/metadata.py
def collect_rule_code_hashes(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, set[str]]:
    """
    Group rule names by a fingerprint of their shell code.

    Rules that were renamed between runs keep the same code, so two rule
    names sharing a fingerprint are probably the same rule under different
    names. The fingerprint is the first 16 hex digits of the SHA-256 of the
    code after whitespace normalization (runs of whitespace collapsed to a
    single space, leading/trailing whitespace dropped).

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping code_hash -> set of rule names that use that code.
        Empty when ``metadata_dir`` does not exist.
    """
    rules_by_hash: dict[str, set[str]] = {}
    if not metadata_dir.exists():
        return rules_by_hash

    # The "code" field is not kept in the metadata cache, so read the files
    # directly; the visiting order does not affect the result.
    records = iterate_metadata_files(
        metadata_dir,
        progress_callback,
        sort_by_mtime=False,
        use_cache=False,
    )
    for _path, record in records:
        rule_name = record.get("rule")
        source = record.get("code")
        if not (rule_name and source):
            continue
        # Normalize whitespace so formatting-only edits keep the same fingerprint.
        canonical = " ".join(source.split())
        fingerprint = hashlib.sha256(canonical.encode()).hexdigest()[:16]
        rules_by_hash.setdefault(fingerprint, set()).add(rule_name)

    return rules_by_hash
collect_rule_timing_stats
collect_rule_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, RuleTimingStats]

Collect historical timing statistics per rule from metadata.

Aggregates all completed job timings by rule name, sorted chronologically by end time. Includes timestamps for time-based weighted estimation. Input sizes are included when available from job metadata.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Returns:

Type Description
dict[str, RuleTimingStats]

Dictionary mapping rule names to their timing statistics.

Source code in snakesee/parser/stats.py
def collect_rule_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, RuleTimingStats]:
    """
    Build per-rule timing history from Snakemake metadata.

    Every job that has both a duration and an end time contributes one
    sample to its rule. Samples are ordered oldest-first by end time (ties
    keep the order in which jobs were read), and the end times are kept next
    to the durations so estimators can weight by recency. Input sizes are
    carried along unchanged and may be ``None`` where unknown.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping rule names to their timing statistics.
    """
    # rule -> [(duration, end_time, input_size), ...] in discovery order
    samples_by_rule: dict[str, list[tuple[float, float, int | None]]] = {}

    for job in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        elapsed, finished = job.duration, job.end_time
        if elapsed is None or finished is None:
            continue
        samples_by_rule.setdefault(job.rule, []).append((elapsed, finished, job.input_size))

    result: dict[str, RuleTimingStats] = {}
    for rule_name, samples in samples_by_rule.items():
        # Stable sort on end_time so equal timestamps keep discovery order.
        ordered = sorted(samples, key=lambda sample: sample[1])
        result[rule_name] = RuleTimingStats(
            rule=rule_name,
            durations=[sample[0] for sample in ordered],
            timestamps=[sample[1] for sample in ordered],
            input_sizes=[sample[2] for sample in ordered],
        )

    return result
collect_wildcard_timing_stats
collect_wildcard_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, dict[str, WildcardTimingStats]]

Collect timing statistics per rule, conditioned on wildcards.

Groups execution times by (rule, wildcard_key, wildcard_value) for rules that have wildcards in their metadata.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Returns:

Type Description
dict[str, dict[str, WildcardTimingStats]]

Nested dictionary: rule -> wildcard_key -> WildcardTimingStats

Source code in snakesee/parser/stats.py
def collect_wildcard_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, dict[str, WildcardTimingStats]]:
    """Collect timing statistics per rule, split by wildcard value.

    Only jobs that have a duration, an end time and at least one wildcard
    are used. Each such job contributes one ``(duration, end_time)`` sample
    under every one of its ``(wildcard_key, wildcard_value)`` pairs; the
    samples for each key are then summarized by
    ``_build_wildcard_stats_for_key``.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Nested dictionary: rule -> wildcard_key -> WildcardTimingStats
    """
    # rule -> wildcard_key -> wildcard_value -> [(duration, end_time), ...]
    grouped: dict[str, dict[str, dict[str, list[tuple[float, float]]]]] = {}

    for job in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        elapsed = job.duration
        finished = job.end_time
        if elapsed is None or finished is None or not job.wildcards:
            continue

        keys_for_rule = grouped.setdefault(job.rule, {})
        for wc_key, wc_value in job.wildcards.items():
            values_for_key = keys_for_rule.setdefault(wc_key, {})
            values_for_key.setdefault(wc_value, []).append((elapsed, finished))

    return {
        rule: {
            wc_key: _build_wildcard_stats_for_key(rule, wc_key, wc_values)
            for wc_key, wc_values in keys_for_rule.items()
        }
        for rule, keys_for_rule in grouped.items()
    }
estimate_input_size_from_output
estimate_input_size_from_output(output_path: Path, workflow_dir: Path) -> int | None

Try to estimate input size by looking for related input files.

This is a heuristic that works for common bioinformatics patterns where output files are derived from inputs with predictable naming conventions.

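Examples (candidates are looked up directly under the workflow directory):

  • sample.sorted.bam -> sample.bam
  • sample.trimmed -> looks for sample.bam, sample.fastq.gz, sample.fq.gz, sample.fa.gz, sample.fasta.gz
  • sample.vcf.gz -> not determinable (the name does not end with a known processing suffix)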

Parameters:

Name Type Description Default
output_path Path

Path to the output file.

required
workflow_dir Path

Workflow root directory.

required

Returns:

Type Description
int | None

Estimated input size in bytes, or None if not determinable.

Source code in snakesee/parser/utils.py
def estimate_input_size_from_output(
    output_path: Path,
    workflow_dir: Path,
) -> int | None:
    """
    Try to estimate input size by looking for related input files.

    This is a heuristic that works for common bioinformatics patterns where
    output files are derived from inputs with predictable naming conventions.

    Examples:
        - sample.sorted.bam -> sample.bam
        - sample.fastq.gz -> looks for sample.fq.gz, sample.fastq.gz
        - sample.vcf.gz -> sample.bam

    Args:
        output_path: Path to the output file.
        workflow_dir: Workflow root directory.

    Returns:
        Estimated input size in bytes, or None if not determinable.
    """
    # Common input file patterns relative to output
    suffixes_to_strip = [
        ".sorted.bam",
        ".sorted",
        ".trimmed",
        ".filtered",
        ".dedup",
        ".aligned",
    ]

    name = output_path.name

    # Try stripping common suffixes to find input
    for suffix in suffixes_to_strip:
        if name.endswith(suffix):
            input_name = name[: -len(suffix)]
            # Try common extensions
            for ext in [".bam", ".fastq.gz", ".fq.gz", ".fa.gz", ".fasta.gz"]:
                candidate = workflow_dir / (input_name + ext)
                if candidate.exists():
                    try:
                        return candidate.stat().st_size
                    except OSError:
                        continue

    # No input found
    return None
is_workflow_running
is_workflow_running(snakemake_dir: Path, stale_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS, backend: PersistenceBackend | None = None) -> bool

Check if a workflow is currently running.

Uses multiple signals:

  1. Lock files exist in .snakemake/locks/.
  2. Incomplete markers exist in .snakemake/incomplete/ (jobs in progress).
  3. The log file was recently modified (within stale_threshold seconds).

If locks AND incomplete markers both exist, the workflow is definitely running (incomplete markers are created when jobs start and removed when they finish). Log freshness is only used as a fallback when there are no incomplete markers.

Parameters:

Name Type Description Default
snakemake_dir Path

Path to the .snakemake directory.

required
stale_threshold float

Seconds since last log modification before considering the workflow stale/dead. Default 1800 seconds (30 minutes).

STALE_WORKFLOW_THRESHOLD_SECONDS
backend PersistenceBackend | None

Optional persistence backend. If provided, uses it for lock and incomplete checks instead of filesystem.

None

Returns:

Type Description
bool

True if workflow appears to be actively running, False otherwise.

Source code in snakesee/parser/core.py
def _latest_log_is_fresh(snakemake_dir: Path, stale_threshold: float) -> bool:
    """Return False only if the newest Snakemake log is at least ``stale_threshold`` seconds old.

    A missing log (e.g. early startup) or one that cannot be stat'ed is
    treated as fresh, so that a workflow holding locks is not declared dead
    prematurely.
    """
    from snakesee.state.clock import get_clock
    from snakesee.state.paths import WorkflowPaths

    # snakemake_dir is .snakemake, so its parent is the workflow directory.
    log_file = WorkflowPaths(snakemake_dir.parent).find_latest_log()
    if log_file is None:
        return True

    try:
        log_mtime = log_file.stat().st_mtime
    except OSError:
        # Can't stat log file - assume running to be safe
        return True
    return get_clock().now() - log_mtime < stale_threshold


def _dir_has_entries(directory: Path) -> bool:
    """Return True if ``directory`` can be listed and contains at least one entry."""
    try:
        return any(directory.iterdir())
    except OSError:
        # Missing, not a directory, or unreadable: treat as empty.
        return False


def is_workflow_running(
    snakemake_dir: Path,
    stale_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS,
    backend: PersistenceBackend | None = None,
) -> bool:
    """
    Check if a workflow is currently running.

    Uses multiple signals:
    1. Lock files exist in .snakemake/locks/
    2. Incomplete markers exist in .snakemake/incomplete/ (jobs in progress)
    3. Log file was recently modified (within stale_threshold seconds)

    If locks AND incomplete markers both exist, the workflow is definitely running
    (incomplete markers are created when jobs start and removed when they finish).
    Log freshness is only used as a fallback when there are no incomplete markers.

    Args:
        snakemake_dir: Path to the .snakemake directory.
        stale_threshold: Seconds since last log modification before considering
            the workflow stale/dead. Default 1800 seconds (30 minutes).
        backend: Optional persistence backend. If provided, uses it for
            lock and incomplete checks instead of filesystem.

    Returns:
        True if workflow appears to be actively running, False otherwise.
    """
    if backend is not None:
        has_locks = backend.has_locks()
        has_incomplete = has_locks and backend.has_incomplete_jobs()
    else:
        has_locks = _dir_has_entries(snakemake_dir / "locks")
        has_incomplete = has_locks and _dir_has_entries(snakemake_dir / "incomplete")

    # No locks: nothing holds the working directory, so nothing is running.
    if not has_locks:
        return False
    # Locks plus in-progress markers: definitely running.
    if has_incomplete:
        return True
    # Locks but no markers: decide by how recently the log was written.
    return _latest_log_is_fresh(snakemake_dir, stale_threshold)
parse_all_jobs_from_log
parse_all_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse all scheduled jobs from a snakemake log file.

Extracts job information (rule, wildcards, threads, jobid) from the log by parsing job blocks. This captures ALL jobs that snakemake scheduled, including those not yet started.

This is useful for getting pending jobs with their wildcards when the snakesee logger plugin is not available.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for all scheduled jobs.

Source code in snakesee/parser/core.py
def parse_all_jobs_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse all scheduled jobs from a snakemake log file.

    Extracts job information (rule, wildcards, threads, jobid) from the log
    by parsing job blocks. This captures ALL jobs that snakemake scheduled,
    including those not yet started.

    Each ``rule <name>:`` header opens a job block. The ``wildcards:``,
    ``threads:`` and ``jobid:`` lines that follow it are attributed to that
    block. A block is emitted only if a jobid was seen, and only the first
    block for a given jobid is kept.

    This is useful for getting pending jobs with their wildcards when the
    snakesee logger plugin is not available.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for all scheduled jobs, in log order. Empty if the
        log file cannot be read.
    """
    all_jobs: list[JobInfo] = []
    seen_jobids: set[str] = set()

    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None

    def flush_current_job() -> None:
        """Append the job block being parsed if it is complete and not a duplicate."""
        if current_rule is None or current_jobid is None:
            return
        if current_jobid in seen_jobids:
            return
        seen_jobids.add(current_jobid)
        all_jobs.append(
            JobInfo(
                rule=current_rule,
                job_id=current_jobid,
                wildcards=current_wildcards,
                threads=current_threads,
            )
        )

    try:
        # errors="ignore" (as in parse_cores_from_log): undecodable bytes, e.g.
        # from tool output echoed into the log, must not abort parsing with a
        # UnicodeDecodeError, which ``except OSError`` would not catch.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return all_jobs

    for line in lines:
        if match := RULE_START_PATTERN.match(line.lstrip()):
            # A new rule header closes the previous job block.
            flush_current_job()
            current_rule = match.group(1)
            current_jobid = None
            current_wildcards = None
            current_threads = None

        # Capture wildcards within rule block
        elif match := WILDCARDS_PATTERN.match(line):
            current_wildcards = _parse_wildcards(match.group(1))

        # Capture threads within rule block
        elif match := THREADS_PATTERN.match(line):
            current_threads = int(match.group(1))

        # Capture jobid within rule block
        elif match := JOBID_PATTERN.match(line):
            current_jobid = match.group(1)

    # The last block has no following rule header to close it.
    flush_current_job()
    return all_jobs
parse_completed_jobs_from_log
parse_completed_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse completed jobs with timing from a snakemake log file.

Extracts job start/end times from log timestamps to reconstruct job durations. This is useful for historical logs where metadata may have been overwritten by later runs.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs with timing information.

Source code in snakesee/parser/core.py
def parse_completed_jobs_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse completed jobs with timing from a snakemake log file.

    Extracts job start/end times from log timestamps to reconstruct
    job durations. This is useful for historical logs where metadata
    may have been overwritten by later runs.

    A job's start time is the latest timestamp line seen before its
    ``jobid:`` line. Its end time is the latest timestamp seen before the
    matching "finished" line. Wildcards and threads are attached to the job
    even when they are printed after its ``jobid:`` line.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for completed jobs with timing information, in the
        order their completion was logged. Empty if the log cannot be read.
    """
    completed_jobs: list[JobInfo] = []

    # Track started jobs: jobid -> (rule, start_time, wildcards, threads)
    started_jobs: dict[str, tuple[str, float, dict[str, str] | None, int | None]] = {}
    # Job logs: jobid -> log_path (separate lookup by unique jobid)
    job_logs: dict[str, str] = {}
    current_rule: str | None = None
    current_timestamp: float | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_jobid: str | None = None
    current_log_path: str | None = None

    try:
        # errors="ignore" (as in parse_cores_from_log): undecodable bytes must
        # not raise UnicodeDecodeError, which ``except OSError`` would not catch.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return completed_jobs

    for line in lines:
        stripped = line.lstrip()

        # Check for timestamp
        if match := TIMESTAMP_PATTERN.match(stripped):
            current_timestamp = _parse_timestamp(match.group(1))

        # Track current rule being executed
        elif match := RULE_START_PATTERN.match(stripped):
            current_rule = match.group(1)
            current_wildcards = None
            current_threads = None
            current_jobid = None
            current_log_path = None

        # Capture wildcards within rule block
        elif match := WILDCARDS_PATTERN.match(line):
            current_wildcards = _parse_wildcards(match.group(1))
            # Like threads, wildcards may be printed after jobid; patch the
            # already-stored job so it doesn't keep the earlier (None) value.
            if current_jobid and current_jobid in started_jobs:
                rule, ts, _, thr = started_jobs[current_jobid]
                started_jobs[current_jobid] = (rule, ts, current_wildcards, thr)

        # Capture threads within rule block
        elif match := THREADS_PATTERN.match(line):
            current_threads = int(match.group(1))
            # Update already-stored job if threads comes after jobid
            if current_jobid and current_jobid in started_jobs:
                rule, ts, wc, _ = started_jobs[current_jobid]
                started_jobs[current_jobid] = (rule, ts, wc, current_threads)

        # Capture log path within rule block - store by jobid
        elif match := LOG_PATTERN.match(line):
            current_log_path = match.group(1).strip()
            if current_jobid:
                job_logs[current_jobid] = current_log_path

        # Capture jobid within rule block
        elif match := JOBID_PATTERN.match(line):
            current_jobid = match.group(1)
            if current_rule is not None and current_timestamp is not None:
                started_jobs[current_jobid] = (
                    current_rule,
                    current_timestamp,
                    current_wildcards,
                    current_threads,
                )
            # Store log by jobid if we already captured it
            if current_log_path:
                job_logs[current_jobid] = current_log_path

        # Track finished jobs
        elif match := FINISHED_JOB_PATTERN.search(line):
            jobid = match.group(1)
            if jobid in started_jobs and current_timestamp is not None:
                rule, start_time, wildcards, threads = started_jobs[jobid]
                # Look up log by jobid - unique within this run
                job_log = job_logs.get(jobid)
                completed_jobs.append(
                    JobInfo(
                        rule=rule,
                        job_id=jobid,
                        start_time=start_time,
                        end_time=current_timestamp,
                        wildcards=wildcards,
                        threads=threads,
                        log_file=Path(job_log) if job_log else None,
                    )
                )

    return completed_jobs
parse_cores_from_log
parse_cores_from_log(log_path: Path) -> int | None

Parse the `Provided cores: N` line from a Snakemake log.

This line appears near the start of a run and reflects the -j/--cores value passed to Snakemake. Only the first occurrence is used.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
int | None

The core count, or None if the line was not found.

Source code in snakesee/parser/core.py
def parse_cores_from_log(log_path: Path) -> int | None:
    """
    Parse the ``Provided cores: N`` line from a Snakemake log.

    This line appears near the start of a run and reflects the ``-j``/``--cores``
    value passed to Snakemake.  Only the first occurrence is used. The file is
    therefore scanned line by line, and reading stops at the first match
    instead of loading the whole (potentially very large) log into memory.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        The core count, or ``None`` if the line was not found or the file
        could not be read.
    """
    from snakesee.parser.patterns import CORES_PATTERN

    try:
        # errors="ignore": undecodable bytes elsewhere in the log are irrelevant here.
        with log_path.open(errors="ignore") as handle:
            for line in handle:
                match = CORES_PATTERN.search(line)
                if match is not None:
                    return int(match.group(1))
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
    return None
parse_failed_jobs_from_log
parse_failed_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse failed jobs from the snakemake log file.

Looks for "Error in rule X:" patterns and extracts the rule name and associated job ID when available. Useful for --keep-going workflows.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that failed.

Source code in snakesee/parser/core.py
def parse_failed_jobs_from_log(  # noqa: C901
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse failed jobs from the snakemake log file.

    Looks for "Error in rule X:" patterns and extracts the rule name
    and associated job ID when available. Useful for --keep-going workflows.

    An error block starts at an "Error in rule X:" line and ends at the next
    rule header, timestamp line, error line, or end of file. The failure is
    recorded only when its block ends, because the jobid and log lines follow
    the header.

    The block first inherits details from the most recent rule block if that
    block was for the same rule. If the error block then names a different
    jobid, the inherited details are discarded. They are replaced with
    whatever earlier rule blocks recorded for that jobid, so another job of
    the same rule scheduled in between cannot leak its wildcards, threads or
    log into the failure. Each (rule, jobid) pair is reported at most once.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for jobs that failed, in log order. Empty if the log
        cannot be read.
    """
    failed_jobs: list[JobInfo] = []
    seen_failures: set[tuple[str, str | None]] = set()  # (rule, jobid) pairs

    # Context of the most recent rule block (error-block lines also update it)
    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_log_path: str | None = None

    # Per-jobid details gathered from rule blocks, used to resolve error blocks
    job_logs: dict[str, str] = {}
    job_wildcards: dict[str, dict[str, str] | None] = {}
    job_threads: dict[str, int] = {}

    # Track pending error - we defer emission until the error block is fully parsed
    # because jobid and log come AFTER "Error in rule X:" in the error block
    pending_error_rule: str | None = None
    error_jobid: str | None = None
    error_wildcards: dict[str, str] | None = None
    error_threads: int | None = None
    error_log_path: str | None = None

    def emit_pending_error() -> None:
        """Emit the pending error if we have one."""
        nonlocal pending_error_rule, error_jobid, error_wildcards, error_threads, error_log_path
        if pending_error_rule is not None:
            # Look up log by jobid if not captured directly in error block
            job_log = error_log_path or (job_logs.get(error_jobid) if error_jobid else None)
            key = (pending_error_rule, error_jobid)
            if key not in seen_failures:
                seen_failures.add(key)
                failed_jobs.append(
                    JobInfo(
                        rule=pending_error_rule,
                        job_id=error_jobid,
                        wildcards=error_wildcards,
                        threads=error_threads,
                        log_file=Path(job_log) if job_log else None,
                    )
                )
            # Reset pending error state
            pending_error_rule = None
            error_jobid = None
            error_wildcards = None
            error_threads = None
            error_log_path = None

    try:
        # errors="ignore" (as in parse_cores_from_log): undecodable bytes must
        # not raise UnicodeDecodeError, which ``except OSError`` would not catch.
        lines = (
            _cached_lines
            if _cached_lines is not None
            else log_path.read_text(errors="ignore").splitlines()
        )
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return failed_jobs

    for line in lines:
        stripped = line.lstrip()

        # Track current rule - this also ends any pending error block
        if match := RULE_START_PATTERN.match(stripped):
            emit_pending_error()
            current_rule = match.group(1)
            current_jobid = None
            current_wildcards = None
            current_threads = None
            current_log_path = None

        # Timestamp lines end error blocks
        elif TIMESTAMP_PATTERN.match(stripped):
            emit_pending_error()

        # Capture wildcards - applies to both rule blocks and error blocks
        elif match := WILDCARDS_PATTERN.match(line):
            wildcards = _parse_wildcards(match.group(1))
            current_wildcards = wildcards
            if pending_error_rule is not None:
                error_wildcards = wildcards
            elif current_jobid:
                # Rule block where wildcards follow the jobid line
                job_wildcards[current_jobid] = wildcards

        # Capture threads - applies to both rule blocks and error blocks
        elif match := THREADS_PATTERN.match(line):
            threads = int(match.group(1))
            current_threads = threads
            if pending_error_rule is not None:
                error_threads = threads
            elif current_jobid:
                # Rule block where threads follow the jobid line
                job_threads[current_jobid] = threads

        # Capture log path - applies to both rule blocks and error blocks
        elif match := LOG_PATTERN.match(line):
            # Strip "(check log file(s) for error details)" suffix from error blocks
            log_value = match.group(1).strip()
            if " (check log file" in log_value:
                log_value = log_value.split(" (check log file")[0].strip()
            current_log_path = log_value
            if current_jobid:
                job_logs[current_jobid] = log_value
            if pending_error_rule is not None:
                error_log_path = log_value

        # Capture jobid - applies to both rule blocks and error blocks
        elif match := JOBID_PATTERN.match(line):
            jobid = match.group(1)
            current_jobid = jobid
            if pending_error_rule is None:
                # Rule block: remember the details printed before the jobid line.
                # (Inside an error block the current_* values may still belong to
                # another job, so they must not be recorded under this jobid.)
                if current_log_path:
                    job_logs[jobid] = current_log_path
                if current_wildcards is not None:
                    job_wildcards[jobid] = current_wildcards
                if current_threads is not None:
                    job_threads[jobid] = current_threads
            else:
                if jobid != error_jobid:
                    if error_jobid is not None:
                        # Details inherited from the rule block belong to a
                        # different job of the same rule; discard them.
                        error_wildcards = None
                        error_threads = None
                        error_log_path = None
                    error_wildcards = job_wildcards.get(jobid, error_wildcards)
                    error_threads = job_threads.get(jobid, error_threads)
                    error_log_path = job_logs.get(jobid, error_log_path)
                error_jobid = jobid

        # Detect errors - start a new pending error block
        elif match := ERROR_IN_RULE_PATTERN.search(line):
            # Emit any previous pending error first
            emit_pending_error()
            rule = match.group(1)
            # Start new pending error - we'll capture jobid/wildcards/threads/log
            # from subsequent lines in the error block
            pending_error_rule = rule
            # Initialize with context from the rule block if the rule matches
            if current_rule == rule:
                error_jobid = current_jobid
                error_wildcards = current_wildcards
                error_threads = current_threads
                error_log_path = current_log_path or (
                    job_logs.get(current_jobid) if current_jobid else None
                )
            else:
                error_jobid = None
                error_wildcards = None
                error_threads = None
                error_log_path = None

    # Emit any final pending error at end of file
    emit_pending_error()

    return failed_jobs
parse_incomplete_jobs
parse_incomplete_jobs(incomplete_dir: Path, min_start_time: float | None = None) -> Iterator[JobInfo]

Parse currently running jobs from incomplete markers.

Snakemake creates marker files in .snakemake/incomplete/ for jobs that are in progress. Each marker's filename is the base64-encoded path of the job's output file. The file modification time indicates when the job started.

Note: This is a fallback method. Prefer parse_running_jobs_from_log() which provides rule names.

Parameters:

Name Type Description Default
incomplete_dir Path

Path to .snakemake/incomplete/ directory.

required
min_start_time float | None

If provided, only yield markers with mtime >= this time. Used to filter out stale markers from previous workflow runs.

None

Yields:

Type Description
JobInfo

JobInfo instances for each in-progress job.

Source code in snakesee/parser/core.py
def _decode_incomplete_marker(marker: Path, incomplete_dir: Path) -> Path | None:
    """
    Decode the output file path encoded in an incomplete-marker path.

    Marker names use URL-safe base64 (``-``/``_`` in place of ``+``/``/``).
    An encoding too long for a single filename is split into chunks. Every
    chunk except the last is a directory whose name carries a leading ``@``,
    a character that never occurs in base64. The chunks are reassembled
    (with the ``@`` stripped) before decoding.

    NOTE(review): this mirrors Snakemake's ``Persistence._record_path`` /
    ``_b64id`` naming scheme; re-check if Snakemake changes it.

    Args:
        marker: Path to a marker file located below ``incomplete_dir``.
        incomplete_dir: The ``.snakemake/incomplete/`` directory being scanned.

    Returns:
        The decoded output file path, or None if the name cannot be decoded.
    """
    import base64

    parts = marker.relative_to(incomplete_dir).parts
    encoded = "".join(part[1:] if part.startswith("@") else part for part in parts)
    try:
        return Path(base64.urlsafe_b64decode(encoded).decode("utf-8"))
    except (ValueError, UnicodeDecodeError) as e:  # binascii.Error is a ValueError
        logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)
        return None


def parse_incomplete_jobs(
    incomplete_dir: Path, min_start_time: float | None = None
) -> Iterator[JobInfo]:
    """
    Parse currently running jobs from incomplete markers.

    Snakemake creates marker files in .snakemake/incomplete/ for jobs that
    are in progress. Each marker's name is the URL-safe base64 encoding of
    the job's output file path (long encodings are split across ``@``-prefixed
    sub-directories). The file modification time indicates when the job started.

    Note: This is a fallback method. Prefer parse_running_jobs_from_log()
    which provides rule names.

    Args:
        incomplete_dir: Path to .snakemake/incomplete/ directory.
        min_start_time: If provided, only yield markers with mtime >= this time.
            Used to filter out stale markers from previous workflow runs.

    Yields:
        JobInfo instances for each in-progress job.
    """
    if not incomplete_dir.exists():
        return

    for marker in incomplete_dir.rglob("*"):
        if not marker.is_file() or marker.name == "migration_underway":
            continue

        try:
            marker_mtime = marker.stat().st_mtime
        except OSError:
            # The marker can disappear between listing and stat (job finished).
            continue

        # Skip markers that are older than the current workflow run
        if min_start_time is not None and marker_mtime < min_start_time:
            continue

        # The marker's mtime is approximately when the job started
        yield JobInfo(
            rule="unknown",  # Cannot determine rule from marker filename
            start_time=marker_mtime,
            output_file=_decode_incomplete_marker(marker, incomplete_dir),
        )
parse_job_stats_counts_from_log
parse_job_stats_counts_from_log(log_path: Path) -> dict[str, int]

Parse the 'Job stats' table from a Snakemake log to get rule -> job count mapping.

The job stats table appears at the start of a Snakemake run and lists all rules that will be executed along with their job counts. This function captures both the rule names AND their expected counts.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping rule names to their expected job counts. Returns an empty dict if the table cannot be found or parsed.

Source code in snakesee/parser/core.py
def parse_job_stats_counts_from_log(log_path: Path) -> dict[str, int]:
    """
    Parse the 'Job stats' table from a Snakemake log to get rule -> job count mapping.

    The job stats table appears at the start of a Snakemake run and lists all
    rules that will be executed along with their job counts. This function
    captures both the rule names AND their expected counts.

    Both table layouts are handled: the two-column ``job  count`` form and the
    wider form that adds ``min threads``/``max threads`` columns. The count is
    always taken from the second column, never the last.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        Dictionary mapping rule names to their expected job counts.
        Returns empty dict if the table cannot be found or parsed.
    """
    counts: dict[str, int] = {}

    try:
        content = log_path.read_text(errors="ignore")
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return counts

    in_job_stats = False
    past_header = False

    for line in content.splitlines():
        stripped = line.strip()

        # Look for the start of the job stats table
        if not in_job_stats:
            in_job_stats = stripped == "Job stats:"
            continue

        # Everything before the dashed separator is header; checking for the
        # header only here keeps rules like "job_count_x" from being skipped.
        if not past_header:
            if stripped.startswith("-"):
                past_header = True
            continue

        # Empty line after data means end of table
        if not stripped:
            if counts:
                break
            continue

        parts = stripped.split()
        # The 'total' row is the last row of the table
        if parts[0] == "total":
            break
        if len(parts) < 2:
            continue
        try:
            counts[parts[0]] = int(parts[1])
        except ValueError:
            # Skip if count isn't a valid integer
            continue

    return counts
parse_job_stats_from_log
parse_job_stats_from_log(log_path: Path) -> set[str]

Parse the 'Job stats' table from a Snakemake log to get the set of rules.

The job stats table appears at the start of a Snakemake run and lists all rules that will be executed along with their job counts.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required

Returns:

Type Description
set[str]

Set of rule names from the job stats table. Returns an empty set if the table cannot be found or parsed.

Source code in snakesee/parser/core.py
def parse_job_stats_from_log(log_path: Path) -> set[str]:
    """
    Parse the 'Job stats' table from a Snakemake log to get the set of rules.

    The job stats table appears at the start of a Snakemake run and lists all
    rules that will be executed along with their job counts. Parsing stops
    at the 'total' row (or a blank line after data rows), so log output that
    follows the table is never mistaken for a rule.

    Args:
        log_path: Path to the Snakemake log file.

    Returns:
        Set of rule names from the job stats table. Returns empty set if
        the table cannot be found or parsed.
    """
    rules: set[str] = set()

    try:
        content = log_path.read_text(errors="ignore")
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return rules

    in_job_stats = False
    past_header = False

    for line in content.splitlines():
        stripped = line.strip()

        # Look for the start of the job stats table
        if not in_job_stats:
            in_job_stats = stripped == "Job stats:"
            continue

        # Everything before the dashed separator is header; checking for the
        # header only here keeps rules like "job_count_x" from being skipped.
        if not past_header:
            if stripped.startswith("-"):
                past_header = True
            continue

        # Empty line after data means end of table
        if not stripped:
            if rules:
                break
            continue

        # Data row: "rule_name    count ..."; 'total' is the last row
        parts = stripped.split()
        if parts[0] == "total":
            break
        if len(parts) >= 2:
            rules.add(parts[0])

    return rules
parse_metadata_files
parse_metadata_files(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[JobInfo]

Parse completed job information from Snakemake metadata files.

Reads JSON metadata files from .snakemake/metadata/ to extract timing information for completed jobs, including input file sizes.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Yields:

Type Description
JobInfo

JobInfo instances for each completed job found.

Source code in snakesee/parser/metadata.py
def parse_metadata_files(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[JobInfo]:
    """
    Yield timing information for completed jobs recorded in Snakemake metadata.

    Every record produced by ``iterate_metadata_files`` that has a rule name
    and both a start and an end time becomes one JobInfo. Records missing
    any of those three fields are skipped. When the record's wildcards are
    a dict, keys and values are converted to strings. The input size comes
    from ``calculate_metadata_input_size`` applied to the record's ``input``
    entry.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        JobInfo instances for each completed job found.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule_name = record.get("rule")
        started = record.get("starttime")
        ended = record.get("endtime")
        if rule_name is None or started is None or ended is None:
            continue

        raw_wildcards = record.get("wildcards")
        yield JobInfo(
            rule=rule_name,
            start_time=started,
            end_time=ended,
            wildcards=(
                {str(key): str(value) for key, value in raw_wildcards.items()}
                if isinstance(raw_wildcards, dict)
                else None
            ),
            input_size=calculate_metadata_input_size(record.get("input")),
        )
parse_metadata_files_full
parse_metadata_files_full(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Parse all metadata from Snakemake metadata files in a single pass.

This is more efficient than calling parse_metadata_files and collect_rule_code_hashes separately, as it reads each file only once.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Yields:

Type Description
MetadataRecord

MetadataRecord instances containing timing and code hash data.

Source code in snakesee/parser/metadata.py
def parse_metadata_files_full(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """
    Read every Snakemake metadata file once and yield a combined record.

    Unlike ``parse_metadata_files``, records are yielded whenever a rule name
    is present, even if timing fields are missing. Each record also carries a
    short fingerprint of the rule's code, so a separate pass to collect code
    hashes is unnecessary.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        MetadataRecord instances containing timing and code hash data.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule_name = record.get("rule")
        if rule_name is None:
            continue

        raw_wildcards = record.get("wildcards")
        wildcards: dict[str, str] | None = (
            {str(key): str(value) for key, value in raw_wildcards.items()}
            if isinstance(raw_wildcards, dict)
            else None
        )

        # Collapse whitespace before hashing so formatting-only differences
        # produce the same fingerprint (first 16 hex chars of SHA-256).
        source = record.get("code")
        code_hash: str | None = (
            hashlib.sha256(" ".join(source.split()).encode()).hexdigest()[:16]
            if source
            else None
        )

        yield MetadataRecord(
            rule=rule_name,
            start_time=record.get("starttime"),
            end_time=record.get("endtime"),
            wildcards=wildcards,
            input_size=calculate_metadata_input_size(record.get("input")),
            code_hash=code_hash,
        )
parse_progress_from_log
parse_progress_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> tuple[int, int]

Parse current progress from a snakemake log file.

Reads the log file and finds the most recent progress line.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
tuple[int, int]

Tuple of (completed_count, total_count). Returns (0, 0) if no progress found.

Source code in snakesee/parser/core.py
def parse_progress_from_log(
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> tuple[int, int]:
    """
    Parse current progress from a snakemake log file.

    Only the most recent progress line matters, so the lines are scanned
    from the end and the scan stops at the first match. This avoids walking
    the whole log on every poll.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        Tuple of (completed_count, total_count). Returns (0, 0) if no progress found.
    """
    try:
        lines = _cached_lines if _cached_lines is not None else log_path.read_text().splitlines()
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return 0, 0

    for line in reversed(lines):
        if match := PROGRESS_PATTERN.search(line):
            return int(match.group(1)), int(match.group(2))
    return 0, 0
parse_rules_from_log
parse_rules_from_log(log_path: Path) -> dict[str, int]

Parse rule execution counts from a snakemake log file.

Counts how many times each rule appears to have completed based on log entries.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping rule names to completion counts.

Source code in snakesee/parser/core.py
def parse_rules_from_log(log_path: Path) -> dict[str, int]:
    """
    Count completed jobs per rule by scanning a snakemake log file.

    A ``jobid`` line seen after a rule header is remembered against that
    rule. Each finished-job line then increments the count of the rule that
    owns the finished jobid. If that jobid was never seen, the count goes to
    the rule of the most recent rule header instead.

    Args:
        log_path: Path to the snakemake log file.

    Returns:
        Dictionary mapping rule names to completion counts.
    """
    counts: dict[str, int] = {}
    jobid_to_rule: dict[str, str] = {}
    active_rule: str | None = None

    try:
        text = log_path.read_text()
    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return counts

    for line in text.splitlines():
        rule_match = RULE_START_PATTERN.match(line.lstrip())
        if rule_match:
            active_rule = rule_match.group(1)
            continue

        # jobid lines are only attributed once some rule header has been seen
        if active_rule is not None:
            jobid_match = JOBID_PATTERN.match(line)
            if jobid_match:
                jobid_to_rule[jobid_match.group(1)] = active_rule
                continue

        finished_match = FINISHED_JOB_PATTERN.search(line)
        if finished_match:
            owner = jobid_to_rule.get(finished_match.group(1), active_rule)
            if owner is not None:
                counts[owner] = counts.get(owner, 0) + 1

    return counts
parse_running_jobs_from_log
parse_running_jobs_from_log(log_path: Path, *, _cached_lines: list[str] | None = None) -> list[JobInfo]

Parse currently running jobs by analyzing the log file.

Tracks jobs that have started (rule + jobid) but not yet finished or failed.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required
_cached_lines list[str] | None

Pre-read log lines (internal optimization).

None

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that appear to be running.

Source code in snakesee/parser/core.py
def parse_running_jobs_from_log(  # noqa: C901
    log_path: Path,
    *,
    _cached_lines: list[str] | None = None,
) -> list[JobInfo]:
    """
    Parse currently running jobs by analyzing the log file.

    Tracks jobs that have started (rule + jobid) but not yet finished or failed.
    Wildcards and threads are attached to a job whether their lines appear
    before or after the job's ``jobid:`` line within the rule block.

    Args:
        log_path: Path to the snakemake log file.
        _cached_lines: Pre-read log lines (internal optimization).

    Returns:
        List of JobInfo for jobs that appear to be running.
    """
    # Track started jobs: jobid -> (rule, start_line_num, wildcards, threads)
    started_jobs: dict[str, tuple[str, int, dict[str, str] | None, int | None]] = {}
    # Job logs: jobid -> log_path (separate lookup by unique jobid)
    job_logs: dict[str, str] = {}
    finished_jobids: set[str] = set()
    failed_jobids: set[str] = set()

    current_rule: str | None = None
    current_jobid: str | None = None
    current_wildcards: dict[str, str] | None = None
    current_threads: int | None = None
    current_log_path: str | None = None

    # Track pending error to capture jobid from error block
    pending_error_rule: str | None = None
    error_jobid: str | None = None

    def record_pending_error() -> None:
        """Record the failed jobid from a pending error block."""
        nonlocal pending_error_rule, error_jobid
        if pending_error_rule is not None and error_jobid is not None:
            failed_jobids.add(error_jobid)
        pending_error_rule = None
        error_jobid = None

    try:
        lines = _cached_lines if _cached_lines is not None else log_path.read_text().splitlines()
        for line_num, line in enumerate(lines):
            # Track current rule being executed
            if match := RULE_START_PATTERN.match(line.lstrip()):
                record_pending_error()
                current_rule = match.group(1)
                current_jobid = None  # Reset jobid for new rule block
                current_wildcards = None
                current_threads = None
                current_log_path = None

            # Timestamp lines end error blocks
            elif TIMESTAMP_PATTERN.match(line.lstrip()):
                record_pending_error()

            # Capture wildcards within rule block
            elif match := WILDCARDS_PATTERN.match(line):
                current_wildcards = _parse_wildcards(match.group(1))
                # The wildcards line can follow the jobid line, in which case the
                # job was stored with the block's earlier (None) wildcards; back-fill
                # it as is done for threads. Error blocks are left alone: their
                # job is reported as failed, not running.
                if pending_error_rule is None and current_jobid and current_jobid in started_jobs:
                    rule, ln, _, th = started_jobs[current_jobid]
                    started_jobs[current_jobid] = (rule, ln, current_wildcards, th)

            # Capture threads within rule block
            elif match := THREADS_PATTERN.match(line):
                current_threads = int(match.group(1))
                # Update already-stored job if threads comes after jobid
                if current_jobid and current_jobid in started_jobs:
                    rule, ln, wc, _ = started_jobs[current_jobid]
                    started_jobs[current_jobid] = (rule, ln, wc, current_threads)

            # Capture log path within rule block - store by jobid
            elif match := LOG_PATTERN.match(line):
                current_log_path = match.group(1).strip()
                if current_jobid:
                    job_logs[current_jobid] = current_log_path

            # Capture jobid within rule block or error block
            elif match := JOBID_PATTERN.match(line):
                jobid = match.group(1)
                current_jobid = jobid
                if current_rule is not None and jobid not in started_jobs:
                    started_jobs[jobid] = (
                        current_rule,
                        line_num,
                        current_wildcards,
                        current_threads,
                    )
                # Store log by jobid if we already captured it
                if current_log_path:
                    job_logs[jobid] = current_log_path
                # If we're in an error block, capture this jobid as failed
                if pending_error_rule is not None:
                    error_jobid = jobid

            # Track finished jobs
            elif match := FINISHED_JOB_PATTERN.search(line):
                finished_jobids.add(match.group(1))

            # Track error blocks to capture failed jobids
            elif match := ERROR_IN_RULE_PATTERN.search(line):
                record_pending_error()
                pending_error_rule = match.group(1)
                # Initialize error_jobid from context if rule matches
                if current_rule == pending_error_rule:
                    error_jobid = current_jobid
                else:
                    error_jobid = None

        # Record any final pending error
        record_pending_error()

    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)
        return []

    # Jobs that started but haven't finished or failed are running
    running: list[JobInfo] = []
    # Use try/except to avoid TOCTOU race between exists() and stat()
    log_mtime: float | None = None
    try:
        log_mtime = log_path.stat().st_mtime
    except OSError:
        pass

    excluded_jobids = finished_jobids | failed_jobids
    for jobid, (rule, _line_num, wildcards, threads) in started_jobs.items():
        if jobid not in excluded_jobids:
            # Look up log by jobid - unique within this run
            job_log = job_logs.get(jobid)
            running.append(
                JobInfo(
                    rule=rule,
                    job_id=jobid,
                    start_time=log_mtime,  # Approximate; could improve with timestamps
                    wildcards=wildcards,
                    threads=threads,
                    log_file=Path(job_log) if job_log else None,
                )
            )

    return running
parse_threads_from_log
parse_threads_from_log(log_path: Path) -> dict[str, int]

Parse a jobid -> threads mapping from a snakemake log file.

This is used to augment metadata completions with thread info, since metadata files don't store the threads directive.

Parameters:

Name Type Description Default
log_path Path

Path to the snakemake log file.

required

Returns:

Type Description
dict[str, int]

Dictionary mapping job_id to thread count.

Source code in snakesee/parser/core.py
def parse_threads_from_log(log_path: Path) -> dict[str, int]:
    """
    Parse a jobid -> threads mapping from a snakemake log file.

    This is used to augment metadata completions with thread info,
    since metadata files don't store the threads directive.

    Both rule blocks and "Error in rule" blocks start a fresh context. The
    ``jobid:`` line of an error block therefore never inherits a ``threads:``
    value printed in an earlier, unrelated rule block.

    Args:
        log_path: Path to the snakemake log file.

    Returns:
        Dictionary mapping job_id to thread count.
    """
    threads_map: dict[str, int] = {}
    current_jobid: str | None = None
    current_threads: int | None = None

    try:
        for line in log_path.read_text().splitlines():
            # A new rule block or error block resets context
            if RULE_START_PATTERN.match(line.lstrip()) or ERROR_IN_RULE_PATTERN.search(line):
                current_jobid = None
                current_threads = None

            # Capture threads
            elif match := THREADS_PATTERN.match(line):
                current_threads = int(match.group(1))
                # Update already-stored job if threads comes after jobid
                if current_jobid and current_jobid not in threads_map:
                    threads_map[current_jobid] = current_threads

            # Capture jobid
            elif match := JOBID_PATTERN.match(line):
                current_jobid = match.group(1)
                if current_threads is not None and current_jobid not in threads_map:
                    threads_map[current_jobid] = current_threads

    except OSError as e:
        logger.info("Could not read log file %s: %s", log_path, e)

    return threads_map
parse_workflow_state
parse_workflow_state(workflow_dir: Path, log_file: Path | None = None, cutoff_time: float | None = None, log_reader: IncrementalLogReader | None = None) -> WorkflowProgress

Parse complete workflow state from .snakemake directory.

Combines information from log files, metadata, incomplete markers, and lock files to build a complete picture of workflow state.

Parameters:

Name Type Description Default
workflow_dir Path

Root directory containing .snakemake/.

required
log_file Path | None

Optional specific log file to parse. If None, uses the latest.

None
cutoff_time float | None

Optional upper bound for filtering completions (e.g., when the next log started). Used for "time machine" view of historical logs.

None
log_reader IncrementalLogReader | None

Optional incremental log reader for efficient polling. When provided, uses cached state instead of re-parsing the entire log file on each call.

None

Returns:

Type Description
WorkflowProgress

Current workflow state as a WorkflowProgress instance.

Source code in snakesee/parser/core.py
def parse_workflow_state(
    workflow_dir: Path,
    log_file: Path | None = None,
    cutoff_time: float | None = None,
    log_reader: IncrementalLogReader | None = None,
) -> WorkflowProgress:
    """
    Parse complete workflow state from .snakemake directory.

    Combines information from log files, metadata, incomplete markers,
    and lock files to build a complete picture of workflow state.

    Args:
        workflow_dir: Root directory containing .snakemake/.
        log_file: Optional specific log file to parse. If None, uses the latest.
        cutoff_time: Optional upper bound for filtering completions (e.g., when
                     the next log started). Used for "time machine" view of
                     historical logs.
        log_reader: Optional incremental log reader for efficient polling.
                    When provided, uses cached state instead of re-parsing
                    the entire log file on each call.

    Returns:
        Current workflow state as a WorkflowProgress instance.
    """
    from snakesee.state.paths import WorkflowPaths

    paths = WorkflowPaths(workflow_dir)

    # Auto-detect persistence backend (SQLite DB or filesystem)
    from snakesee.persistence import detect_backend

    backend = detect_backend(workflow_dir)

    snakemake_dir = paths.snakemake_dir

    # Use specified log file or find latest
    latest_log = paths.find_latest_log()
    log_path = log_file if log_file is not None else latest_log
    is_latest_log = log_file is None or log_file == latest_log

    # Lock files only describe the current run, so they are consulted only for
    # the latest log. The final status is computed once, further below, by
    # _determine_final_workflow_status from this flag and the job lists.
    workflow_is_running = is_latest_log and is_workflow_running(snakemake_dir, backend=backend)

    # Update incremental reader if provided and log path matches/changes
    if log_reader is not None and log_path is not None:
        log_reader.set_log_path(log_path)
        log_reader.read_new_lines()

    # Cache log file lines to avoid repeated reads when not using log_reader
    # This is a significant performance optimization for large log files
    cached_lines: list[str] | None = None
    if log_reader is None and log_path is not None:
        try:
            cached_lines = log_path.read_text().splitlines()
        except OSError:
            cached_lines = []

    # Parse progress from log file (or use reader state)
    if log_reader is not None and log_path is not None:
        completed, total = log_reader.progress
    else:
        completed, total = (
            (0, 0)
            if log_path is None
            else parse_progress_from_log(log_path, _cached_lines=cached_lines)
        )

    # Get workflow start time for filtering incomplete markers
    # Prefer the first timestamp in the log (when jobs actually started) over file ctime
    start_time: float | None = None
    if log_path is not None:
        start_time = _get_first_log_timestamp(log_path, _cached_lines=cached_lines)
        if start_time is None:
            # Fall back to file ctime if no timestamps in log yet
            try:
                start_time = log_path.stat().st_ctime
            except OSError:
                pass

    # Parse completed jobs for recent completions
    metadata_dir = snakemake_dir / "metadata"
    all_completions = list(parse_metadata_files(metadata_dir))

    # Filter completions to the relevant timeframe
    completions = _collect_filtered_completions(all_completions, log_path, cutoff_time, log_reader)

    # Augment completions with threads from log (metadata doesn't have threads)
    if log_path is not None and completions:
        completions = _augment_completions_with_threads(
            completions, log_path, _cached_lines=cached_lines
        )

    completions.sort(key=lambda j: j.end_time or 0, reverse=True)

    # Parse running jobs from log file (provides rule names)
    running: list[JobInfo] = []
    failed_list: list[JobInfo] = []
    if log_path is not None:
        if log_reader is not None:
            running = log_reader.running_jobs
            failed_list = log_reader.failed_jobs
        else:
            running = parse_running_jobs_from_log(log_path, _cached_lines=cached_lines)
            failed_list = parse_failed_jobs_from_log(log_path, _cached_lines=cached_lines)

    # Check for incomplete jobs (from DB or filesystem markers)
    if is_latest_log:
        incomplete_list = [
            JobInfo(
                rule=ij.rule or "unknown",
                start_time=ij.start_time,
                output_file=ij.output_file,
            )
            for ij in backend.iterate_incomplete_jobs(min_start_time=start_time)
        ]
    else:
        incomplete_list = []

    # Reconcile running, failed, and incomplete job lists
    running, incomplete_list = _reconcile_job_lists(
        running, failed_list, incomplete_list, workflow_is_running
    )

    # Determine final status based on running jobs, failures, and incomplete markers
    status = _determine_final_workflow_status(
        running=running,
        failed_list=failed_list,
        incomplete_list=incomplete_list,
        completed=completed,
        total=total,
        is_latest_log=is_latest_log,
        workflow_is_running=workflow_is_running,
    )

    return WorkflowProgress(
        workflow_dir=workflow_dir,
        status=status,
        total_jobs=total,
        completed_jobs=completed,
        failed_jobs=len(failed_list),
        failed_jobs_list=failed_list,
        incomplete_jobs_list=incomplete_list,
        running_jobs=running,
        recent_completions=completions,
        start_time=start_time,
        log_file=log_path,
    )

failure_tracker

Failure tracking with deduplication.

Classes

FailureTracker

Tracks job failures with deduplication.

Prevents duplicate failure entries for the same rule/jobid combination.

Source code in snakesee/parser/failure_tracker.py
class FailureTracker:
    """Collect failed jobs while ignoring repeated reports of the same failure.

    A failure is identified by its ``(rule, jobid)`` pair. Only the first
    report of a given pair is kept.
    """

    __slots__ = ("_failed_jobs", "_seen_failures")

    def __init__(self) -> None:
        """Start with no recorded failures."""
        self._failed_jobs: list[JobInfo] = []
        self._seen_failures: set[tuple[str, str | None]] = set()

    def record_failure(
        self,
        rule: str,
        jobid: str | None = None,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
        log_file: Path | None = None,
    ) -> bool:
        """Store a failed job unless its ``(rule, jobid)`` pair was seen before.

        Args:
            rule: Name of the failed rule.
            jobid: Job identifier (if known).
            wildcards: Wildcard values.
            threads: Thread count.
            log_file: Path to the log file.

        Returns:
            True when the failure was new and has been stored, False for a duplicate.
        """
        failure_key = (rule, jobid)
        is_new = failure_key not in self._seen_failures
        if is_new:
            self._seen_failures.add(failure_key)
            self._failed_jobs.append(
                JobInfo(
                    rule=rule,
                    job_id=jobid,
                    wildcards=wildcards,
                    threads=threads,
                    log_file=log_file,
                )
            )
        return is_new

    def get_failed_jobs(self) -> list[JobInfo]:
        """Return a shallow copy of the recorded failures, in recording order.

        Returns:
            List of JobInfo for failed jobs.
        """
        return self._failed_jobs.copy()

    def reset(self) -> None:
        """Forget every recorded failure and every seen ``(rule, jobid)`` pair."""
        self._seen_failures.clear()
        self._failed_jobs.clear()
Functions
__init__
__init__() -> None

Initialize the failure tracker.

Source code in snakesee/parser/failure_tracker.py
def __init__(self) -> None:
    """Give *self* empty failure-tracking state."""
    # Distinct failures in first-reported order.
    self._failed_jobs: list[JobInfo] = list()
    # (rule, jobid) keys that have already been recorded.
    self._seen_failures: set[tuple[str, str | None]] = set()
get_failed_jobs
get_failed_jobs() -> list[JobInfo]

Get list of failed jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for failed jobs.

Source code in snakesee/parser/failure_tracker.py
def get_failed_jobs(self) -> list[JobInfo]:
    """Return the recorded failures as a new list (shallow copy).

    Returns:
        List of JobInfo for failed jobs.
    """
    # Unpacking into a fresh list keeps callers from mutating internal state.
    return [*self._failed_jobs]
record_failure
record_failure(rule: str, jobid: str | None = None, wildcards: dict[str, str] | None = None, threads: int | None = None, log_file: Path | None = None) -> bool

Record a job failure.

Parameters:

Name Type Description Default
rule str

Name of the failed rule.

required
jobid str | None

Job identifier (if known).

None
wildcards dict[str, str] | None

Wildcard values.

None
threads int | None

Thread count.

None
log_file Path | None

Path to the log file.

None

Returns:

Type Description
bool

True if this is a new failure, False if duplicate.

Source code in snakesee/parser/failure_tracker.py
def record_failure(
    self,
    rule: str,
    jobid: str | None = None,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
    log_file: Path | None = None,
) -> bool:
    """Register a failed job unless its (rule, jobid) pair is already known.

    Args:
        rule: Name of the failed rule.
        jobid: Job identifier, or None when unknown.
        wildcards: Wildcard values.
        threads: Thread count.
        log_file: Path to the log file.

    Returns:
        True if this is a new failure, False if duplicate.
    """
    dedup_key = (rule, jobid)
    if dedup_key not in self._seen_failures:
        self._seen_failures.add(dedup_key)
        self._failed_jobs.append(
            JobInfo(
                rule=rule,
                job_id=jobid,
                wildcards=wildcards,
                threads=threads,
                log_file=log_file,
            )
        )
        return True
    # Same rule/jobid already recorded: drop the repeat.
    return False
reset
reset() -> None

Clear all failure tracking state.

Source code in snakesee/parser/failure_tracker.py
def reset(self) -> None:
    """Forget every recorded failure."""
    # Cleared in place, failures first, then the dedup keys.
    for container in (self._failed_jobs, self._seen_failures):
        container.clear()

file_position

File position tracking with rotation detection.

Classes

LogFilePosition

Tracks file position and detects log rotation/truncation.

Handles common scenarios like log file rotation (inode change) and file truncation (size decrease).

Attributes:

Name Type Description
path

Path to the file being tracked.

Source code in snakesee/parser/file_position.py
class LogFilePosition:
    """Tracks file position and detects log rotation/truncation.

    Handles common scenarios like log file rotation (inode change)
    and file truncation (size decrease).

    Attributes:
        path: Path to the file being tracked.
    """

    __slots__ = ("path", "_offset", "_inode")

    def __init__(self, path: Path) -> None:
        """Initialize position tracker.

        Args:
            path: Path to the file to track.
        """
        self.path = path
        # Read offset into the file; compared against st_size, so in bytes.
        self._offset: int = 0
        # Inode recorded by the most recent successful check_rotation();
        # None before the first check and right after reset().
        self._inode: int | None = None

    @property
    def offset(self) -> int:
        """Current read position in the file."""
        return self._offset

    @offset.setter
    def offset(self, value: int) -> None:
        """Set the current read position."""
        self._offset = value

    def check_rotation(self) -> bool:
        """Check if the file has been rotated or truncated.

        Rotation means the file's inode differs from the one recorded on the
        previous call. Truncation means the file is now shorter than the
        current read offset. Truncation is checked even when no inode has been
        recorded yet.

        When either condition is detected, the position is reset to the start
        and the inode of the file now on disk is recorded immediately. A
        further rotation or truncation before the next call is therefore still
        detected; it is not mistaken for a first observation.

        Returns:
            True if the file was rotated/truncated and the position was reset.
            False otherwise, including when the file is missing or cannot be
            stat'ed. State is left untouched in that case.
        """
        try:
            stat_result = self.path.stat()
        except OSError:  # FileNotFoundError is a subclass of OSError.
            return False

        current_inode = stat_result.st_ino
        inode_changed = self._inode is not None and current_inode != self._inode
        truncated = stat_result.st_size < self._offset
        rotated = inode_changed or truncated

        if rotated:
            self.reset()
        # Remember the inode just observed, even right after a reset, so the
        # next call compares against the file currently on disk.
        self._inode = current_inode
        return rotated

    def reset(self) -> None:
        """Reset position to start of file and forget the recorded inode."""
        self._offset = 0
        self._inode = None

    def clamp_to_size(self, file_size: int) -> None:
        """Clamp offset to file bounds.

        Args:
            file_size: Current file size; negative values are treated as 0.
        """
        self._offset = min(self._offset, max(file_size, 0))
Attributes
offset property writable
offset: int

Current read position in the file.

Functions
__init__
__init__(path: Path) -> None

Initialize position tracker.

Parameters:

Name Type Description Default
path Path

Path to the file to track.

required
Source code in snakesee/parser/file_position.py
def __init__(self, path: Path) -> None:
    """Start tracking *path* from the beginning with no inode recorded.

    Args:
        path: Path to the file to track.
    """
    # Inode is unknown until the file is first checked.
    self._inode: int | None = None
    # Nothing has been read yet.
    self._offset: int = 0
    self.path = path
check_rotation
check_rotation() -> bool

Check if the file has been rotated or truncated.

Detects rotation by checking for inode change or file truncation (current size less than last known position).

Returns:

Type Description
bool

True if the file was rotated and position was reset.

Source code in snakesee/parser/file_position.py
def check_rotation(self) -> bool:
    """Check if the file has been rotated or truncated.

    Rotation means the file's inode differs from the one recorded on the
    previous call. Truncation means the file is now shorter than the current
    read offset. Truncation is checked even when no inode has been recorded
    yet.

    When either condition is detected, the position is reset and the inode
    of the file now on disk is recorded immediately. A further rotation or
    truncation before the next call is therefore still detected.

    Returns:
        True if the file was rotated/truncated and the position was reset.
        False otherwise, including when the file is missing or cannot be
        stat'ed. State is left untouched in that case.
    """
    try:
        stat_result = self.path.stat()
    except OSError:  # FileNotFoundError is a subclass of OSError.
        return False

    current_inode = stat_result.st_ino
    inode_changed = self._inode is not None and current_inode != self._inode
    truncated = stat_result.st_size < self._offset
    rotated = inode_changed or truncated

    if rotated:
        self.reset()
    # Record the inode just observed (also after a reset) so the next call
    # compares against the file currently on disk.
    self._inode = current_inode
    return rotated
clamp_to_size
clamp_to_size(file_size: int) -> None

Clamp offset to file bounds.

Parameters:

Name Type Description Default
file_size int

Current file size (must be non-negative).

required
Source code in snakesee/parser/file_position.py
def clamp_to_size(self, file_size: int) -> None:
    """Keep the read offset within ``[0, file_size]``.

    Args:
        file_size: Current file size; negative values are treated as 0.
    """
    upper_bound = max(file_size, 0)
    self._offset = min(self._offset, upper_bound)
reset
reset() -> None

Reset position to start of file.

Source code in snakesee/parser/file_position.py
def reset(self) -> None:
    """Rewind to offset 0 and forget the recorded inode."""
    self._offset, self._inode = 0, None

job_tracker

Job lifecycle tracking from start to completion.

Classes

JobLifecycleTracker

Tracks jobs from start to completion.

Maintains state for started jobs, finished job IDs, and completed jobs with full timing information.

Source code in snakesee/parser/job_tracker.py
class JobLifecycleTracker:
    """Follow jobs from their start record to their finish record.

    Running jobs are kept in ``_started_jobs``. ``finish_job`` turns a tracked
    job into a ``JobInfo`` appended to ``_completed_jobs`` and drops its
    per-job state. Every jobid passed to ``finish_job`` is remembered in
    ``_finished_jobids``, so such a job is never reported as running.
    """

    __slots__ = (
        "_completed_jobs",
        "_finished_jobids",
        "_job_logs",
        "_started_jobs",
    )

    def __init__(self) -> None:
        """Create a tracker with no jobs."""
        # jobid -> data captured at start; entry removed when the job finishes.
        self._started_jobs: dict[str, StartedJobData] = {}
        # Every jobid ever passed to finish_job().
        self._finished_jobids: set[str] = set()
        # JobInfo records for finished jobs, in finishing order.
        self._completed_jobs: list[JobInfo] = []
        # jobid -> log file path (as text).
        self._job_logs: dict[str, str] = {}

    def _build_job_info(
        self, jobid: str, job_data: StartedJobData, **extra: float | None
    ) -> JobInfo:
        """Build a JobInfo from stored start data plus the job's log path, if any.

        Keyword arguments in *extra* (used for ``end_time``) are forwarded
        unchanged to ``JobInfo``.
        """
        log_path = self._job_logs.get(jobid)
        return JobInfo(
            rule=job_data["rule"],
            job_id=jobid,
            start_time=job_data["start_time"],
            wildcards=job_data["wildcards"],
            threads=job_data["threads"],
            log_file=Path(log_path) if log_path else None,
            **extra,
        )

    def start_job(
        self,
        jobid: str,
        rule: str,
        start_time: float | None = None,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
    ) -> None:
        """Record a job starting; a repeated start for the same jobid is ignored.

        Args:
            jobid: Unique job identifier.
            rule: Name of the rule.
            start_time: Unix timestamp when job started.
            wildcards: Wildcard values for this job.
            threads: Number of threads allocated.
        """
        if jobid in self._started_jobs:
            return
        self._started_jobs[jobid] = StartedJobData(
            rule=rule,
            start_time=start_time,
            wildcards=wildcards,
            threads=threads,
        )

    def update_job(
        self,
        jobid: str,
        wildcards: dict[str, str] | None = None,
        threads: int | None = None,
    ) -> None:
        """Overwrite wildcards and/or threads of a started job.

        Unknown jobids and ``None`` arguments are ignored.

        Args:
            jobid: Job identifier.
            wildcards: Wildcard values to update.
            threads: Thread count to update.
        """
        job_data = self._started_jobs.get(jobid)
        if job_data is None:
            return
        if wildcards is not None:
            job_data["wildcards"] = wildcards
        if threads is not None:
            job_data["threads"] = threads

    def set_job_log(self, jobid: str, log_path: str) -> None:
        """Associate a log file path with a job, replacing any previous one.

        Args:
            jobid: Job identifier.
            log_path: Path to the log file.
        """
        self._job_logs[jobid] = log_path

    def get_job_log(self, jobid: str) -> str | None:
        """Look up the log file path recorded for a job.

        Args:
            jobid: Job identifier.

        Returns:
            Log file path or None if not set.
        """
        return self._job_logs.get(jobid)

    def finish_job(self, jobid: str, end_time: float | None = None) -> JobInfo | None:
        """Mark a job finished and move it to the completed list if it was tracked.

        The jobid is remembered as finished even if no start was recorded.

        Args:
            jobid: Job identifier.
            end_time: Unix timestamp when job finished.

        Returns:
            JobInfo for the completed job, or None if job was not tracked.
        """
        self._finished_jobids.add(jobid)

        job_data = self._started_jobs.get(jobid)
        if job_data is None:
            return None

        job_info = self._build_job_info(jobid, job_data, end_time=end_time)
        self._completed_jobs.append(job_info)
        # Drop per-job state so memory does not grow with finished jobs.
        del self._started_jobs[jobid]
        self._job_logs.pop(jobid, None)
        return job_info

    def get_running_jobs(self) -> list[JobInfo]:
        """List started jobs that have not been reported finished.

        Returns:
            List of JobInfo for running jobs, in started-jobs insertion order.
        """
        return [
            self._build_job_info(jobid, job_data)
            for jobid, job_data in self._started_jobs.items()
            if jobid not in self._finished_jobids
        ]

    def get_completed_jobs(self) -> list[JobInfo]:
        """Return completed jobs sorted by end time, newest first.

        Jobs without an end time sort as 0, i.e. last. Ties keep finishing order.

        Returns:
            List of JobInfo for completed jobs.
        """
        newest_first = list(self._completed_jobs)
        newest_first.sort(key=lambda job: job.end_time or 0, reverse=True)
        return newest_first

    def is_job_started(self, jobid: str) -> bool:
        """Report whether a job is currently in the started (running) map.

        A job returns False again once finish_job() has removed it.

        Args:
            jobid: Job identifier.

        Returns:
            True if the job has been started.
        """
        return jobid in self._started_jobs

    def reset(self) -> None:
        """Clear all job tracking state."""
        for container in (
            self._started_jobs,
            self._finished_jobids,
            self._completed_jobs,
            self._job_logs,
        ):
            container.clear()
Functions
__init__
__init__() -> None

Initialize the job tracker.

Source code in snakesee/parser/job_tracker.py
def __init__(self) -> None:
    """Give *self* empty job-tracking containers."""
    # Running jobs keyed by jobid.
    self._started_jobs: dict[str, StartedJobData] = dict()
    # Every jobid that has been reported finished.
    self._finished_jobids: set[str] = set()
    # JobInfo records for finished jobs, in finishing order.
    self._completed_jobs: list[JobInfo] = list()
    # jobid -> log file path string.
    self._job_logs: dict[str, str] = dict()
finish_job
finish_job(jobid: str, end_time: float | None = None) -> JobInfo | None

Record a job finishing.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
end_time float | None

Unix timestamp when job finished.

None

Returns:

Type Description
JobInfo | None

JobInfo for the completed job, or None if job was not tracked.

Source code in snakesee/parser/job_tracker.py
def finish_job(self, jobid: str, end_time: float | None = None) -> JobInfo | None:
    """Mark *jobid* finished and, if it was tracked, move it to the completed list.

    The jobid is remembered as finished even when no start was recorded.

    Args:
        jobid: Job identifier.
        end_time: Unix timestamp when job finished.

    Returns:
        JobInfo for the completed job, or None if job was not tracked.
    """
    self._finished_jobids.add(jobid)

    job_data = self._started_jobs.get(jobid)
    if job_data is None:
        # Finish seen without a matching start: nothing to complete.
        return None

    log_path = self._job_logs.get(jobid)
    completed = JobInfo(
        rule=job_data["rule"],
        job_id=jobid,
        start_time=job_data["start_time"],
        end_time=end_time,
        wildcards=job_data["wildcards"],
        threads=job_data["threads"],
        log_file=None if not log_path else Path(log_path),
    )
    self._completed_jobs.append(completed)
    # Drop per-job state now that the job is done.
    del self._started_jobs[jobid]
    self._job_logs.pop(jobid, None)
    return completed
get_completed_jobs
get_completed_jobs() -> list[JobInfo]

Get list of completed jobs sorted by end time (newest first).

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs.

Source code in snakesee/parser/job_tracker.py
def get_completed_jobs(self) -> list[JobInfo]:
    """Return completed jobs ordered by end time, newest first.

    Jobs whose end time is missing (or falsy) sort as 0, i.e. last; ties keep
    their original order.

    Returns:
        List of JobInfo for completed jobs.
    """
    newest_first = list(self._completed_jobs)
    newest_first.sort(key=lambda job: job.end_time or 0, reverse=True)
    return newest_first
get_job_log
get_job_log(jobid: str) -> str | None

Get the log file path for a job.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required

Returns:

Type Description
str | None

Log file path or None if not set.

Source code in snakesee/parser/job_tracker.py
def get_job_log(self, jobid: str) -> str | None:
    """Look up the log file path recorded for *jobid*.

    Args:
        jobid: Job identifier.

    Returns:
        Log file path or None if not set.
    """
    job_logs = self._job_logs
    return job_logs.get(jobid, None)
get_running_jobs
get_running_jobs() -> list[JobInfo]

Get list of jobs that started but haven't finished.

Returns:

Type Description
list[JobInfo]

List of JobInfo for running jobs.

Source code in snakesee/parser/job_tracker.py
def get_running_jobs(self) -> list[JobInfo]:
    """List started jobs that have not been reported finished.

    Jobs are returned in the insertion order of the started-jobs map.
    No ``end_time`` is passed to ``JobInfo``.

    Returns:
        List of JobInfo for running jobs.
    """

    def as_job_info(jobid, job_data):
        log_path = self._job_logs.get(jobid)
        return JobInfo(
            rule=job_data["rule"],
            job_id=jobid,
            start_time=job_data["start_time"],
            wildcards=job_data["wildcards"],
            threads=job_data["threads"],
            log_file=Path(log_path) if log_path else None,
        )

    finished = self._finished_jobids
    return [
        as_job_info(jobid, job_data)
        for jobid, job_data in self._started_jobs.items()
        if jobid not in finished
    ]
is_job_started
is_job_started(jobid: str) -> bool

Check if a job has been started.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required

Returns:

Type Description
bool

True if the job has been started.

Source code in snakesee/parser/job_tracker.py
def is_job_started(self, jobid: str) -> bool:
    """Report whether *jobid* is present in the started-jobs map.

    Args:
        jobid: Job identifier.

    Returns:
        True if the job has been started.
    """
    started = self._started_jobs
    return jobid in started
reset
reset() -> None

Clear all job tracking state.

Source code in snakesee/parser/job_tracker.py
def reset(self) -> None:
    """Empty every job-tracking container in place."""
    for container in (
        self._started_jobs,
        self._finished_jobids,
        self._completed_jobs,
        self._job_logs,
    ):
        container.clear()
set_job_log
set_job_log(jobid: str, log_path: str) -> None

Set the log file path for a job.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
log_path str

Path to the log file.

required
Source code in snakesee/parser/job_tracker.py
def set_job_log(self, jobid: str, log_path: str) -> None:
    """Associate *log_path* with *jobid*, replacing any earlier value.

    Args:
        jobid: Job identifier.
        log_path: Path to the log file.
    """
    job_logs = self._job_logs
    job_logs[jobid] = log_path
start_job
start_job(jobid: str, rule: str, start_time: float | None = None, wildcards: dict[str, str] | None = None, threads: int | None = None) -> None

Record a job starting.

Parameters:

Name Type Description Default
jobid str

Unique job identifier.

required
rule str

Name of the rule.

required
start_time float | None

Unix timestamp when job started.

None
wildcards dict[str, str] | None

Wildcard values for this job.

None
threads int | None

Number of threads allocated.

None
Source code in snakesee/parser/job_tracker.py
def start_job(
    self,
    jobid: str,
    rule: str,
    start_time: float | None = None,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
) -> None:
    """Record a job starting; the first start recorded for a jobid wins.

    Args:
        jobid: Unique job identifier.
        rule: Name of the rule.
        start_time: Unix timestamp when job started.
        wildcards: Wildcard values for this job.
        threads: Number of threads allocated.
    """
    if jobid in self._started_jobs:
        # Already tracked: keep the existing entry untouched.
        return
    self._started_jobs[jobid] = StartedJobData(
        rule=rule,
        start_time=start_time,
        wildcards=wildcards,
        threads=threads,
    )
update_job
update_job(jobid: str, wildcards: dict[str, str] | None = None, threads: int | None = None) -> None

Update an existing job's metadata.

Parameters:

Name Type Description Default
jobid str

Job identifier.

required
wildcards dict[str, str] | None

Wildcard values to update.

None
threads int | None

Thread count to update.

None
Source code in snakesee/parser/job_tracker.py
def update_job(
    self,
    jobid: str,
    wildcards: dict[str, str] | None = None,
    threads: int | None = None,
) -> None:
    """Overwrite wildcards and/or threads of a started job.

    Unknown jobids and ``None`` arguments are ignored.

    Args:
        jobid: Job identifier.
        wildcards: Wildcard values to update.
        threads: Thread count to update.
    """
    job_data = self._started_jobs.get(jobid)
    if job_data is None:
        return
    if wildcards is not None:
        job_data["wildcards"] = wildcards
    if threads is not None:
        job_data["threads"] = threads
StartedJobData

Bases: TypedDict

Data for a job that has started but not finished.

Source code in snakesee/parser/job_tracker.py
class StartedJobData(TypedDict):
    """Data for a job that has started but not finished.

    Created by ``JobLifecycleTracker.start_job``. ``wildcards`` and ``threads``
    may later be overwritten in place by ``JobLifecycleTracker.update_job``.
    """

    # Name of the rule the job belongs to.
    rule: str
    # Unix timestamp when the job started, or None if not known.
    start_time: float | None
    # Wildcard values for this job, or None if not known.
    wildcards: dict[str, str] | None
    # Number of threads allocated, or None if not known.
    threads: int | None

line_parser

Log line parsing with context tracking.

Classes

LogLineParser dataclass

Parses individual Snakemake log lines.

Maintains parsing context across lines to handle multi-line log entries where information spans multiple lines.

Source code in snakesee/parser/line_parser.py
@dataclass
class LogLineParser:
    """Parses individual Snakemake log lines.

    Maintains parsing context across lines to handle multi-line
    log entries where information spans multiple lines.

    Timestamp and rule/checkpoint header lines can appear at column 0 or
    indented (inside group/pipe job blocks). Both forms go through the shared
    ``_handle_timestamp`` / ``_handle_rule_start`` helpers so the two paths
    stay identical.
    """

    context: ParsingContext = field(default_factory=ParsingContext)

    def parse_line(self, line: str) -> list[ParseEvent]:
        """Parse a single log line and return structured events.

        Uses fast-path prefix checks to skip expensive regex operations
        for lines that can't possibly match.

        Updates internal context as needed for multi-line entries.
        May return multiple events when a pending error needs to be flushed.

        Args:
            line: Log line to parse.

        Returns:
            List of ParseEvents (may be empty, one, or two events).
        """
        line = line.rstrip("\n\r")
        events: list[ParseEvent] = []

        # Fast path: empty lines
        if not line:
            return events

        first_char = line[0]

        # Timestamp lines start with '[' - this ends error blocks
        if first_char == "[":
            self._handle_timestamp(line, events)
            return events

        # Indented lines start with space/tab.  In group/pipe job blocks,
        # rule starts and timestamps are indented by 4 spaces.
        if first_char in (" ", "\t"):
            return self._parse_indented_or_group_line(line, events)

        # Rule/checkpoint start - this ends error blocks
        if self._is_rule_header(line):
            self._handle_rule_start(line, events)
            return events

        # Finished job: "Finished job X" or "Finished jobid: X"
        if first_char == "F" and line.startswith("Finished "):
            if match := FINISHED_JOB_PATTERN.search(line):
                jobid = match.group(1)
                events.append(
                    ParseEvent(
                        ParseEventType.JOB_FINISHED,
                        {"jobid": jobid, "timestamp": self.context.timestamp},
                    )
                )
            return events

        # Error detection: "Error in rule X:" - starts a pending error block
        if first_char == "E" and line.startswith("Error in rule "):
            if match := ERROR_IN_RULE_PATTERN.search(line):
                # Flush any previous pending error before starting new one
                if pending := self.context.get_pending_error():
                    events.append(pending)
                rule = match.group(1)
                # Start pending error - we'll capture jobid/log from subsequent lines
                self.context.start_error_block(rule)
            return events

        # Progress line: "X of Y steps (Z%) done" - check with substring first
        if "steps" in line and "done" in line:
            if match := PROGRESS_PATTERN.search(line):
                completed = int(match.group(1))
                total = int(match.group(2))
                events.append(
                    ParseEvent(ParseEventType.PROGRESS, {"completed": completed, "total": total})
                )

        return events

    def flush_pending_error(self) -> ParseEvent | None:
        """Flush any pending error event.

        Call this at end of file or when needing to ensure all errors are emitted.

        Returns:
            ParseEvent for the pending error, or None if no pending error.
        """
        return self.context.get_pending_error()

    @staticmethod
    def _is_rule_header(text: str) -> bool:
        """Cheap prefix test for rule/checkpoint header lines.

        Matches text starting with "rule ", "localrule ", "checkpoint " or
        "localcheckpoint ". *text* must be non-empty and already de-indented.
        """
        return text[0] in ("r", "l", "c") and text.startswith(
            ("rule ", "localrule ", "checkpoint ", "localcheckpoint ")
        )

    def _handle_timestamp(self, text: str, events: list[ParseEvent]) -> None:
        """Append events for a timestamp line if *text* matches TIMESTAMP_PATTERN.

        A timestamp ends any open error block, so a pending error event is
        appended before the TIMESTAMP event. Non-matching text adds nothing.
        """
        match = TIMESTAMP_PATTERN.match(text)
        if not match:
            return
        if pending := self.context.get_pending_error():
            events.append(pending)
        timestamp = _parse_timestamp(match.group(1))
        self.context.timestamp = timestamp
        events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": timestamp}))

    def _handle_rule_start(self, text: str, events: list[ParseEvent]) -> None:
        """Append events for a rule header if *text* matches RULE_START_PATTERN.

        A rule header ends any open error block, so a pending error event is
        appended first. The context is then reset for the new rule and a
        RULE_START event is appended. Non-matching text adds nothing.
        """
        match = RULE_START_PATTERN.match(text)
        if not match:
            return
        if pending := self.context.get_pending_error():
            events.append(pending)
        rule = match.group(1)
        self.context.reset_for_new_rule(rule)
        events.append(ParseEvent(ParseEventType.RULE_START, {"rule": rule}))

    def _parse_indented_or_group_line(
        self, line: str, events: list[ParseEvent]
    ) -> list[ParseEvent]:
        """Parse indented lines: group-block elements or property lines.

        In group/pipe job blocks, rule starts and timestamps are indented by
        4 spaces.  Property lines are indented by 4 (normal) or 8 (group) spaces.

        Args:
            line: Indented log line starting with space/tab.
            events: Mutable list to append events to.

        Returns:
            The events list (same object passed in).
        """
        stripped = line.lstrip()
        if not stripped:
            return events

        # Indented timestamp: "    [Mon Jan  6 10:00:00 2026]"
        if stripped[0] == "[":
            self._handle_timestamp(stripped, events)
            return events

        # Indented rule start: "    rule X:", "    localrule X:",
        # "    checkpoint X:", or "    localcheckpoint X:"
        if self._is_rule_header(stripped):
            self._handle_rule_start(stripped, events)
            return events

        # Property lines (wildcards, threads, log, jobid)
        event = self._parse_indented_line(stripped)
        if event:
            events.append(event)
        return events

    def _parse_indented_line(self, line: str) -> ParseEvent | None:
        """Parse indented property lines (wildcards, threads, log, jobid).

        Values seen while an error block is pending are also copied into the
        error-specific context fields.

        Args:
            line: Property line, with or without its leading whitespace.

        Returns:
            ParseEvent if line contains a recognized property, None otherwise.
        """
        stripped = line.lstrip()

        # Check each property type by prefix (faster than regex)
        if stripped.startswith("wildcards:"):
            value = stripped[len("wildcards:"):].strip()
            wildcards = _parse_wildcards(value)
            self.context.wildcards = wildcards
            if self.context.has_pending_error():
                self.context.error_wildcards = wildcards
            return ParseEvent(
                ParseEventType.WILDCARDS,
                {"wildcards": wildcards, "jobid": self.context.jobid},
            )

        if stripped.startswith("threads:"):
            value = stripped[len("threads:"):].strip()
            threads = _parse_positive_int(value, "threads")
            if threads is None:
                return None
            self.context.threads = threads
            if self.context.has_pending_error():
                self.context.error_threads = threads
            return ParseEvent(
                ParseEventType.THREADS,
                {"threads": threads, "jobid": self.context.jobid},
            )

        if stripped.startswith("log:"):
            log_path = stripped[len("log:"):].strip()
            self.context.log_path = log_path
            if self.context.has_pending_error():
                self.context.error_log_path = log_path
            return ParseEvent(
                ParseEventType.LOG_PATH,
                {"log_path": log_path, "jobid": self.context.jobid},
            )

        if stripped.startswith("jobid:"):
            jobid = stripped[len("jobid:"):].strip()
            self.context.jobid = jobid
            if self.context.has_pending_error():
                self.context.error_jobid = jobid
            return ParseEvent(
                ParseEventType.JOBID,
                {
                    "jobid": jobid,
                    "rule": self.context.rule,
                    "wildcards": self.context.wildcards,
                    "threads": self.context.threads,
                    "timestamp": self.context.timestamp,
                    "log_path": self.context.log_path,
                },
            )

        return None

    def reset(self) -> None:
        """Reset parsing context."""
        self.context = ParsingContext()
Functions
flush_pending_error
flush_pending_error() -> ParseEvent | None

Flush any pending error event.

Call this at end of file or when needing to ensure all errors are emitted.

Returns:

Type Description
ParseEvent | None

ParseEvent for the pending error, or None if no pending error.

Source code in snakesee/parser/line_parser.py
def flush_pending_error(self) -> ParseEvent | None:
    """Emit whatever error event the parsing context is still holding.

    Intended for end of input, so an error block that was never closed by a
    timestamp or rule header is not lost.

    Returns:
        ParseEvent for the pending error, or None if no pending error.
    """
    parsing_context = self.context
    pending_event = parsing_context.get_pending_error()
    return pending_event
parse_line
parse_line(line: str) -> list[ParseEvent]

Parse a single log line and return structured events.

Uses fast-path prefix checks to skip expensive regex operations for lines that can't possibly match.

Updates internal context as needed for multi-line entries. May return multiple events when a pending error needs to be flushed.

Parameters:

Name Type Description Default
line str

Log line to parse.

required

Returns:

Type Description
list[ParseEvent]

List of ParseEvents (may be empty, one, or two events).

Source code in snakesee/parser/line_parser.py
def parse_line(self, line: str) -> list[ParseEvent]:
    """Parse a single log line and return structured events.

    Uses fast-path prefix checks to skip expensive regex operations
    for lines that can't possibly match.

    Updates internal context as needed for multi-line entries.
    May return multiple events when a pending error needs to be flushed.

    Lines are dispatched on their first character, checked in this order:

    * ``[`` - timestamp line. Flushes any pending error, then records the
      timestamp in the context. A ``[`` line that does not match
      TIMESTAMP_PATTERN yields no events.
    * space/tab - indented detail line, delegated to
      ``_parse_indented_or_group_line``.
    * ``rule``/``localrule``/``checkpoint``/``localcheckpoint`` - rule start.
      Flushes any pending error and resets the per-rule context.
    * ``Finished `` - job completion.
    * ``Error in rule `` - opens a pending error block, flushing any
      previous one. The ERROR event itself is emitted later.
    * anything else containing both ``steps`` and ``done`` - progress line.

    Args:
        line: Log line to parse.

    Returns:
        List of ParseEvents (may be empty, one, or two events).
    """
    line = line.rstrip("\n\r")
    events: list[ParseEvent] = []

    # Fast path: empty lines
    if not line:
        return events

    first_char = line[0]

    # Timestamp lines start with '[' - this ends error blocks
    if first_char == "[":
        if match := TIMESTAMP_PATTERN.match(line):
            # Flush any pending error before emitting timestamp
            if pending := self.context.get_pending_error():
                events.append(pending)
            timestamp = _parse_timestamp(match.group(1))
            # Later JOBID / JOB_FINISHED events reuse this timestamp.
            self.context.timestamp = timestamp
            events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": timestamp}))
        return events

    # Indented lines start with space/tab.  In group/pipe job blocks,
    # rule starts and timestamps are indented by 4 spaces.
    if first_char in (" ", "\t"):
        return self._parse_indented_or_group_line(line, events)

    # Rule/checkpoint start - this ends error blocks
    # Matches: "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
    # (The first_char test is a cheap pre-filter before the startswith call.)
    if first_char in ("r", "l", "c") and (
        line.startswith(("rule ", "localrule ", "checkpoint ", "localcheckpoint "))
    ):
        if match := RULE_START_PATTERN.match(line):
            # Flush any pending error before emitting rule start
            if pending := self.context.get_pending_error():
                events.append(pending)
            rule = match.group(1)
            # Clears jobid/wildcards/threads/log_path from the previous block.
            self.context.reset_for_new_rule(rule)
            events.append(ParseEvent(ParseEventType.RULE_START, {"rule": rule}))
        return events

    # Finished job: "Finished job X" or "Finished jobid: X"
    # Does not flush a pending error; the timestamp is whatever the most
    # recent '[...]' line set.
    if first_char == "F" and line.startswith("Finished "):
        if match := FINISHED_JOB_PATTERN.search(line):
            jobid = match.group(1)
            events.append(
                ParseEvent(
                    ParseEventType.JOB_FINISHED,
                    {"jobid": jobid, "timestamp": self.context.timestamp},
                )
            )
        return events

    # Error detection: "Error in rule X:" - starts a pending error block
    if first_char == "E" and line.startswith("Error in rule "):
        if match := ERROR_IN_RULE_PATTERN.search(line):
            # Flush any previous pending error before starting new one
            if pending := self.context.get_pending_error():
                events.append(pending)
            rule = match.group(1)
            # Start pending error - we'll capture jobid/log from subsequent lines
            self.context.start_error_block(rule)
        return events

    # Progress line: "X of Y steps (Z%) done" - check with substring first
    # Only lines not claimed by a prefix check above reach this point; the
    # substring test avoids running the regex on most of them.
    if "steps" in line and "done" in line:
        if match := PROGRESS_PATTERN.search(line):
            completed = int(match.group(1))
            total = int(match.group(2))
            events.append(
                ParseEvent(ParseEventType.PROGRESS, {"completed": completed, "total": total})
            )

    return events
reset
reset() -> None

Reset parsing context.

Source code in snakesee/parser/line_parser.py
def reset(self) -> None:
    """Discard all accumulated parsing state.

    Replaces the current context with a brand-new ``ParsingContext``,
    which drops the active rule details, the last timestamp, and any
    pending error block.
    """
    fresh_context = ParsingContext()
    self.context = fresh_context
ParseEvent

Bases: NamedTuple

Result of parsing a log line.

Source code in snakesee/parser/line_parser.py
class ParseEvent(NamedTuple):
    """Result of parsing a log line.

    An immutable pair of an event kind and its payload. The keys present in
    ``data`` depend on ``event_type``. For example, PROGRESS events carry
    ``completed`` and ``total``, and JOB_FINISHED events carry ``jobid`` and
    ``timestamp``.
    """

    # Which kind of log event this is.
    event_type: ParseEventType
    # Event payload. Keys vary by event_type and values are typed only as
    # ``object``, so consumers should isinstance-check values before use.
    data: dict[str, object]
ParseEventType

Bases: Enum

Types of events that can be parsed from a log line.

Source code in snakesee/parser/line_parser.py
class ParseEventType(Enum):
    """Types of events that can be parsed from a log line."""

    # "[<timestamp>]" line; data: {"timestamp"}.
    TIMESTAMP = "timestamp"
    # "X of Y steps (Z%) done" line; data: {"completed", "total"}.
    PROGRESS = "progress"
    # "rule X:" / "localrule X:" / "checkpoint X:" / "localcheckpoint X:" header; data: {"rule"}.
    RULE_START = "rule_start"
    # Detail events for a job; presumably produced by the indented-line
    # parser (not shown here). Consumers read "jobid" plus the matching
    # value key from data.
    WILDCARDS = "wildcards"
    THREADS = "threads"
    LOG_PATH = "log_path"
    # "jobid:" line, carrying the rule context accumulated so far.
    JOBID = "jobid"
    # "Finished job X" / "Finished jobid: X" line; data: {"jobid", "timestamp"}.
    JOB_FINISHED = "job_finished"
    # Completed "Error in rule X:" block, emitted when the pending error is flushed.
    ERROR = "error"
ParsingContext dataclass

Current parsing state for multi-line log entries.

Snakemake logs use multi-line blocks where context from earlier lines (rule, wildcards, etc.) applies to later lines (jobid).

Source code in snakesee/parser/line_parser.py
@dataclass
class ParsingContext:
    """Mutable state carried between the lines of a multi-line log entry.

    Snakemake writes each job as a block of lines. The ``rule X:`` header
    comes first, and details such as wildcards or the jobid follow on later
    lines. This object remembers what has been seen so far so that later
    lines can be attributed to the right rule.

    It also buffers an "Error in rule X:" block. The ERROR event is held back
    until the block is complete, because the jobid and log lines arrive
    *after* the header.
    """

    # Details of the rule block currently being parsed.
    rule: str | None = None
    jobid: str | None = None
    wildcards: dict[str, str] | None = None
    threads: int | None = None
    timestamp: float | None = None
    log_path: str | None = None

    # Buffered error block; pending_error_rule is None when nothing is pending.
    pending_error_rule: str | None = None
    error_jobid: str | None = None
    error_wildcards: dict[str, str] | None = None
    error_threads: int | None = None
    error_log_path: str | None = None

    def _set_error_fields(
        self,
        jobid: str | None,
        wildcards: dict[str, str] | None,
        threads: int | None,
        log_path: str | None,
    ) -> None:
        """Assign all four per-error detail fields in one step."""
        self.error_jobid = jobid
        self.error_wildcards = wildcards
        self.error_threads = threads
        self.error_log_path = log_path

    def reset_for_new_rule(self, rule: str) -> None:
        """Switch the context to a freshly started rule block.

        Only per-block details are cleared. The last seen timestamp and any
        pending error block are left as they are.

        Args:
            rule: Name of the new rule.
        """
        self.rule = rule
        self.jobid, self.wildcards, self.threads, self.log_path = None, None, None, None

    def start_error_block(self, rule: str) -> None:
        """Open a pending error block for ``rule``.

        Args:
            rule: Name of the rule that errored.
        """
        self.pending_error_rule = rule
        if self.rule != rule:
            # A different rule: nothing from the current block applies.
            self._set_error_fields(None, None, None, None)
            return
        # Same rule: seed the error with what is already known. Later lines
        # in the error block (e.g. "jobid:") may overwrite these values.
        self._set_error_fields(self.jobid, self.wildcards, self.threads, self.log_path)

    def get_pending_error(self) -> ParseEvent | None:
        """Build the ERROR event for the buffered error block and consume it.

        Returns:
            ParseEvent for the error, or None if no error is pending.
        """
        error_rule = self.pending_error_rule
        if error_rule is None:
            return None

        # Error blocks may append "(check log file(s) for error details)" to
        # the log path; keep only the path itself.
        marker = " (check log file"
        cleaned_log = self.error_log_path
        if cleaned_log and marker in cleaned_log:
            cleaned_log = cleaned_log.split(marker)[0].strip()

        payload: dict[str, object] = {
            "rule": error_rule,
            "jobid": self.error_jobid,
            "wildcards": self.error_wildcards,
            "threads": self.error_threads,
            "log_path": cleaned_log,
        }
        event = ParseEvent(ParseEventType.ERROR, payload)

        # Consume the buffered error so it is emitted exactly once.
        self.pending_error_rule = None
        self._set_error_fields(None, None, None, None)
        return event

    def has_pending_error(self) -> bool:
        """Report whether an error block is buffered and not yet emitted."""
        return self.pending_error_rule is not None
Functions
get_pending_error
get_pending_error() -> ParseEvent | None

Get the pending error event and clear it.

Returns:

Type Description
ParseEvent | None

ParseEvent for the error, or None if no pending error.

Source code in snakesee/parser/line_parser.py
def get_pending_error(self) -> ParseEvent | None:
    """Build the ERROR event for the buffered error block and consume it.

    Returns:
        ParseEvent for the error, or None if no error is pending.
    """
    if self.pending_error_rule is None:
        return None

    # Error blocks may append "(check log file(s) for error details)" to the
    # log path; keep only the text before that marker.
    suffix_marker = " (check log file"
    log_path = self.error_log_path
    if log_path and suffix_marker in log_path:
        before_marker, _, _ = log_path.partition(suffix_marker)
        log_path = before_marker.strip()

    event = ParseEvent(
        ParseEventType.ERROR,
        {
            "rule": self.pending_error_rule,
            "jobid": self.error_jobid,
            "wildcards": self.error_wildcards,
            "threads": self.error_threads,
            "log_path": log_path,
        },
    )

    # Consume the buffered error so it is emitted exactly once.
    (
        self.pending_error_rule,
        self.error_jobid,
        self.error_wildcards,
        self.error_threads,
        self.error_log_path,
    ) = (None, None, None, None, None)
    return event
has_pending_error
has_pending_error() -> bool

Check if there's a pending error to emit.

Source code in snakesee/parser/line_parser.py
def has_pending_error(self) -> bool:
    """Report whether an "Error in rule" block is buffered and not yet emitted."""
    nothing_pending = self.pending_error_rule is None
    return not nothing_pending
reset_for_new_rule
reset_for_new_rule(rule: str) -> None

Reset context when entering a new rule block.

Parameters:

Name Type Description Default
rule str

Name of the new rule.

required
Source code in snakesee/parser/line_parser.py
def reset_for_new_rule(self, rule: str) -> None:
    """Switch the context to a freshly started rule block.

    Only per-block details are cleared. The last seen timestamp and any
    pending error block are left as they are.

    Args:
        rule: Name of the new rule.
    """
    self.rule = rule
    # Details from the previous block must not leak into this one.
    for attr_name in ("jobid", "wildcards", "threads", "log_path"):
        setattr(self, attr_name, None)
start_error_block
start_error_block(rule: str) -> None

Start tracking a pending error block.

Parameters:

Name Type Description Default
rule str

Name of the rule that errored.

required
Source code in snakesee/parser/line_parser.py
def start_error_block(self, rule: str) -> None:
    """Open a pending error block for ``rule``.

    If the error belongs to the rule whose block is currently being parsed,
    the error is seeded with that block's jobid, wildcards, threads and log
    path. Otherwise those error details start out empty.

    Args:
        rule: Name of the rule that errored.
    """
    self.pending_error_rule = rule
    same_rule = self.rule == rule
    self.error_jobid = self.jobid if same_rule else None
    self.error_wildcards = self.wildcards if same_rule else None
    self.error_threads = self.threads if same_rule else None
    self.error_log_path = self.log_path if same_rule else None

log_reader

Coordinator for incremental log reading.

Classes

IncrementalLogReader

Streaming reader for Snakemake log files with position tracking.

Reads log lines incrementally, tracking the current file position to only parse new content on subsequent calls. Maintains cumulative state for running jobs, completed jobs, failed jobs, and progress.

Handles log file rotation by detecting inode changes or file truncation.

This is a coordinator class that delegates to specialized components:

- LogFilePosition: file position tracking and rotation detection
- LogLineParser: log line parsing with context tracking
- JobLifecycleTracker: job start/finish tracking
- FailureTracker: failure deduplication

Attributes:

Name Type Description
log_path

Path to the log file being monitored.

Source code in snakesee/parser/log_reader.py
class IncrementalLogReader:
    """Streaming reader for Snakemake log files with position tracking.

    Reads log lines incrementally, tracking the current file position to only
    parse new content on subsequent calls. Maintains cumulative state for
    running jobs, completed jobs, failed jobs, and progress.

    Handles log file rotation by detecting inode changes or file truncation.

    This is a coordinator class that delegates to specialized components:
    - LogFilePosition: File position tracking and rotation detection
    - LogLineParser: Log line parsing with context tracking
    - JobLifecycleTracker: Job start/finish tracking
    - FailureTracker: Failure deduplication

    All public methods and properties run under an internal re-entrant lock.

    Attributes:
        log_path: Path to the log file being monitored.
    """

    def __init__(self, log_path: Path) -> None:
        """Initialize the incremental log reader.

        Args:
            log_path: Path to the Snakemake log file.
        """
        self.log_path = log_path
        self._lock = threading.RLock()

        # Delegate components
        self._position = LogFilePosition(log_path)
        self._parser = LogLineParser()
        self._jobs = JobLifecycleTracker()
        self._failures = FailureTracker()

        # Progress state
        self._completed: int = 0
        self._total: int = 0

    def reset(self) -> None:
        """Reset reader to start of file and clear all state.

        Call this when switching to a different log file or to re-read
        from the beginning.
        """
        with self._lock:
            self._reset_unlocked()

    def set_log_path(self, log_path: Path) -> None:
        """Change the log file being monitored.

        Resets all state if the path changes.

        Args:
            log_path: New log file path.
        """
        with self._lock:
            if log_path != self.log_path:
                self.log_path = log_path
                self._position = LogFilePosition(log_path)
                self._reset_unlocked()

    def _reset_unlocked(self) -> None:
        """Reset file position and all parsed state (caller must hold lock)."""
        self._position.reset()
        self._reset_parsed_state_unlocked()

    def _reset_parsed_state_unlocked(self) -> None:
        """Clear parser context, job/failure tracking and progress (caller must hold lock).

        The file position is deliberately left untouched. The rotation path
        in ``read_new_lines`` clears only parsed state, and this helper keeps
        that path and ``_reset_unlocked`` from drifting apart when new state
        is added.
        """
        self._parser.reset()
        self._jobs.reset()
        self._failures.reset()
        self._completed = 0
        self._total = 0

    def read_new_lines(self) -> int:
        """Read and parse new lines from the log file.

        Updates internal state based on new log content. This method
        should be called periodically to process new log entries.

        Returns:
            Number of new lines processed. A missing file yields 0. Read
            errors are logged (or, for a file that vanished mid-read,
            ignored) rather than raised.
        """
        with self._lock:
            # Check existence under the lock: set_log_path() may swap
            # self.log_path from another thread, and this check must refer
            # to the same path that is opened below.
            if not self.log_path.exists():
                return 0

            if self._position.check_rotation():
                # File was rotated - discard all state parsed from the old file.
                self._reset_parsed_state_unlocked()

            lines_processed = 0
            try:
                with open(self.log_path, "r", encoding="utf-8", errors="replace") as f:
                    # Clamp offset to file bounds
                    file_size = f.seek(0, 2)
                    self._position.clamp_to_size(file_size)
                    f.seek(self._position.offset)

                    for line in f:
                        self._process_line(line)
                        lines_processed += 1

                    self._position.offset = f.tell()

                    # Flush any pending error at end of file
                    if pending := self._parser.flush_pending_error():
                        self._process_event(pending)
            except FileNotFoundError:
                pass
            except PermissionError as e:
                logger.warning("Permission denied reading log file %s: %s", self.log_path, e)
            except OSError as e:
                logger.warning("Error reading log file %s: %s", self.log_path, e)

            return lines_processed

    def _process_line(self, line: str) -> None:
        """Process a parsed line and update state.

        Args:
            line: Raw log line to process.
        """
        events = self._parser.parse_line(line)
        for event in events:
            self._process_event(event)

    def _process_event(self, event: ParseEvent) -> None:
        """Process a single parsed event and update state.

        Event payload values are typed only as ``object``, so each branch
        isinstance-checks values before handing them to the trackers.

        Args:
            event: Parsed event to process.
        """
        if event.event_type == ParseEventType.PROGRESS:
            completed = event.data["completed"]
            total = event.data["total"]
            if isinstance(completed, int) and isinstance(total, int):
                self._completed = completed
                self._total = total

        elif event.event_type == ParseEventType.JOBID:
            rule = event.data.get("rule")
            if rule is not None:
                jobid = str(event.data["jobid"])
                # Only the first sighting of a jobid starts the job.
                if not self._jobs.is_job_started(jobid):
                    timestamp = event.data.get("timestamp")
                    wildcards = event.data.get("wildcards")
                    threads = event.data.get("threads")
                    self._jobs.start_job(
                        jobid=jobid,
                        rule=str(rule),
                        start_time=float(timestamp)
                        if isinstance(timestamp, (int, float))
                        else None,
                        wildcards=wildcards if isinstance(wildcards, dict) else None,
                        threads=threads if isinstance(threads, int) else None,
                    )
                log_path = event.data.get("log_path")
                if isinstance(log_path, str):
                    self._jobs.set_job_log(jobid, log_path)

        elif event.event_type == ParseEventType.WILDCARDS:
            wc_jobid = event.data.get("jobid")
            wc_wildcards = event.data.get("wildcards")
            if isinstance(wc_jobid, str) and isinstance(wc_wildcards, dict):
                self._jobs.update_job(wc_jobid, wildcards=wc_wildcards)

        elif event.event_type == ParseEventType.THREADS:
            th_jobid = event.data.get("jobid")
            th_threads = event.data.get("threads")
            if isinstance(th_jobid, str) and isinstance(th_threads, int):
                self._jobs.update_job(th_jobid, threads=th_threads)

        elif event.event_type == ParseEventType.LOG_PATH:
            lp_jobid = event.data.get("jobid")
            lp_path = event.data.get("log_path")
            if isinstance(lp_jobid, str) and isinstance(lp_path, str):
                self._jobs.set_job_log(lp_jobid, lp_path)

        elif event.event_type == ParseEventType.JOB_FINISHED:
            fin_jobid = str(event.data["jobid"])
            fin_timestamp = event.data.get("timestamp")
            fin_end = float(fin_timestamp) if isinstance(fin_timestamp, (int, float)) else None
            self._jobs.finish_job(fin_jobid, end_time=fin_end)

        elif event.event_type == ParseEventType.ERROR:
            err_rule = str(event.data["rule"])
            err_jobid = event.data.get("jobid")
            err_log = event.data.get("log_path")
            err_wildcards = event.data.get("wildcards")
            err_threads = event.data.get("threads")
            self._failures.record_failure(
                rule=err_rule,
                jobid=str(err_jobid) if err_jobid is not None else None,
                wildcards=err_wildcards if isinstance(err_wildcards, dict) else None,
                threads=err_threads if isinstance(err_threads, int) else None,
                log_file=Path(err_log) if isinstance(err_log, str) else None,
            )

    @property
    def progress(self) -> tuple[int, int]:
        """Get current workflow progress.

        Returns:
            Tuple of (completed_count, total_count).
        """
        with self._lock:
            return self._completed, self._total

    @property
    def running_jobs(self) -> list[JobInfo]:
        """Get list of currently running jobs.

        Returns:
            List of JobInfo for jobs that started but haven't finished.
        """
        with self._lock:
            return self._jobs.get_running_jobs()

    @property
    def completed_jobs(self) -> list[JobInfo]:
        """Get list of completed jobs with timing info.

        Returns:
            List of JobInfo for completed jobs, sorted by end time (newest first).
        """
        with self._lock:
            return self._jobs.get_completed_jobs()

    @property
    def failed_jobs(self) -> list[JobInfo]:
        """Get list of failed jobs.

        Returns:
            List of JobInfo for jobs that encountered errors.
        """
        with self._lock:
            return self._failures.get_failed_jobs()
Attributes
completed_jobs property
completed_jobs: list[JobInfo]

Get list of completed jobs with timing info.

Returns:

Type Description
list[JobInfo]

List of JobInfo for completed jobs, sorted by end time (newest first).

failed_jobs property
failed_jobs: list[JobInfo]

Get list of failed jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that encountered errors.

progress property
progress: tuple[int, int]

Get current workflow progress.

Returns:

Type Description
tuple[int, int]

Tuple of (completed_count, total_count).

running_jobs property
running_jobs: list[JobInfo]

Get list of currently running jobs.

Returns:

Type Description
list[JobInfo]

List of JobInfo for jobs that started but haven't finished.

Functions
__init__
__init__(log_path: Path) -> None

Initialize the incremental log reader.

Parameters:

Name Type Description Default
log_path Path

Path to the Snakemake log file.

required
Source code in snakesee/parser/log_reader.py
def __init__(self, log_path: Path) -> None:
    """Set up a reader for ``log_path`` with empty state.

    Args:
        log_path: Path to the Snakemake log file.
    """
    # Collaborators, each owning one slice of the work (built in the same order).
    position_tracker = LogFilePosition(log_path)
    line_parser = LogLineParser()
    job_tracker = JobLifecycleTracker()
    failure_tracker = FailureTracker()

    self.log_path = log_path
    # Re-entrant lock taken by every public method and property.
    self._lock = threading.RLock()

    self._position = position_tracker
    self._parser = line_parser
    self._jobs = job_tracker
    self._failures = failure_tracker

    # Latest "X of Y steps" progress seen in the log.
    self._completed, self._total = 0, 0
read_new_lines
read_new_lines() -> int

Read and parse new lines from the log file.

Updates internal state based on new log content. This method should be called periodically to process new log entries.

Returns:

Type Description
int

Number of new lines processed.

Source code in snakesee/parser/log_reader.py
def read_new_lines(self) -> int:
    """Read and parse new lines from the log file.

    Updates internal state based on new log content. This method
    should be called periodically to process new log entries.

    Returns:
        Number of new lines processed. A missing file yields 0. Read errors
        are logged (or, for a file that vanished mid-read, ignored) rather
        than raised.
    """
    with self._lock:
        # Check existence under the lock: set_log_path() may swap
        # self.log_path from another thread, and this check must refer to
        # the same path that is opened below.
        if not self.log_path.exists():
            return 0

        if self._position.check_rotation():
            # File was rotated - reset all parsed state (position is not reset here)
            self._parser.reset()
            self._jobs.reset()
            self._failures.reset()
            self._completed = 0
            self._total = 0

        lines_processed = 0
        try:
            with open(self.log_path, "r", encoding="utf-8", errors="replace") as f:
                # Clamp offset to file bounds
                file_size = f.seek(0, 2)
                self._position.clamp_to_size(file_size)
                f.seek(self._position.offset)

                for line in f:
                    self._process_line(line)
                    lines_processed += 1

                self._position.offset = f.tell()

                # Flush any pending error at end of file
                if pending := self._parser.flush_pending_error():
                    self._process_event(pending)
        except FileNotFoundError:
            pass
        except PermissionError as e:
            logger.warning("Permission denied reading log file %s: %s", self.log_path, e)
        except OSError as e:
            logger.warning("Error reading log file %s: %s", self.log_path, e)

        return lines_processed
reset
reset() -> None

Reset reader to start of file and clear all state.

Call this when switching to a different log file or to re-read from the beginning.

Source code in snakesee/parser/log_reader.py
def reset(self) -> None:
    """Rewind to the beginning of the file and forget everything parsed.

    Use this before re-reading a log from scratch or when pointing the
    reader at another file.
    """
    lock = self._lock
    with lock:
        self._reset_unlocked()
set_log_path
set_log_path(log_path: Path) -> None

Change the log file being monitored.

Resets all state if the path changes.

Parameters:

Name Type Description Default
log_path Path

New log file path.

required
Source code in snakesee/parser/log_reader.py
def set_log_path(self, log_path: Path) -> None:
    """Point the reader at a different log file.

    Setting the path that is already being monitored is a no-op. Any other
    path replaces the position tracker and clears all accumulated state.

    Args:
        log_path: New log file path.
    """
    with self._lock:
        if log_path == self.log_path:
            return
        self.log_path = log_path
        self._position = LogFilePosition(log_path)
        self._reset_unlocked()

metadata

Metadata file parsing for Snakemake workflows.

This module handles parsing of .snakemake/metadata/ files which contain information about completed jobs, including timing, wildcards, and code.

Note: Currently (Snakemake <= 8.x), metadata files do NOT store wildcards. Wildcards are only available from live log events during the current session. This means combination-based estimates (wildcard+threads) only work for jobs that ran in the current session.

TODO: Once https://github.com/snakemake/snakemake/pull/3888 is merged and released, metadata files will include wildcards, enabling historical combination-based estimates across sessions.

Classes

MetadataRecord dataclass

Parsed data from a single metadata file, used for efficient single-pass collection.

Contains all fields needed by various collection functions so we only read each metadata file once.

Source code in snakesee/parser/metadata.py
@dataclass(frozen=True, slots=True)
class MetadataRecord:
    """Single metadata file parsed data for efficient single-pass collection.

    Contains all fields needed by various collection functions so we only
    read each metadata file once.
    """

    rule: str
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] | None = None
    input_size: int | None = None
    code_hash: str | None = None

    @property
    def duration(self) -> float | None:
        """Calculate duration from start and end times."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_job_info(self) -> JobInfo:
        """Convert to JobInfo for compatibility with existing code."""
        return JobInfo(
            rule=self.rule,
            start_time=self.start_time,
            end_time=self.end_time,
            wildcards=self.wildcards,
            input_size=self.input_size,
        )
Attributes
duration property
duration: float | None

Calculate duration from start and end times.

Functions
to_job_info
to_job_info() -> JobInfo

Convert to JobInfo for compatibility with existing code.

Source code in snakesee/parser/metadata.py
def to_job_info(self) -> JobInfo:
    """Convert to JobInfo for code that expects the older record type.

    ``code_hash`` has no counterpart in JobInfo and is not carried over.
    """
    return JobInfo(
        input_size=self.input_size,
        wildcards=self.wildcards,
        end_time=self.end_time,
        start_time=self.start_time,
        rule=self.rule,
    )

Functions

calculate_metadata_input_size
calculate_metadata_input_size(input_files: list[str] | None) -> int | None

Calculate total input size from file list.

Parameters:

Name Type Description Default
input_files list[str] | None

List of input file paths from metadata.

required

Returns:

Type Description
int | None

Total size in bytes, or None if not a valid list or any file is missing.

Source code in snakesee/parser/metadata.py
def calculate_metadata_input_size(input_files: list[str] | None) -> int | None:
    """Calculate total input size from file list.

    Args:
        input_files: List of input file paths from metadata.

    Returns:
        Total size in bytes, or None if not a valid list or any file is missing.
    """
    if not isinstance(input_files, list) or not input_files:
        return None

    total_size = 0
    for f in input_files:
        try:
            total_size += Path(f).stat().st_size
        except OSError:
            return None
    return total_size
collect_rule_code_hashes
collect_rule_code_hashes(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, set[str]]

Collect code hashes for each rule from metadata files.

This enables detection of renamed rules by matching their shell code. If two rules have the same code hash, they are likely the same rule that was renamed.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Returns:

Type Description
dict[str, set[str]]

Dictionary mapping code_hash -> set of rule names that use that code.

Source code in snakesee/parser/metadata.py
def collect_rule_code_hashes(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, set[str]]:
    """
    Collect code hashes for each rule from metadata files.

    This enables detection of renamed rules by matching their shell code.
    If two rules have the same code hash, they are likely the same rule
    that was renamed.

    Records whose ``rule`` or ``code`` is missing, empty, or not a string
    are skipped.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping code_hash -> set of rule names that use that code.
    """
    hash_to_rules: dict[str, set[str]] = {}

    if not metadata_dir.exists():
        return hash_to_rules

    # Use optimized iterate_metadata_files (6-7x faster than rglob)
    for _path, data in iterate_metadata_files(
        metadata_dir,
        progress_callback,
        sort_by_mtime=False,  # Order doesn't matter for code hash collection
        use_cache=False,  # We need to read code field which isn't cached
    ):
        rule = data.get("rule")
        code = data.get("code")

        # Metadata is external JSON. Skip malformed records instead of
        # crashing on code.split() (non-str code) or set.add() (unhashable
        # rule).
        if not (isinstance(rule, str) and rule and isinstance(code, str) and code):
            continue

        # Normalize whitespace before hashing to handle formatting differences
        normalized_code = " ".join(code.split())
        code_hash = hashlib.sha256(normalized_code.encode()).hexdigest()[:16]
        hash_to_rules.setdefault(code_hash, set()).add(rule)

    return hash_to_rules
parse_metadata_files
parse_metadata_files(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[JobInfo]

Parse completed job information from Snakemake metadata files.

Reads JSON metadata files from .snakemake/metadata/ to extract timing information for completed jobs, including input file sizes.

Parameters:

Name Type Description Default
metadata_dir Path

Path to .snakemake/metadata/ directory.

required
progress_callback ProgressCallback | None

Optional callback(current, total) for progress reporting.

None

Yields:

Type Description
JobInfo

JobInfo instances for each completed job found.

Source code in snakesee/parser/metadata.py
def parse_metadata_files(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[JobInfo]:
    """
    Yield a JobInfo for every completed job recorded in the metadata directory.

    A metadata record is only used when it carries a rule name, a start time
    and an end time; records missing any of the three are skipped. Wildcards
    stored as a mapping are normalized to ``str -> str``, and the input size
    is derived from the record's ``input`` field.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        JobInfo instances for each completed job found.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule = record.get("rule")
        starttime = record.get("starttime")
        endtime = record.get("endtime")

        # Incomplete records (no rule or no timing) carry nothing usable.
        if rule is None or starttime is None or endtime is None:
            continue

        raw_wildcards = record.get("wildcards")
        wildcards: dict[str, str] | None = (
            {str(key): str(value) for key, value in raw_wildcards.items()}
            if isinstance(raw_wildcards, dict)
            else None
        )

        yield JobInfo(
            rule=rule,
            start_time=starttime,
            end_time=endtime,
            wildcards=wildcards,
            input_size=calculate_metadata_input_size(record.get("input")),
        )
parse_metadata_files_full
parse_metadata_files_full(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Parse all metadata from Snakemake metadata files in a single pass.

This is more efficient than calling parse_metadata_files and collect_rule_code_hashes separately, as it reads each file only once.

Parameters:

  • metadata_dir (Path, required): Path to the .snakemake/metadata/ directory.
  • progress_callback (ProgressCallback | None, default None): Optional callback(current, total) for progress reporting.

Yields:

  • MetadataRecord: MetadataRecord instances containing timing and code hash data.

Source code in snakesee/parser/metadata.py
def parse_metadata_files_full(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """
    Read every metadata file once and yield timing plus code-hash data.

    Unlike ``parse_metadata_files``, records are yielded even when start or
    end time is missing; only records without a rule name are skipped. This
    avoids a second pass that ``collect_rule_code_hashes`` would otherwise need.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Yields:
        MetadataRecord instances containing timing and code hash data.
    """
    for _path, record in iterate_metadata_files(metadata_dir, progress_callback):
        rule = record.get("rule")
        if rule is None:
            continue

        starttime = record.get("starttime")
        endtime = record.get("endtime")

        raw_wildcards = record.get("wildcards")
        wildcards: dict[str, str] | None = (
            {str(key): str(value) for key, value in raw_wildcards.items()}
            if isinstance(raw_wildcards, dict)
            else None
        )

        code = record.get("code")
        code_hash: str | None = None
        if code:
            # Collapse every whitespace run so formatting-only edits hash the same;
            # the digest is truncated to 16 hex characters.
            collapsed = " ".join(code.split())
            code_hash = hashlib.sha256(collapsed.encode()).hexdigest()[:16]

        yield MetadataRecord(
            rule=rule,
            start_time=starttime,
            end_time=endtime,
            wildcards=wildcards,
            input_size=calculate_metadata_input_size(record.get("input")),
            code_hash=code_hash,
        )

patterns

Regex patterns for parsing Snakemake log files.

This module is the single source of truth for all log parsing patterns. Both core.py and line_parser.py import from here to avoid duplication.

stats

Timing statistics collection for Snakemake workflows.

This module collects and aggregates timing statistics from metadata files for use in time estimation.

Classes

Functions

collect_rule_timing_stats
collect_rule_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, RuleTimingStats]

Collect historical timing statistics per rule from metadata.

Aggregates all completed job timings by rule name, sorted chronologically by end time. Includes timestamps for time-based weighted estimation. Input sizes are included when available from job metadata.

Parameters:

  • metadata_dir (Path, required): Path to the .snakemake/metadata/ directory.
  • progress_callback (ProgressCallback | None, default None): Optional callback(current, total) for progress reporting.

Returns:

  • dict[str, RuleTimingStats]: dictionary mapping rule names to their timing statistics.

Source code in snakesee/parser/stats.py
def collect_rule_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, RuleTimingStats]:
    """
    Build per-rule historical timing statistics from Snakemake metadata.

    Jobs whose duration or end time is unknown are ignored. For every rule,
    the remaining samples are ordered by end time (oldest first) and split
    into parallel duration / timestamp / input-size lists. Input sizes may be
    None when the metadata did not allow them to be computed.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Dictionary mapping rule names to their timing statistics.
    """
    # rule name -> [(duration, end_time, input_size), ...]
    samples_by_rule: dict[str, list[tuple[float, float, int | None]]] = {}

    for job in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        elapsed = job.duration
        finished_at = job.end_time
        if elapsed is not None and finished_at is not None:
            samples_by_rule.setdefault(job.rule, []).append(
                (elapsed, finished_at, job.input_size)
            )

    result: dict[str, RuleTimingStats] = {}
    for rule_name, samples in samples_by_rule.items():
        # Key on end time only: the sort is stable, so samples sharing an end
        # time keep the order in which they were discovered.
        ordered = sorted(samples, key=lambda sample: sample[1])
        result[rule_name] = RuleTimingStats(
            rule=rule_name,
            durations=[sample[0] for sample in ordered],
            timestamps=[sample[1] for sample in ordered],
            input_sizes=[sample[2] for sample in ordered],
        )

    return result
collect_wildcard_timing_stats
collect_wildcard_timing_stats(metadata_dir: Path, progress_callback: ProgressCallback | None = None) -> dict[str, dict[str, WildcardTimingStats]]

Collect timing statistics per rule, conditioned on wildcards.

Groups execution times by (rule, wildcard_key, wildcard_value), using only jobs whose metadata records wildcards.

Parameters:

  • metadata_dir (Path, required): Path to the .snakemake/metadata/ directory.
  • progress_callback (ProgressCallback | None, default None): Optional callback(current, total) for progress reporting.

Returns:

  • dict[str, dict[str, WildcardTimingStats]]: nested dictionary rule -> wildcard_key -> WildcardTimingStats.

Source code in snakesee/parser/stats.py
def collect_wildcard_timing_stats(
    metadata_dir: Path,
    progress_callback: ProgressCallback | None = None,
) -> dict[str, dict[str, WildcardTimingStats]]:
    """Group job timings by rule and by each individual wildcard value.

    Only jobs with a known duration, a known end time and at least one
    wildcard contribute. Each such job is recorded once under every one of
    its wildcard key/value pairs, and the per-key groups are then turned into
    WildcardTimingStats by ``_build_wildcard_stats_for_key``.

    Args:
        metadata_dir: Path to .snakemake/metadata/ directory.
        progress_callback: Optional callback(current, total) for progress reporting.

    Returns:
        Nested dictionary: rule -> wildcard_key -> WildcardTimingStats
    """
    # rule -> wildcard_key -> wildcard_value -> [(duration, end_time), ...]
    grouped: dict[str, dict[str, dict[str, list[tuple[float, float]]]]] = {}

    for job in parse_metadata_files(metadata_dir, progress_callback=progress_callback):
        elapsed = job.duration
        finished_at = job.end_time
        if elapsed is None or finished_at is None or not job.wildcards:
            continue

        per_key = grouped.setdefault(job.rule, {})
        for wc_key, wc_value in job.wildcards.items():
            per_key.setdefault(wc_key, {}).setdefault(wc_value, []).append(
                (elapsed, finished_at)
            )

    return {
        rule: {
            wc_key: _build_wildcard_stats_for_key(rule, wc_key, wc_values)
            for wc_key, wc_values in per_rule.items()
        }
        for rule, per_rule in grouped.items()
    }

utils

Utility functions for parsing.

Small helper functions used across parser modules.

Functions

calculate_input_size
calculate_input_size(file_paths: list[Path]) -> int | None

Calculate total size of input files.

Parameters:

  • file_paths (list[Path], required): List of input file paths.

Returns:

  • int | None: total size in bytes, or None if the list is empty or any file cannot be accessed (for example, because it does not exist).

Source code in snakesee/parser/utils.py
def calculate_input_size(file_paths: list[Path]) -> int | None:
    """
    Calculate total size of input files.

    Args:
        file_paths: List of input file paths.

    Returns:
        Total size in bytes, or None if any file doesn't exist.
    """
    total_size = 0
    for path in file_paths:
        try:
            total_size += path.stat().st_size
        except OSError:
            return None  # File doesn't exist or can't be accessed
    return total_size if file_paths else None
estimate_input_size_from_output
estimate_input_size_from_output(output_path: Path, workflow_dir: Path) -> int | None

Try to estimate input size by looking for related input files.

This is a heuristic that works for common bioinformatics patterns where output files are derived from inputs with predictable naming conventions.

Examples:

  • sample.sorted.bam -> sample.bam
  • sample.fastq.gz -> looks for sample.fq.gz, sample.fastq.gz
  • sample.vcf.gz -> sample.bam

Parameters:

  • output_path (Path, required): Path to the output file.
  • workflow_dir (Path, required): Workflow root directory.

Returns:

  • int | None: estimated input size in bytes, or None if it cannot be determined.

Source code in snakesee/parser/utils.py
def estimate_input_size_from_output(
    output_path: Path,
    workflow_dir: Path,
) -> int | None:
    """
    Try to estimate input size by looking for related input files.

    This is a heuristic that works for common bioinformatics patterns where
    output files are derived from inputs with predictable naming conventions.

    Examples:
        - sample.sorted.bam -> sample.bam
        - sample.fastq.gz -> looks for sample.fq.gz, sample.fastq.gz
        - sample.vcf.gz -> sample.bam

    Args:
        output_path: Path to the output file.
        workflow_dir: Workflow root directory.

    Returns:
        Estimated input size in bytes, or None if not determinable.
    """
    # Common input file patterns relative to output
    suffixes_to_strip = [
        ".sorted.bam",
        ".sorted",
        ".trimmed",
        ".filtered",
        ".dedup",
        ".aligned",
    ]

    name = output_path.name

    # Try stripping common suffixes to find input
    for suffix in suffixes_to_strip:
        if name.endswith(suffix):
            input_name = name[: -len(suffix)]
            # Try common extensions
            for ext in [".bam", ".fastq.gz", ".fq.gz", ".fa.gz", ".fasta.gz"]:
                candidate = workflow_dir / (input_name + ext)
                if candidate.exists():
                    try:
                        return candidate.stat().st_size
                    except OSError:
                        continue

    # No input found
    return None