Skip to content

line_parser

Log line parsing with context tracking.

Classes

LogLineParser dataclass

Parses individual Snakemake log lines.

Maintains parsing context across lines so that log entries whose information spans multiple lines can be handled.

Source code in snakesee/parser/line_parser.py
@dataclass
class LogLineParser:
    """Convert raw Snakemake log lines into structured parse events.

    One job is described over several consecutive log lines, so the parser
    carries a ``ParsingContext`` from one call to the next and uses it to
    combine the pieces.
    """

    context: ParsingContext = field(default_factory=ParsingContext)

    def parse_line(self, line: str) -> list[ParseEvent]:
        """Parse one log line into zero or more events.

        Cheap first-character and prefix tests run before any regex so that
        lines which cannot match are rejected quickly.  The parsing context
        is updated as a side effect.  A matched timestamp, rule start or
        "Error in rule" line flushes a pending error into the result first.

        Args:
            line: Log line to parse.

        Returns:
            List of ParseEvents (may be empty, one, or two events).
        """
        events: list[ParseEvent] = []
        line = line.rstrip("\n\r")
        if not line:
            # Nothing to do for an empty line.
            return events

        lead = line[0]

        # '[' can only introduce a timestamp line.
        if lead == "[":
            self._emit_timestamp(line, events)
            return events

        # Leading whitespace: property line, or an indented rule start /
        # timestamp inside a group/pipe job block.
        if lead in (" ", "\t"):
            return self._parse_indented_or_group_line(line, events)

        # "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
        if self._has_rule_prefix(line):
            self._emit_rule_start(line, events)
            return events

        # "Finished job X" or "Finished jobid: X"
        if lead == "F" and line.startswith("Finished "):
            found = FINISHED_JOB_PATTERN.search(line)
            if found:
                events.append(
                    ParseEvent(
                        ParseEventType.JOB_FINISHED,
                        {"jobid": found.group(1), "timestamp": self.context.timestamp},
                    )
                )
            return events

        # "Error in rule X:" opens an error block.  No event is produced for
        # the line itself; the ERROR event is deferred until the block ends.
        if lead == "E" and line.startswith("Error in rule "):
            found = ERROR_IN_RULE_PATTERN.search(line)
            if found:
                self._flush_pending_into(events)
                self.context.start_error_block(found.group(1))
            return events

        # "X of Y steps (Z%) done" - substring test before the regex.
        if "steps" in line and "done" in line:
            found = PROGRESS_PATTERN.search(line)
            if found:
                events.append(
                    ParseEvent(
                        ParseEventType.PROGRESS,
                        {"completed": int(found.group(1)), "total": int(found.group(2))},
                    )
                )

        return events

    def flush_pending_error(self) -> ParseEvent | None:
        """Return the pending error event, if any, and clear it.

        Intended for end of file, or any point where all errors must have
        been emitted.

        Returns:
            ParseEvent for the pending error, or None if no pending error.
        """
        pending = self.context.get_pending_error()
        return pending

    @staticmethod
    def _has_rule_prefix(text: str) -> bool:
        """Tell whether non-empty *text* begins with a rule-start keyword."""
        return text[0] in ("r", "l", "c") and text.startswith(
            ("rule ", "localrule ", "checkpoint ", "localcheckpoint ")
        )

    def _flush_pending_into(self, events: list[ParseEvent]) -> None:
        """Append the pending error event, if there is one, to *events*."""
        pending = self.context.get_pending_error()
        if pending:
            events.append(pending)

    def _emit_timestamp(self, text: str, events: list[ParseEvent]) -> None:
        """Handle a candidate timestamp line that starts with '['.

        On a match the pending error is flushed first, then the context
        timestamp is updated and a TIMESTAMP event is appended.  Nothing
        happens when the pattern does not match.
        """
        found = TIMESTAMP_PATTERN.match(text)
        if not found:
            return
        self._flush_pending_into(events)
        stamp = _parse_timestamp(found.group(1))
        self.context.timestamp = stamp
        events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": stamp}))

    def _emit_rule_start(self, text: str, events: list[ParseEvent]) -> None:
        """Handle a candidate rule/checkpoint start line.

        On a match the pending error is flushed first, then the context is
        reset for the new rule and a RULE_START event is appended.  Nothing
        happens when the pattern does not match.
        """
        found = RULE_START_PATTERN.match(text)
        if not found:
            return
        self._flush_pending_into(events)
        name = found.group(1)
        self.context.reset_for_new_rule(name)
        events.append(ParseEvent(ParseEventType.RULE_START, {"rule": name}))

    def _parse_indented_or_group_line(
        self, line: str, events: list[ParseEvent]
    ) -> list[ParseEvent]:
        """Parse a line that begins with whitespace.

        Inside group/pipe job blocks, rule starts and timestamps are
        indented by 4 spaces; property lines are indented by 4 (normal) or
        8 (group) spaces.  The indentation is stripped before the line is
        classified.

        Args:
            line: Indented log line starting with space/tab.
            events: Mutable list to append events to.

        Returns:
            The events list (same object passed in).
        """
        body = line.lstrip()
        if not body:
            # Whitespace-only line.
            return events

        # Indented timestamp: "    [Mon Jan  6 10:00:00 2026]"
        if body[0] == "[":
            self._emit_timestamp(body, events)
            return events

        # Indented rule start: "    rule X:", "    localrule X:",
        # "    checkpoint X:", or "    localcheckpoint X:"
        if self._has_rule_prefix(body):
            self._emit_rule_start(body, events)
            return events

        # Anything else may be a property line (wildcards, threads, log, jobid).
        prop = self._parse_indented_line(line)
        if prop:
            events.append(prop)
        return events

    def _parse_indented_line(self, line: str) -> ParseEvent | None:
        """Parse an indented property line (wildcards, threads, log, jobid).

        A recognised property is stored in the context and, while an error
        block is pending, mirrored into the error fields as well.

        Args:
            line: Indented log line starting with space/tab.

        Returns:
            ParseEvent if line contains a recognized property, None otherwise.
        """
        # "<name>:<value>" - split on the first colon instead of slicing.
        name, colon, raw = line.lstrip().partition(":")
        if not colon:
            return None

        value = raw.strip()
        ctx = self.context

        if name == "wildcards":
            parsed = _parse_wildcards(value)
            ctx.wildcards = parsed
            if ctx.has_pending_error():
                ctx.error_wildcards = parsed
            return ParseEvent(
                ParseEventType.WILDCARDS,
                {"wildcards": parsed, "jobid": ctx.jobid},
            )

        if name == "threads":
            count = _parse_positive_int(value, "threads")
            if count is None:
                # Unusable thread count: no event, context untouched.
                return None
            ctx.threads = count
            if ctx.has_pending_error():
                ctx.error_threads = count
            return ParseEvent(
                ParseEventType.THREADS,
                {"threads": count, "jobid": ctx.jobid},
            )

        if name == "log":
            ctx.log_path = value
            if ctx.has_pending_error():
                ctx.error_log_path = value
            return ParseEvent(
                ParseEventType.LOG_PATH,
                {"log_path": value, "jobid": ctx.jobid},
            )

        if name == "jobid":
            ctx.jobid = value
            if ctx.has_pending_error():
                ctx.error_jobid = value
            # The JOBID event carries a snapshot of the whole current context.
            return ParseEvent(
                ParseEventType.JOBID,
                {
                    "jobid": value,
                    "rule": ctx.rule,
                    "wildcards": ctx.wildcards,
                    "threads": ctx.threads,
                    "timestamp": ctx.timestamp,
                    "log_path": ctx.log_path,
                },
            )

        return None

    def reset(self) -> None:
        """Discard all parsing state by installing a fresh context."""
        fresh = ParsingContext()
        self.context = fresh

