Skip to content

events

Event types and reader for Snakemake logger plugin integration.

This module provides types and utilities for reading real-time events from the snakemake-logger-plugin-snakesee plugin. Events provide more accurate and timely job status information than log parsing.

Classes

EventReader

Streaming reader for snakesee event files.

Reads events incrementally from a JSONL file, tracking the current position to only return new events on subsequent calls.

Attributes:

Name Type Description
event_file

Path to the event file.

Source code in snakesee/events.py
class EventReader:
    """Streaming reader for snakesee event files.

    Reads events incrementally from a JSONL file, tracking the current
    position to only return new events on subsequent calls.

    Attributes:
        event_file: Path to the event file.
    """

    def __init__(self, event_file: Path) -> None:
        """Initialize the event reader.

        Args:
            event_file: Path to the event file.
        """
        self.event_file = event_file
        self._offset: int = 0
        self._lock = threading.RLock()

    def read_new_events(self) -> list[SnakeseeEvent]:
        """Read events added since last call.

        Returns:
            List of new events. Empty list if no new events or file doesn't exist.
        """
        if not self.event_file.exists():
            return []

        # Get current offset under lock (minimize lock hold time)
        with self._lock:
            offset = self._offset

        # Perform file I/O without holding the lock to avoid blocking
        events: list[SnakeseeEvent] = []
        new_offset = offset
        try:
            with open(self.event_file, "r", encoding="utf-8") as f:
                f.seek(offset)
                for line in f:
                    line = line.strip()
                    if line:
                        try:
                            events.append(SnakeseeEvent.from_json(line))
                        except (orjson.JSONDecodeError, ValueError, KeyError, TypeError) as e:
                            # Skip malformed lines but log for debugging
                            logger.debug(
                                "Skipping malformed event line: %s... (%s)",
                                line[:50],
                                e,
                            )
                            continue
                new_offset = f.tell()
        except OSError as e:
            # File access error - log and return empty list
            logger.warning("Error reading event file %s: %s", self.event_file, e)
            return events

        # Update offset under lock
        with self._lock:
            self._offset = new_offset

        return events

    def reset(self) -> None:
        """Reset reader to start of file.

        Call this to re-read all events from the beginning.
        """
        with self._lock:
            self._offset = 0

    @property
    def has_events(self) -> bool:
        """Check if the event file exists and has content.

        Returns:
            True if event file exists and is non-empty.
        """
        if not self.event_file.exists():
            return False
        return self.event_file.stat().st_size > 0

Attributes

has_events property
has_events: bool

Check if the event file exists and has content.

Returns:

Type Description
bool

True if event file exists and is non-empty.

Functions

__init__
__init__(event_file: Path) -> None

Initialize the event reader.

Parameters:

Name Type Description Default
event_file Path

Path to the event file.

required
Source code in snakesee/events.py
def __init__(self, event_file: Path) -> None:
    """Initialize the event reader.

    Args:
        event_file: Path to the event file.
    """
    self.event_file = event_file
    self._offset: int = 0
    self._lock = threading.RLock()
read_new_events
read_new_events() -> list[SnakeseeEvent]

Read events added since last call.

Returns:

Type Description
list[SnakeseeEvent]

List of new events. Empty list if no new events or file doesn't exist.

Source code in snakesee/events.py
def read_new_events(self) -> list[SnakeseeEvent]:
    """Read events added since last call.

    Returns:
        List of new events. Empty list if no new events or file doesn't exist.
    """
    if not self.event_file.exists():
        return []

    # Get current offset under lock (minimize lock hold time)
    with self._lock:
        offset = self._offset

    # Perform file I/O without holding the lock to avoid blocking
    events: list[SnakeseeEvent] = []
    new_offset = offset
    try:
        with open(self.event_file, "r", encoding="utf-8") as f:
            f.seek(offset)
            for line in f:
                line = line.strip()
                if line:
                    try:
                        events.append(SnakeseeEvent.from_json(line))
                    except (orjson.JSONDecodeError, ValueError, KeyError, TypeError) as e:
                        # Skip malformed lines but log for debugging
                        logger.debug(
                            "Skipping malformed event line: %s... (%s)",
                            line[:50],
                            e,
                        )
                        continue
            new_offset = f.tell()
    except OSError as e:
        # File access error - log and return empty list
        logger.warning("Error reading event file %s: %s", self.event_file, e)
        return events

    # Update offset under lock
    with self._lock:
        self._offset = new_offset

    return events
reset
reset() -> None

Reset reader to start of file.

Call this to re-read all events from the beginning.

Source code in snakesee/events.py
def reset(self) -> None:
    """Reset reader to start of file.

    Call this to re-read all events from the beginning.
    """
    with self._lock:
        self._offset = 0

EventType

Bases: str, Enum

Event types from the snakesee logger plugin.

These mirror the event types defined in the logger plugin.

Source code in snakesee/events.py
class EventType(str, Enum):
    """Event types from the snakesee logger plugin.

    These mirror the event types defined in the logger plugin.
    """

    WORKFLOW_STARTED = "workflow_started"
    JOB_SUBMITTED = "job_submitted"
    JOB_STARTED = "job_started"
    JOB_FINISHED = "job_finished"
    JOB_ERROR = "job_error"
    PROGRESS = "progress"

SnakeseeEvent dataclass

A single event from the logger plugin.

This is a frozen dataclass to ensure events are immutable once parsed.

Attributes:

Name Type Description
event_type EventType

Type of the event.

timestamp float

Unix timestamp when the event occurred.

job_id int | None

Snakemake job ID (for job events).

rule_name str | None