Functions

flush_pending_error
flush_pending_error() -> ParseEvent | None

Flush any pending error event.

Call this at end of file or when needing to ensure all errors are emitted.

Returns:

Type Description
ParseEvent | None

ParseEvent for the pending error, or None if no pending error.

Source code in snakesee/parser/line_parser.py
def flush_pending_error(self) -> ParseEvent | None:
    """Return the pending error event, if any, and clear it.

    Intended for end of file, or any point where all errors must have
    been emitted.

    Returns:
        ParseEvent for the pending error, or None if no pending error.
    """
    pending = self.context.get_pending_error()
    return pending
parse_line
parse_line(line: str) -> list[ParseEvent]

Parse a single log line and return structured events.

Uses fast-path prefix checks to skip expensive regex operations for lines that can't possibly match.

Updates internal context as needed for multi-line entries. May return multiple events when a pending error needs to be flushed.

Parameters:

Name Type Description Default
line str

Log line to parse.

required

Returns:

Type Description
list[ParseEvent]

List of ParseEvents (may be empty, one, or two events).

Source code in snakesee/parser/line_parser.py
def parse_line(self, line: str) -> list[ParseEvent]:
    """Turn one log line into a list of structured events.

    Each branch is guarded by an inexpensive first-character or prefix
    test so that a regex only runs on lines that could match it.  The
    parsing context is updated as a side effect, and a pending error is
    flushed into the result when a timestamp, rule start or new
    "Error in rule" line is matched.

    Args:
        line: Log line to parse.

    Returns:
        List of ParseEvents (may be empty, one, or two events).
    """
    events: list[ParseEvent] = []
    text = line.rstrip("\n\r")
    if not text:
        # Nothing to parse on an empty line.
        return events

    def flush_pending() -> None:
        # Move a deferred ERROR event (if any) into the result list.
        pending = self.context.get_pending_error()
        if pending:
            events.append(pending)

    lead = text[0]

    # '[' can only introduce a timestamp line, which closes an error block.
    if lead == "[":
        found = TIMESTAMP_PATTERN.match(text)
        if found:
            flush_pending()
            stamp = _parse_timestamp(found.group(1))
            self.context.timestamp = stamp
            events.append(ParseEvent(ParseEventType.TIMESTAMP, {"timestamp": stamp}))
        return events

    # Leading whitespace: property line, or an indented rule start /
    # timestamp inside a group/pipe job block.
    if lead in (" ", "\t"):
        return self._parse_indented_or_group_line(text, events)

    # "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
    # A rule start also closes an error block.
    if lead in ("r", "l", "c") and text.startswith(
        ("rule ", "localrule ", "checkpoint ", "localcheckpoint ")
    ):
        found = RULE_START_PATTERN.match(text)
        if found:
            flush_pending()
            name = found.group(1)
            self.context.reset_for_new_rule(name)
            events.append(ParseEvent(ParseEventType.RULE_START, {"rule": name}))
        return events

    # "Finished job X" or "Finished jobid: X"
    if lead == "F" and text.startswith("Finished "):
        found = FINISHED_JOB_PATTERN.search(text)
        if found:
            events.append(
                ParseEvent(
                    ParseEventType.JOB_FINISHED,
                    {"jobid": found.group(1), "timestamp": self.context.timestamp},
                )
            )
        return events

    # "Error in rule X:" opens an error block.  No event is produced for
    # the line itself; jobid/log arrive on the following lines.
    if lead == "E" and text.startswith("Error in rule "):
        found = ERROR_IN_RULE_PATTERN.search(text)
        if found:
            flush_pending()
            self.context.start_error_block(found.group(1))
        return events

    # "X of Y steps (Z%) done" - substring test before the regex.
    if "steps" in text and "done" in text:
        found = PROGRESS_PATTERN.search(text)
        if found:
            events.append(
                ParseEvent(
                    ParseEventType.PROGRESS,
                    {"completed": int(found.group(1)), "total": int(found.group(2))},
                )
            )

    return events
reset
reset() -> None

Reset parsing context.

Source code in snakesee/parser/line_parser.py
def reset(self) -> None:
    """Discard all parsing state by installing a fresh context."""
    fresh = ParsingContext()
    self.context = fresh

ParseEvent

Bases: NamedTuple

Result of parsing a log line.

Source code in snakesee/parser/line_parser.py
class ParseEvent(NamedTuple):
    """Result of parsing a log line.

    A two-field tuple pairing the kind of event with its payload.
    """

    # Which kind of event this is (a ParseEventType member).
    event_type: ParseEventType
    # Event payload; which keys are present depends on event_type.
    data: dict[str, object]

ParseEventType

Bases: Enum

Types of events that can be parsed from a log line.

Source code in snakesee/parser/line_parser.py
class ParseEventType(Enum):
    """Types of events that can be parsed from a log line.

    Each member's value is the lower-case string form of its name.  The
    comments below list the payload keys the line parser attaches to an
    event of that type.
    """

    # A timestamp line was matched. Payload: "timestamp".
    TIMESTAMP = "timestamp"
    # An "X of Y steps (Z%) done" line was matched. Payload: "completed", "total".
    PROGRESS = "progress"
    # A rule/localrule/checkpoint/localcheckpoint header was matched. Payload: "rule".
    RULE_START = "rule_start"
    # A "wildcards:" property line. Payload: "wildcards", "jobid".
    WILDCARDS = "wildcards"
    # A "threads:" property line with a usable value. Payload: "threads", "jobid".
    THREADS = "threads"
    # A "log:" property line. Payload: "log_path", "jobid".
    LOG_PATH = "log_path"
    # A "jobid:" property line. Payload: "jobid" plus the current context
    # ("rule", "wildcards", "threads", "timestamp", "log_path").
    JOBID = "jobid"
    # A "Finished ..." line was matched. Payload: "jobid", "timestamp".
    JOB_FINISHED = "job_finished"
    # A deferred error block was flushed. Payload: "rule", "jobid",
    # "wildcards", "threads", "log_path".
    ERROR = "error"