Name of the rule (for job events).

wildcards tuple[tuple[str, str], ...] | None

Wildcard values for the job.

threads int | None

Number of threads allocated to the job.

resources tuple[tuple[str, Any], ...] | None

Resource requirements for the job.

input_files tuple[str, ...] | None

Tuple of input file paths.

output_files tuple[str, ...] | None

Tuple of output file paths.

duration float | None

Job duration in seconds (for finished/error events).

error_message str | None

Error message (for error events).

completed_jobs int | None

Number of completed jobs (for progress events).

total_jobs int | None

Total number of jobs (for progress events).

workflow_id str | None

Unique workflow identifier.

Source code in snakesee/events.py
@dataclass(frozen=True, slots=True)
class SnakeseeEvent:
    """A single event from the logger plugin.

    This is a frozen dataclass to ensure events are immutable once parsed.

    Attributes:
        event_type: Type of the event.
        timestamp: Unix timestamp when the event occurred.
        job_id: Snakemake job ID (for job events).
        rule_name: Name of the rule (for job events).
        wildcards: Wildcard values for the job.
        threads: Number of threads allocated to the job.
        resources: Resource requirements for the job.
        input_files: Tuple of input file paths.
        output_files: Tuple of output file paths.
        duration: Job duration in seconds (for finished/error events).
        error_message: Error message (for error events).
        completed_jobs: Number of completed jobs (for progress events).
        total_jobs: Total number of jobs (for progress events).
        workflow_id: Unique workflow identifier.
    """

    event_type: EventType
    timestamp: float
    job_id: int | None = None
    rule_name: str | None = None
    wildcards: tuple[tuple[str, str], ...] | None = None
    threads: int | None = None
    resources: tuple[tuple[str, Any], ...] | None = None
    input_files: tuple[str, ...] | None = None
    output_files: tuple[str, ...] | None = None
    duration: float | None = None
    error_message: str | None = None
    completed_jobs: int | None = None
    total_jobs: int | None = None
    workflow_id: str | None = None

    @classmethod
    def from_json(cls, json_str: str | bytes) -> "SnakeseeEvent":
        """Parse from JSON line.

        Args:
            json_str: JSON string or bytes to parse.

        Returns:
            Parsed SnakeseeEvent instance.

        Raises:
            ValueError: If the JSON is invalid or has an unknown event type.
            orjson.JSONDecodeError: If the JSON cannot be parsed.
        """
        data = orjson.loads(json_str)
        data["event_type"] = EventType(data["event_type"])

        # Convert dicts to tuples for frozen dataclass compatibility
        if "wildcards" in data and data["wildcards"] is not None:
            data["wildcards"] = tuple(sorted(data["wildcards"].items()))
        if "resources" in data and data["resources"] is not None:
            data["resources"] = tuple(sorted(data["resources"].items()))
        if "input_files" in data and data["input_files"] is not None:
            data["input_files"] = tuple(data["input_files"])
        if "output_files" in data and data["output_files"] is not None:
            data["output_files"] = tuple(data["output_files"])

        return cls(**data)

    @property
    def wildcards_dict(self) -> dict[str, str] | None:
        """Get wildcards as a dictionary.

        Returns:
            Wildcards as a dict, or None if not set.
        """
        if self.wildcards is None:
            return None
        return dict(self.wildcards)

Attributes

wildcards_dict property
wildcards_dict: dict[str, str] | None

Get wildcards as a dictionary.

Returns:

Type Description
dict[str, str] | None

Wildcards as a dict, or None if not set.

Functions

from_json classmethod
from_json(json_str: str | bytes) -> SnakeseeEvent

Parse from JSON line.

Parameters:

Name Type Description Default
json_str str | bytes

JSON string or bytes to parse.

required

Returns:

Type Description
SnakeseeEvent

Parsed SnakeseeEvent instance.

Raises:

Type Description
ValueError

If the JSON is invalid or has an unknown event type.

JSONDecodeError

If the JSON cannot be parsed.

Source code in snakesee/events.py
@classmethod
def from_json(cls, json_str: str | bytes) -> "SnakeseeEvent":
    """Parse from JSON line.

    Args:
        json_str: JSON string or bytes to parse.

    Returns:
        Parsed SnakeseeEvent instance.

    Raises:
        ValueError: If the JSON is invalid or has an unknown event type.
        orjson.JSONDecodeError: If the JSON cannot be parsed.
    """
    data = orjson.loads(json_str)
    data["event_type"] = EventType(data["event_type"])

    # Convert dicts to tuples for frozen dataclass compatibility
    if "wildcards" in data and data["wildcards"] is not None:
        data["wildcards"] = tuple(sorted(data["wildcards"].items()))
    if "resources" in data and data["resources"] is not None:
        data["resources"] = tuple(sorted(data["resources"].items()))
    if "input_files" in data and data["input_files"] is not None:
        data["input_files"] = tuple(data["input_files"])
    if "output_files" in data and data["output_files"] is not None:
        data["output_files"] = tuple(data["output_files"])

    return cls(**data)

Functions

get_event_file_path

get_event_file_path(workflow_dir: Path) -> Path

Get the path to the event file for a workflow.

Parameters:

Name Type Description Default
workflow_dir Path

Path to the workflow directory.

required

Returns:

Type Description
Path

Path to the event file.

Source code in snakesee/events.py
def get_event_file_path(workflow_dir: Path) -> Path:
    """Get the path to the event file for a workflow.

    Args:
        workflow_dir: Path to the workflow directory.

    Returns:
        Path to the event file.
    """
    return workflow_dir / EVENT_FILE_NAME