ParsingContext dataclass

Current parsing state for multi-line log entries.

Snakemake logs use multi-line blocks where context from earlier lines (rule, wildcards, etc.) applies to later lines (jobid).

Source code in snakesee/parser/line_parser.py
@dataclass
class ParsingContext:
    """Mutable state carried between consecutive log lines.

    A Snakemake job block is spread over several lines: values seen on
    earlier lines (rule, wildcards, etc.) are remembered here so that they
    can be attached to a later line of the same block (jobid).
    """

    rule: str | None = None
    jobid: str | None = None
    wildcards: dict[str, str] | None = None
    threads: int | None = None
    timestamp: float | None = None
    log_path: str | None = None

    # Deferred error state.  "Error in rule X:" comes BEFORE the jobid and
    # log lines of the same error block, so the ERROR event is held back
    # until the block has been read.
    pending_error_rule: str | None = None
    error_jobid: str | None = None
    error_wildcards: dict[str, str] | None = None
    error_threads: int | None = None
    error_log_path: str | None = None

    def reset_for_new_rule(self, rule: str) -> None:
        """Begin a new rule block.

        Stores the rule name and forgets the per-job values of the previous
        block.  The timestamp and any pending error state are left alone.

        Args:
            rule: Name of the new rule.
        """
        self.rule = rule
        self.jobid = self.wildcards = self.threads = self.log_path = None

    def start_error_block(self, rule: str) -> None:
        """Begin collecting details for an error in *rule*.

        When the failing rule is the rule currently in context, the error
        fields start out as copies of the current job values; otherwise they
        start out empty.

        Args:
            rule: Name of the rule that errored.
        """
        self.pending_error_rule = rule
        same_rule = self.rule == rule
        self.error_jobid = self.jobid if same_rule else None
        self.error_wildcards = self.wildcards if same_rule else None
        self.error_threads = self.threads if same_rule else None
        self.error_log_path = self.log_path if same_rule else None

    def get_pending_error(self) -> ParseEvent | None:
        """Build the deferred ERROR event and clear the pending state.

        Returns:
            ParseEvent for the error, or None if no pending error.
        """
        failed_rule = self.pending_error_rule
        if failed_rule is None:
            return None

        # Error blocks append " (check log file(s) for error details)" to
        # the log path; keep only what precedes that suffix.
        log_path = self.error_log_path
        if log_path:
            head, marker, _ = log_path.partition(" (check log file")
            if marker:
                log_path = head.strip()

        event = ParseEvent(
            ParseEventType.ERROR,
            {
                "rule": failed_rule,
                "jobid": self.error_jobid,
                "wildcards": self.error_wildcards,
                "threads": self.error_threads,
                "log_path": log_path,
            },
        )

        # The event has been built, so the pending state can be dropped.
        self.pending_error_rule = self.error_jobid = None
        self.error_wildcards = self.error_threads = self.error_log_path = None
        return event

    def has_pending_error(self) -> bool:
        """Report whether an error block is waiting to be emitted."""
        if self.pending_error_rule is None:
            return False
        return True

Functions

get_pending_error
get_pending_error() -> ParseEvent | None

Get the pending error event and clear it.

Returns:

Type Description
ParseEvent | None

ParseEvent for the error, or None if no pending error.

Source code in snakesee/parser/line_parser.py
def get_pending_error(self) -> ParseEvent | None:
    """Build the deferred ERROR event and clear the pending state.

    Returns:
        ParseEvent for the error, or None if no pending error.
    """
    failed_rule = self.pending_error_rule
    if failed_rule is None:
        return None

    # Error blocks append " (check log file(s) for error details)" to the
    # log path; keep only what precedes that suffix.
    log_path = self.error_log_path
    if log_path:
        head, marker, _ = log_path.partition(" (check log file")
        if marker:
            log_path = head.strip()

    event = ParseEvent(
        ParseEventType.ERROR,
        {
            "rule": failed_rule,
            "jobid": self.error_jobid,
            "wildcards": self.error_wildcards,
            "threads": self.error_threads,
            "log_path": log_path,
        },
    )

    # The event has been built, so the pending state can be dropped.
    self.pending_error_rule = self.error_jobid = None
    self.error_wildcards = self.error_threads = self.error_log_path = None
    return event
has_pending_error
has_pending_error() -> bool

Check if there's a pending error to emit.

Source code in snakesee/parser/line_parser.py
def has_pending_error(self) -> bool:
    """Report whether an error block is waiting to be emitted."""
    if self.pending_error_rule is None:
        return False
    return True
reset_for_new_rule
reset_for_new_rule(rule: str) -> None

Reset context when entering a new rule block.

Parameters:

Name Type Description Default
rule str

Name of the new rule.

required
Source code in snakesee/parser/line_parser.py
def reset_for_new_rule(self, rule: str) -> None:
    """Begin a new rule block.

    Stores the rule name and forgets the jobid, wildcards, threads and log
    path of the previous block.  No other attribute is modified.

    Args:
        rule: Name of the new rule.
    """
    self.rule = rule
    self.jobid = self.wildcards = self.threads = self.log_path = None
start_error_block
start_error_block(rule: str) -> None

Start tracking a pending error block.

Parameters:

Name Type Description Default
rule str

Name of the rule that errored.

required
Source code in snakesee/parser/line_parser.py
def start_error_block(self, rule: str) -> None:
    """Begin collecting details for an error in *rule*.

    When the failing rule is the rule currently in context, the error
    fields start out as copies of the current job values; otherwise they
    start out empty.

    Args:
        rule: Name of the rule that errored.
    """
    self.pending_error_rule = rule
    same_rule = self.rule == rule
    self.error_jobid = self.jobid if same_rule else None
    self.error_wildcards = self.wildcards if same_rule else None
    self.error_threads = self.threads if same_rule else None
    self.error_log_path = self.log_path if same_rule else None