Skip to content

plugins

Plugin system for tool-specific progress parsing.

This package provides a plugin system for parsing progress from tool-specific log files. Plugins can detect and parse output from common bioinformatics tools like BWA, samtools, STAR, etc.

Plugin Discovery

Plugins are discovered from three sources:

1. Built-in plugins shipped with snakesee
2. User plugins in ~/.snakesee/plugins/ or ~/.config/snakesee/plugins/
3. Entry points registered by third-party packages

See Also
  • :mod:snakesee.plugins.loader for file-based plugin loading
  • :mod:snakesee.plugins.discovery for entry point discovery
  • :mod:snakesee.plugins.registry for plugin lookup functions

Classes

PluginMetadata dataclass

Validated metadata for a tool progress plugin.

This dataclass provides structured access to plugin metadata with validation. Use from_plugin() to create an instance from a plugin.

Attributes:

Name Type Description
name str

The tool name (e.g., 'bwa', 'samtools', 'star').

api_version int

The plugin API version this plugin supports.

patterns tuple[str, ...]

Command patterns that indicate this tool is being used.

description str

Optional description of what this plugin parses.

Raises:

Type Description
ValueError

If validation fails during construction.

Example

metadata = PluginMetadata.from_plugin(my_plugin)
print(f"Plugin: {metadata.name} (API v{metadata.api_version})")

Source code in snakesee/plugins/base.py
@dataclass(frozen=True)
class PluginMetadata:
    """Validated metadata for a tool progress plugin.

    This dataclass provides structured access to plugin metadata with
    validation. Use `from_plugin()` to create an instance from a plugin.

    Attributes:
        name: The tool name (e.g., 'bwa', 'samtools', 'star').
        api_version: The plugin API version this plugin supports.
        patterns: Command patterns that indicate this tool is being used.
        description: Optional description of what this plugin parses.

    Raises:
        ValueError: If validation fails during construction.

    Example:
        >>> metadata = PluginMetadata.from_plugin(my_plugin)
        >>> print(f"Plugin: {metadata.name} (API v{metadata.api_version})")
    """

    name: str
    api_version: int = 1
    patterns: tuple[str, ...] = field(default_factory=tuple)
    description: str = ""

    def __post_init__(self) -> None:
        """Validate metadata fields."""
        if not self.name or not self.name.strip():
            raise ValueError("Plugin name cannot be empty")
        if self.api_version < 1:
            raise ValueError(f"Plugin API version must be >= 1, got {self.api_version}")

    @classmethod
    def from_plugin(cls, plugin: "ToolProgressPlugin") -> "PluginMetadata":
        """Create metadata from a plugin instance.

        Args:
            plugin: The plugin to extract metadata from.

        Returns:
            Validated PluginMetadata instance.

        Raises:
            ValueError: If plugin metadata is invalid.
            AttributeError: If plugin is missing required attributes.
        """
        name = plugin.tool_name
        api_version = getattr(plugin, "plugin_api_version", 1)
        patterns = tuple(getattr(plugin, "tool_patterns", [name]))
        description = getattr(plugin, "__doc__", "") or ""

        return cls(
            name=name,
            api_version=api_version,
            patterns=patterns,
            description=description.split("\n")[0] if description else "",
        )

    def is_compatible(self, current_api_version: int | None = None) -> bool:
        """Check if the plugin is compatible with the current API version.

        Args:
            current_api_version: The API version to check against.
                                Defaults to PLUGIN_API_VERSION.

        Returns:
            True if the plugin is compatible, False otherwise.
        """
        if current_api_version is None:
            current_api_version = PLUGIN_API_VERSION
        return self.api_version <= current_api_version

Functions

__post_init__
__post_init__() -> None

Validate metadata fields.

Source code in snakesee/plugins/base.py
def __post_init__(self) -> None:
    """Reject a blank plugin name or an API version below 1.

    Raises:
        ValueError: If ``name`` is empty or whitespace-only, or if
            ``api_version`` is less than 1.
    """
    trimmed_name = self.name.strip() if self.name else ""
    if not trimmed_name:
        raise ValueError("Plugin name cannot be empty")

    version = self.api_version
    if version < 1:
        raise ValueError(f"Plugin API version must be >= 1, got {version}")
from_plugin classmethod
from_plugin(plugin: ToolProgressPlugin) -> PluginMetadata

Create metadata from a plugin instance.

Parameters:

Name Type Description Default
plugin ToolProgressPlugin

The plugin to extract metadata from.

required

Returns:

Type Description
PluginMetadata

Validated PluginMetadata instance.

Raises:

Type Description
ValueError

If plugin metadata is invalid.

AttributeError

If plugin is missing required attributes.

Source code in snakesee/plugins/base.py
@classmethod
def from_plugin(cls, plugin: "ToolProgressPlugin") -> "PluginMetadata":
    """Create metadata from a plugin instance.

    The description is the first non-blank line of the plugin's docstring,
    so both ``\"\"\"Summary.`` and the style that puts the summary on the
    line after the opening quotes produce the same result.

    Args:
        plugin: The plugin to extract metadata from.

    Returns:
        Validated PluginMetadata instance.

    Raises:
        ValueError: If plugin metadata is invalid.
        AttributeError: If plugin is missing required attributes.
    """
    name = plugin.tool_name
    api_version = getattr(plugin, "plugin_api_version", 1)
    patterns = tuple(getattr(plugin, "tool_patterns", [name]))

    # Strip first: a docstring that opens with a newline would otherwise
    # make the "first line" an empty string and lose the summary.
    docstring = (getattr(plugin, "__doc__", "") or "").strip()
    summary = docstring.split("\n", 1)[0].rstrip() if docstring else ""

    return cls(
        name=name,
        api_version=api_version,
        patterns=patterns,
        description=summary,
    )
is_compatible
is_compatible(current_api_version: int | None = None) -> bool

Check if the plugin is compatible with the current API version.

Parameters:

Name Type Description Default
current_api_version int | None

The API version to check against. Defaults to PLUGIN_API_VERSION.

None

Returns:

Type Description
bool

True if the plugin is compatible, False otherwise.

Source code in snakesee/plugins/base.py
def is_compatible(self, current_api_version: int | None = None) -> bool:
    """Check if the plugin is compatible with the current API version.

    Args:
        current_api_version: The API version to check against.
                            Defaults to PLUGIN_API_VERSION.

    Returns:
        True if the plugin is compatible, False otherwise.
    """
    if current_api_version is None:
        current_api_version = PLUGIN_API_VERSION
    return self.api_version <= current_api_version

ToolProgress dataclass

Progress information extracted from a tool's log output.

Attributes:

Name Type Description
items_processed int

Number of items processed so far.

items_total int | None

Total number of items to process (None if unknown).

unit str

Unit of items (e.g., "reads", "alignments", "variants").

percent_complete float | None

Percentage complete (0-100), None if unknown.

estimated_remaining_seconds float | None

Estimated seconds remaining, None if unknown.

Source code in snakesee/plugins/base.py
@dataclass
class ToolProgress:
    """
    Progress information extracted from a tool's log output.

    Attributes:
        items_processed: Number of items processed so far.
        items_total: Total number of items to process (None if unknown).
        unit: Unit of items (e.g., "reads", "alignments", "variants").
        percent_complete: Percentage complete (0-100), None if unknown.
        estimated_remaining_seconds: Estimated seconds remaining, None if unknown.
    """

    items_processed: int
    items_total: int | None = None
    unit: str = "items"
    percent_complete: float | None = None
    estimated_remaining_seconds: float | None = None

    def __post_init__(self) -> None:
        """Calculate percent_complete if not provided but total is known."""
        if self.percent_complete is None and self.items_total and self.items_total > 0:
            self.percent_complete = min(100.0, (self.items_processed / self.items_total) * 100)

    @property
    def progress_str(self) -> str:
        """Human-readable progress string."""
        if self.items_total:
            return f"{self.items_processed:,}/{self.items_total:,} {self.unit}"
        return f"{self.items_processed:,} {self.unit}"

    @property
    def percent_str(self) -> str:
        """Human-readable percentage string."""
        if self.percent_complete is not None:
            return f"{self.percent_complete:.1f}%"
        return "?"

Attributes

percent_str property
percent_str: str

Human-readable percentage string.

progress_str property
progress_str: str

Human-readable progress string.

Functions

__post_init__
__post_init__() -> None

Calculate percent_complete if not provided but total is known.

Source code in snakesee/plugins/base.py
def __post_init__(self) -> None:
    """Derive percent_complete from the counts when it was not supplied."""
    if self.percent_complete is not None:
        return  # an explicit percentage always wins

    total = self.items_total
    if not total or total <= 0:
        return  # unknown, zero, or negative total: nothing to derive

    # Cap at 100 in case more items were processed than the stated total.
    self.percent_complete = min(100.0, (self.items_processed / total) * 100)

ToolProgressPlugin

Bases: ABC

Abstract base class for tool-specific progress parsers.

Subclasses implement parsing logic for specific bioinformatics tools (e.g., BWA, STAR, samtools) to extract progress information from logs.

Plugin Versioning

Plugins can declare which API version they support via the plugin_api_version property. If not specified, version 1 is assumed. Plugins requiring a newer API than the current version will be skipped.

Example implementation::

class BWAPlugin(ToolProgressPlugin):
    @property
    def tool_name(self) -> str:
        return "bwa"

    @property
    def tool_patterns(self) -> list[str]:
        return ["bwa mem", "bwa-mem2"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        return "bwa" in rule_name.lower() or "[M::mem_" in log_content

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        # Parse BWA-specific progress patterns
        ...
Source code in snakesee/plugins/base.py
class ToolProgressPlugin(ABC):
    """
    Interface that every tool-specific progress parser implements.

    A concrete plugin knows how to recognise the log output of one
    bioinformatics tool (e.g., BWA, STAR, samtools) and turn it into
    progress information.

    Plugin Versioning:
        A plugin states the API version it targets through the
        ``plugin_api_version`` property, which defaults to 1. Plugins
        requiring a newer API than the current version will be skipped.

    Example implementation::

        class BWAPlugin(ToolProgressPlugin):
            @property
            def tool_name(self) -> str:
                return "bwa"

            @property
            def tool_patterns(self) -> list[str]:
                return ["bwa mem", "bwa-mem2"]

            def can_parse(self, rule_name: str, log_content: str) -> bool:
                return "bwa" in rule_name.lower() or "[M::mem_" in log_content

            def parse_progress(self, log_content: str) -> ToolProgress | None:
                # Parse BWA-specific progress patterns
                ...
    """

    @property
    def plugin_api_version(self) -> int:
        """
        API version this plugin targets.

        Returns 1 unless a subclass overrides the property to declare a
        different version.
        """
        return 1

    @property
    @abstractmethod
    def tool_name(self) -> str:
        """
        Short identifier of the tool (e.g., 'bwa', 'samtools', 'star').

        Should be lowercase and match common tool naming conventions.
        """

    @property
    def tool_patterns(self) -> list[str]:
        """
        Command patterns suggesting that this tool is in use.

        Used for initial filtering before detailed log parsing. The default
        is a single-element list holding ``tool_name``; subclasses override
        it to supply tool-specific patterns.
        """
        default_patterns = [self.tool_name]
        return default_patterns

    @abstractmethod
    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """
        Decide whether this plugin is able to handle the given log.

        Args:
            rule_name: Name of the Snakemake rule.
            log_content: Content of the rule's log file.

        Returns:
            True if this plugin can extract progress from this log.
        """

    @abstractmethod
    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """
        Turn log text into progress information.

        Args:
            log_content: Content of the rule's log file.

        Returns:
            ToolProgress if progress could be extracted, None otherwise.
        """

    def parse_progress_from_file(self, log_path: Path) -> ToolProgress | None:
        """
        Read a log file and parse progress from its text.

        Undecodable bytes are dropped while reading. An OSError raised while
        reading or parsing results in None instead of propagating.

        Args:
            log_path: Path to the log file.

        Returns:
            ToolProgress if progress could be extracted, None otherwise.
        """
        try:
            return self.parse_progress(log_path.read_text(errors="ignore"))
        except OSError:
            return None

Attributes

plugin_api_version property
plugin_api_version: int

The plugin API version this plugin was written for.

Override to declare compatibility with a specific API version. If not overridden, version 1 is assumed.

tool_name abstractmethod property
tool_name: str

Identifier for this tool (e.g., 'bwa', 'samtools', 'star').

Should be lowercase and match common tool naming conventions.

tool_patterns property
tool_patterns: list[str]

Common command patterns that indicate this tool is being used.

Used for initial filtering before detailed log parsing. Override in subclasses for tool-specific patterns.

Functions

can_parse abstractmethod
can_parse(rule_name: str, log_content: str) -> bool

Determine if this plugin can parse the given log content.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_content str

Content of the rule's log file.

required

Returns:

Type Description
bool

True if this plugin can extract progress from this log.

Source code in snakesee/plugins/base.py
@abstractmethod
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """
    Determine if this plugin can parse the given log content.

    This is an abstract declaration with no implementation of its own.

    Args:
        rule_name: Name of the Snakemake rule.
        log_content: Content of the rule's log file.

    Returns:
        True if this plugin can extract progress from this log.
    """
parse_progress abstractmethod
parse_progress(log_content: str) -> ToolProgress | None

Extract progress information from log content.

Parameters:

Name Type Description Default
log_content str

Content of the rule's log file.

required

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/base.py
@abstractmethod
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """
    Extract progress information from log content.

    This is an abstract declaration with no implementation of its own.

    Args:
        log_content: Content of the rule's log file.

    Returns:
        ToolProgress if progress could be extracted, None otherwise.
    """
parse_progress_from_file
parse_progress_from_file(log_path: Path) -> ToolProgress | None

Extract progress from a log file.

Parameters:

Name Type Description Default
log_path Path

Path to the log file.

required

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/base.py
def parse_progress_from_file(self, log_path: Path) -> ToolProgress | None:
    """
    Read a log file and parse progress from its text.

    Undecodable bytes are dropped while reading. An OSError raised while
    reading or parsing results in None instead of propagating.

    Args:
        log_path: Path to the log file.

    Returns:
        ToolProgress if progress could be extracted, None otherwise.
    """
    try:
        return self.parse_progress(log_path.read_text(errors="ignore"))
    except OSError:
        return None

Functions

discover_entry_point_plugins

discover_entry_point_plugins(force_reload: bool = False) -> list[ToolProgressPlugin]

Discover plugins registered via setuptools entry points.

Third-party packages can register plugins by adding an entry point in their pyproject.toml:

[project.entry-points."snakesee.plugins"]
my_tool = "my_package.plugins:MyToolPlugin"

Parameters:

Name Type Description Default
force_reload bool

If True, re-discover plugins even if cached.

False

Returns:

Type Description
list[ToolProgressPlugin]

List of discovered plugin instances.

Source code in snakesee/plugins/discovery.py
def discover_entry_point_plugins(
    force_reload: bool = False,
) -> list[ToolProgressPlugin]:
    """
    Collect plugins that other packages expose through entry points.

    A third-party package registers a plugin by declaring an entry point
    in its pyproject.toml:

        [project.entry-points."snakesee.plugins"]
        my_tool = "my_package.plugins:MyToolPlugin"

    The result is cached at module level together with a version hash; the
    cache is reused only while the hash is unchanged and ``force_reload``
    is False.

    Args:
        force_reload: If True, re-discover plugins even if cached.

    Returns:
        List of discovered plugin instances.
    """
    global _entry_point_plugins
    global _entry_point_version_hash

    current_hash = _compute_version_hash()

    cache_is_usable = (
        _entry_point_plugins is not None
        and not force_reload
        and _entry_point_version_hash == current_hash
    )
    if cache_is_usable:
        return _entry_point_plugins

    discovered: list[ToolProgressPlugin] = []

    try:
        # Python 3.10+ style
        for entry in entry_points(group=ENTRY_POINT_GROUP):
            try:
                candidate = entry.load()
                # Only classes deriving from ToolProgressPlugin are instantiated.
                if not (isinstance(candidate, type) and issubclass(candidate, ToolProgressPlugin)):
                    continue
                instance = candidate()
                # Keep the instance only when validation yields metadata.
                if validate_plugin(instance, f"entry_point:{entry.name}") is not None:
                    discovered.append(instance)
            except (ImportError, TypeError, AttributeError) as exc:
                logger.debug("Failed to load entry point plugin %s: %s", entry.name, exc)
    except (TypeError, OSError) as exc:
        logger.debug("Error discovering entry points: %s", exc)

    _entry_point_plugins = discovered
    _entry_point_version_hash = current_hash
    return discovered

find_plugin_for_log

find_plugin_for_log(rule_name: str, log_content: str, plugins: list[ToolProgressPlugin] | None = None) -> ToolProgressPlugin | None

Find a plugin that can parse the given log content.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_content str

Content of the rule's log file.

required
plugins list[ToolProgressPlugin] | None

List of plugins to search. Defaults to all plugins.

None

Returns:

Type Description
ToolProgressPlugin | None

A plugin that can parse this log, or None if no plugin matches.

Source code in snakesee/plugins/__init__.py
def find_plugin_for_log(
    rule_name: str,
    log_content: str,
    plugins: list[ToolProgressPlugin] | None = None,
) -> ToolProgressPlugin | None:
    """
    Look up a plugin able to parse the given log content.

    Args:
        rule_name: Name of the Snakemake rule.
        log_content: Content of the rule's log file.
        plugins: List of plugins to search. When None, the result of
            ``get_all_plugins()`` is searched.

    Returns:
        A plugin that can parse this log, or None if no plugin matches.
    """
    candidates = get_all_plugins() if plugins is None else plugins
    return _find_plugin_for_log(rule_name, log_content, candidates)

find_rule_log

find_rule_log(rule_name: str, job_id: int | str | None, workflow_dir: Path, wildcards: dict[str, str] | None = None) -> Path | None

Attempt to find the log file for a running rule.

Snakemake stores rule logs in various locations depending on the workflow configuration. This function searches common locations.

Parameters:

Name Type Description Default
rule_name str

Name of the rule.

required
job_id int | str | None

Snakemake job ID (if known).

required
workflow_dir Path

Workflow root directory.

required
wildcards dict[str, str] | None

Dictionary of wildcard names to values for the job.

None

Returns:

Type Description
Path | None

Path to the log file if found, None otherwise.

Source code in snakesee/plugins/__init__.py
def find_rule_log(
    rule_name: str,
    job_id: int | str | None,
    workflow_dir: Path,
    wildcards: dict[str, str] | None = None,
) -> Path | None:
    """
    Attempt to find the log file for a running rule.

    Snakemake stores rule logs in various locations depending on the
    workflow configuration. Candidates are gathered from, in order:

    1. Log paths recorded in ``.snakemake/metadata`` entries for this rule.
    2. Files in ``.snakemake/log`` whose names contain the rule name or
       ``job<job_id>`` (the job pattern is skipped when ``job_id`` is None).
    3. The ``logs/`` and ``log/`` directories under the workflow root.

    Among the candidates that are regular files, the most recently modified
    one is returned.

    Args:
        rule_name: Name of the rule.
        job_id: Snakemake job ID (if known).
        workflow_dir: Workflow root directory.
        wildcards: Dictionary of wildcard names to values for the job.

    Returns:
        Path to the log file if found, None otherwise.
    """
    snakemake_dir = workflow_dir / ".snakemake"

    # Common log locations to search
    search_paths: list[Path] = []

    # First, try to find log paths from .snakemake/metadata (most reliable)
    metadata_dir = snakemake_dir / "metadata"
    if metadata_dir.exists():
        for meta_file in metadata_dir.iterdir():
            try:
                data = orjson.loads(meta_file.read_bytes())
                # Valid JSON that is not an object carries no rule/log keys.
                if not isinstance(data, dict):
                    continue
                if data.get("rule") != rule_name or not data.get("log"):
                    continue
                log_entries = data["log"]
                # Defensive: a bare string would otherwise be iterated
                # character by character.
                if isinstance(log_entries, str):
                    log_entries = [log_entries]
                for log_entry in log_entries:
                    log_path = workflow_dir / log_entry
                    if log_path.exists():
                        search_paths.append(log_path)
            except (orjson.JSONDecodeError, OSError, KeyError, TypeError):
                # Unreadable or malformed metadata: skip this file.
                continue

    # .snakemake/log/ directory for rule-specific logs
    log_dir = snakemake_dir / "log"
    if log_dir.exists():
        search_paths.extend(log_dir.glob(f"*{rule_name}*"))
        # Without a job ID the pattern would be "*jobNone*", which can only
        # match unrelated files.
        if job_id is not None:
            search_paths.extend(log_dir.glob(f"*job{job_id}*"))

    # logs/ directory (common convention)
    search_paths.extend(_search_log_dir(workflow_dir / "logs", rule_name, wildcards))

    # log/ directory (another common convention)
    search_paths.extend(_search_log_dir(workflow_dir / "log", rule_name, wildcards))

    # Deduplicate and keep only regular files, remembering each mtime
    seen: set[Path] = set()
    valid_logs: list[tuple[Path, float]] = []
    for candidate in search_paths:
        if candidate in seen:
            continue
        seen.add(candidate)
        try:
            stat_result = candidate.stat()
        except OSError:
            # Vanished or inaccessible between discovery and stat.
            continue
        if stat.S_ISREG(stat_result.st_mode):
            valid_logs.append((candidate, stat_result.st_mtime))

    if not valid_logs:
        return None

    # Newest file wins; on equal mtimes the earliest-discovered candidate is kept.
    return max(valid_logs, key=lambda item: item[1])[0]

get_all_plugins

get_all_plugins(include_user: bool = True) -> list[ToolProgressPlugin]

Get all available plugins (built-in, user file-based, and entry points).

Parameters:

Name Type Description Default
include_user bool

Whether to include user plugins (file-based and entry points).

True

Returns:

Type Description
list[ToolProgressPlugin]

Combined list of all plugins.

Source code in snakesee/plugins/__init__.py
def get_all_plugins(include_user: bool = True) -> list[ToolProgressPlugin]:
    """
    Return every available plugin: built-in, user file-based, and entry points.

    Delegates to ``_get_all_plugins`` with the built-in plugin list.

    Args:
        include_user: Whether to include user plugins (file-based and entry points).

    Returns:
        Combined list of all plugins.
    """
    combined = _get_all_plugins(BUILTIN_PLUGINS, include_user)
    return combined

load_user_plugins

load_user_plugins(plugin_dirs: list[Path] | None = None, force_reload: bool = False) -> list[ToolProgressPlugin]

Load custom user plugins from plugin directories.

User plugins are Python files in ~/.snakesee/plugins/ or ~/.config/snakesee/plugins/ that define classes inheriting from ToolProgressPlugin.

Parameters:

Name Type Description Default
plugin_dirs list[Path] | None

List of directories to search. Defaults to USER_PLUGIN_DIRS.

None
force_reload bool

If True, reload plugins even if already cached.

False

Returns:

Type Description
list[ToolProgressPlugin]

List of loaded user plugin instances.

Example plugin file (~/.snakesee/plugins/my_tool.py)::

from snakesee.plugins.base import ToolProgress, ToolProgressPlugin
import re

class MyToolPlugin(ToolProgressPlugin):
    @property
    def tool_name(self) -> str:
        return "mytool"

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        return "mytool" in rule_name.lower()

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        match = re.search(r"Processed (\d+) items", log_content)
        if match:
            return ToolProgress(items_processed=int(match.group(1)), unit="items")
        return None
Source code in snakesee/plugins/loader.py
def load_user_plugins(
    plugin_dirs: list[Path] | None = None,
    force_reload: bool = False,
) -> list[ToolProgressPlugin]:
    """
    Load custom user plugins from plugin directories.

    User plugins are Python files in ~/.snakesee/plugins/ or ~/.config/snakesee/plugins/
    that define classes inheriting from ToolProgressPlugin. Files whose names
    start with an underscore are skipped. The loaded list is cached at module
    level and returned as-is on later calls unless ``force_reload`` is True.

    Args:
        plugin_dirs: List of directories to search. Defaults to USER_PLUGIN_DIRS.
        force_reload: If True, reload plugins even if already cached.

    Returns:
        List of loaded user plugin instances.

    Example plugin file (~/.snakesee/plugins/my_tool.py)::

        from snakesee.plugins.base import ToolProgress, ToolProgressPlugin
        import re

        class MyToolPlugin(ToolProgressPlugin):
            @property
            def tool_name(self) -> str:
                return "mytool"

            def can_parse(self, rule_name: str, log_content: str) -> bool:
                return "mytool" in rule_name.lower()

            def parse_progress(self, log_content: str) -> ToolProgress | None:
                match = re.search(r"Processed (\\d+) items", log_content)
                if match:
                    return ToolProgress(items_processed=int(match.group(1)), unit="items")
                return None
    """
    global _user_plugins

    if not force_reload and _user_plugins is not None:
        return _user_plugins

    search_dirs = USER_PLUGIN_DIRS if plugin_dirs is None else plugin_dirs
    collected: list[ToolProgressPlugin] = []

    for directory in search_dirs:
        if not (directory.exists() and directory.is_dir()):
            continue

        # Security checks
        _check_plugin_dir_security(directory)

        for candidate_file in directory.glob("*.py"):
            # Skip private modules
            if candidate_file.name.startswith("_"):
                continue

            try:
                collected.extend(_load_plugins_from_file(candidate_file))
            except (ImportError, SyntaxError, OSError) as exc:
                logger.debug("Failed to load plugin from %s: %s", candidate_file, exc)

    _user_plugins = collected
    return collected

parse_tool_progress

parse_tool_progress(rule_name: str, log_path: Path, plugins: list[ToolProgressPlugin] | None = None) -> ToolProgress | None

Parse progress from a rule's log file using available plugins.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_path Path

Path to the rule's log file.

required
plugins list[ToolProgressPlugin] | None

List of plugins to use. Defaults to all plugins (built-in + user).

None

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/__init__.py
def parse_tool_progress(
    rule_name: str,
    log_path: Path,
    plugins: list[ToolProgressPlugin] | None = None,
) -> ToolProgress | None:
    """
    Extract progress from a rule's log file with the available plugins.

    Args:
        rule_name: Name of the Snakemake rule.
        log_path: Path to the rule's log file.
        plugins: List of plugins to use. When None, the result of
            ``get_all_plugins()`` is used.

    Returns:
        ToolProgress if progress could be extracted, None otherwise.
    """
    candidates = get_all_plugins() if plugins is None else plugins
    return _parse_tool_progress(rule_name, log_path, candidates)

Modules

base

Base classes for tool-specific progress parsing plugins.

Classes

PluginMetadata dataclass

Validated metadata for a tool progress plugin.

This dataclass provides structured access to plugin metadata with validation. Use from_plugin() to create an instance from a plugin.

Attributes:

Name Type Description
name str

The tool name (e.g., 'bwa', 'samtools', 'star').

api_version int

The plugin API version this plugin supports.

patterns tuple[str, ...]

Command patterns that indicate this tool is being used.

description str

Optional description of what this plugin parses.

Raises:

Type Description
ValueError

If validation fails during construction.

Example

metadata = PluginMetadata.from_plugin(my_plugin)
print(f"Plugin: {metadata.name} (API v{metadata.api_version})")

Source code in snakesee/plugins/base.py
@dataclass(frozen=True)
class PluginMetadata:
    """Validated metadata for a tool progress plugin.

    This dataclass provides structured access to plugin metadata with
    validation. Use `from_plugin()` to create an instance from a plugin.

    Attributes:
        name: The tool name (e.g., 'bwa', 'samtools', 'star').
        api_version: The plugin API version this plugin supports.
        patterns: Command patterns that indicate this tool is being used.
        description: Optional description of what this plugin parses.

    Raises:
        ValueError: If validation fails during construction.

    Example:
        >>> metadata = PluginMetadata.from_plugin(my_plugin)
        >>> print(f"Plugin: {metadata.name} (API v{metadata.api_version})")
    """

    name: str
    api_version: int = 1
    patterns: tuple[str, ...] = field(default_factory=tuple)
    description: str = ""

    def __post_init__(self) -> None:
        """Validate metadata fields."""
        if not self.name or not self.name.strip():
            raise ValueError("Plugin name cannot be empty")
        if self.api_version < 1:
            raise ValueError(f"Plugin API version must be >= 1, got {self.api_version}")

    @classmethod
    def from_plugin(cls, plugin: "ToolProgressPlugin") -> "PluginMetadata":
        """Create metadata from a plugin instance.

        Args:
            plugin: The plugin to extract metadata from.

        Returns:
            Validated PluginMetadata instance.

        Raises:
            ValueError: If plugin metadata is invalid.
            AttributeError: If plugin is missing required attributes.
        """
        name = plugin.tool_name
        api_version = getattr(plugin, "plugin_api_version", 1)
        patterns = tuple(getattr(plugin, "tool_patterns", [name]))
        description = getattr(plugin, "__doc__", "") or ""

        return cls(
            name=name,
            api_version=api_version,
            patterns=patterns,
            description=description.split("\n")[0] if description else "",
        )

    def is_compatible(self, current_api_version: int | None = None) -> bool:
        """Check if the plugin is compatible with the current API version.

        Args:
            current_api_version: The API version to check against.
                                Defaults to PLUGIN_API_VERSION.

        Returns:
            True if the plugin is compatible, False otherwise.
        """
        if current_api_version is None:
            current_api_version = PLUGIN_API_VERSION
        return self.api_version <= current_api_version
Functions
__post_init__
__post_init__() -> None

Validate metadata fields.

Source code in snakesee/plugins/base.py
def __post_init__(self) -> None:
    """Reject metadata with a blank name or a non-positive API version.

    Raises:
        ValueError: If ``name`` is empty or whitespace-only, or if
            ``api_version`` is less than 1.
    """
    has_name = bool(self.name and self.name.strip())
    if not has_name:
        raise ValueError("Plugin name cannot be empty")

    version_is_valid = not self.api_version < 1
    if not version_is_valid:
        raise ValueError(f"Plugin API version must be >= 1, got {self.api_version}")
from_plugin classmethod
from_plugin(plugin: ToolProgressPlugin) -> PluginMetadata

Create metadata from a plugin instance.

Parameters:

Name Type Description Default
plugin ToolProgressPlugin

The plugin to extract metadata from.

required

Returns:

Type Description
PluginMetadata

Validated PluginMetadata instance.

Raises:

Type Description
ValueError

If plugin metadata is invalid.

AttributeError

If plugin is missing required attributes.

Source code in snakesee/plugins/base.py
@classmethod
def from_plugin(cls, plugin: "ToolProgressPlugin") -> "PluginMetadata":
    """Create metadata from a plugin instance.

    The description is the first non-blank line of the plugin's docstring,
    with surrounding whitespace removed.

    Args:
        plugin: The plugin to extract metadata from.

    Returns:
        Validated PluginMetadata instance.

    Raises:
        ValueError: If plugin metadata is invalid.
        AttributeError: If plugin is missing required attributes.
    """
    name = plugin.tool_name
    api_version = getattr(plugin, "plugin_api_version", 1)
    patterns = tuple(getattr(plugin, "tool_patterns", [name]))
    docstring = getattr(plugin, "__doc__", "") or ""

    # Docstrings frequently open with a newline right after the quotes, so
    # the literal first line is often empty; skip blank lines.
    summary = next(
        (line.strip() for line in docstring.splitlines() if line.strip()),
        "",
    )

    return cls(
        name=name,
        api_version=api_version,
        patterns=patterns,
        description=summary,
    )
is_compatible
is_compatible(current_api_version: int | None = None) -> bool

Check if the plugin is compatible with the current API version.

Parameters:

Name Type Description Default
current_api_version int | None

The API version to check against. Defaults to PLUGIN_API_VERSION.

None

Returns:

Type Description
bool

True if the plugin is compatible, False otherwise.

Source code in snakesee/plugins/base.py
def is_compatible(self, current_api_version: int | None = None) -> bool:
    """Check if the plugin is compatible with the current API version.

    Args:
        current_api_version: The API version to check against.
                            Defaults to PLUGIN_API_VERSION.

    Returns:
        True if the plugin is compatible, False otherwise.
    """
    if current_api_version is None:
        current_api_version = PLUGIN_API_VERSION
    return self.api_version <= current_api_version
ToolProgress dataclass

Progress information extracted from a tool's log output.

Attributes:

Name Type Description
items_processed int

Number of items processed so far.

items_total int | None

Total number of items to process (None if unknown).

unit str

Unit of items (e.g., "reads", "alignments", "variants").

percent_complete float | None

Percentage complete (0-100), None if unknown.

estimated_remaining_seconds float | None

Estimated seconds remaining, None if unknown.

Source code in snakesee/plugins/base.py
@dataclass
class ToolProgress:
    """
    Progress information extracted from a tool's log output.

    Attributes:
        items_processed: Number of items processed so far.
        items_total: Total number of items to process (None if unknown).
        unit: Unit of items (e.g., "reads", "alignments", "variants").
        percent_complete: Percentage complete (0-100), None if unknown.
        estimated_remaining_seconds: Estimated seconds remaining, None if unknown.
    """

    items_processed: int
    items_total: int | None = None
    unit: str = "items"
    percent_complete: float | None = None
    estimated_remaining_seconds: float | None = None

    def __post_init__(self) -> None:
        """Calculate percent_complete if not provided but total is known."""
        if self.percent_complete is None and self.items_total and self.items_total > 0:
            self.percent_complete = min(100.0, (self.items_processed / self.items_total) * 100)

    @property
    def progress_str(self) -> str:
        """Human-readable progress string."""
        if self.items_total:
            return f"{self.items_processed:,}/{self.items_total:,} {self.unit}"
        return f"{self.items_processed:,} {self.unit}"

    @property
    def percent_str(self) -> str:
        """Human-readable percentage string."""
        if self.percent_complete is not None:
            return f"{self.percent_complete:.1f}%"
        return "?"
Attributes
percent_str property
percent_str: str

Human-readable percentage string.

progress_str property
progress_str: str

Human-readable progress string.

Functions
__post_init__
__post_init__() -> None

Calculate percent_complete if not provided but total is known.

Source code in snakesee/plugins/base.py
def __post_init__(self) -> None:
    """Fill in percent_complete from the counts when it was not given."""
    if self.percent_complete is not None:
        return

    total = self.items_total
    if not (total and total > 0):
        return

    # Cap at 100 so a processed count that overshoots the total still yields
    # a sensible percentage.
    self.percent_complete = min(100.0, (self.items_processed / total) * 100)
ToolProgressPlugin

Bases: ABC

Abstract base class for tool-specific progress parsers.

Subclasses implement parsing logic for specific bioinformatics tools (e.g., BWA, STAR, samtools) to extract progress information from logs.

Plugin Versioning

Plugins can declare which API version they support via the plugin_api_version property. If not specified, version 1 is assumed. Plugins requiring a newer API than the current version will be skipped.

Example implementation::

class BWAPlugin(ToolProgressPlugin):
    @property
    def tool_name(self) -> str:
        return "bwa"

    @property
    def tool_patterns(self) -> list[str]:
        return ["bwa mem", "bwa-mem2"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        return "bwa" in rule_name.lower() or "[M::mem_" in log_content

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        # Parse BWA-specific progress patterns
        ...
Source code in snakesee/plugins/base.py
class ToolProgressPlugin(ABC):
    """
    Base interface that every tool-specific progress parser implements.

    A concrete plugin knows how to recognise the log output of one
    bioinformatics tool (e.g., BWA, STAR, samtools) and how to turn that
    output into a ``ToolProgress``.

    Plugin Versioning:
        A plugin states the API version it targets through the
        ``plugin_api_version`` property, which defaults to 1. Plugins
        requiring a newer API than the current version will be skipped.

    Example implementation::

        class BWAPlugin(ToolProgressPlugin):
            @property
            def tool_name(self) -> str:
                return "bwa"

            @property
            def tool_patterns(self) -> list[str]:
                return ["bwa mem", "bwa-mem2"]

            def can_parse(self, rule_name: str, log_content: str) -> bool:
                return "bwa" in rule_name.lower() or "[M::mem_" in log_content

            def parse_progress(self, log_content: str) -> ToolProgress | None:
                # Parse BWA-specific progress patterns
                ...
    """

    @property
    def plugin_api_version(self) -> int:
        """
        Plugin API version this plugin targets.

        Subclasses override this to declare a different version; the base
        implementation reports version 1.
        """
        return 1

    @property
    @abstractmethod
    def tool_name(self) -> str:
        """
        Short identifier of the tool (e.g., 'bwa', 'samtools', 'star').

        Should be lowercase and match common tool naming conventions.
        """

    @property
    def tool_patterns(self) -> list[str]:
        """
        Command patterns that suggest this tool is being run.

        Used for initial filtering before detailed log parsing. The base
        implementation returns just the tool name; subclasses may override
        it with more specific patterns.
        """
        return [self.tool_name]

    @abstractmethod
    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """
        Decide whether this plugin understands the given log.

        Args:
            rule_name: Name of the Snakemake rule.
            log_content: Content of the rule's log file.

        Returns:
            True if this plugin can extract progress from this log.
        """

    @abstractmethod
    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """
        Turn log text into progress information.

        Args:
            log_content: Content of the rule's log file.

        Returns:
            ToolProgress if progress could be extracted, None otherwise.
        """

    def parse_progress_from_file(self, log_path: Path) -> ToolProgress | None:
        """
        Read a log file and parse progress from its content.

        Bytes that cannot be decoded are dropped while reading.

        Args:
            log_path: Path to the log file.

        Returns:
            ToolProgress if progress could be extracted, None otherwise
            (including when an OSError is raised, e.g. the file is unreadable).
        """
        try:
            return self.parse_progress(log_path.read_text(errors="ignore"))
        except OSError:
            return None
Attributes
plugin_api_version property
plugin_api_version: int

The plugin API version this plugin was written for.

Override to declare compatibility with a specific API version. If not overridden, version 1 is assumed.

tool_name abstractmethod property
tool_name: str

Identifier for this tool (e.g., 'bwa', 'samtools', 'star').

Should be lowercase and match common tool naming conventions.

tool_patterns property
tool_patterns: list[str]

Common command patterns that indicate this tool is being used.

Used for initial filtering before detailed log parsing. Override in subclasses for tool-specific patterns.

Functions
can_parse abstractmethod
can_parse(rule_name: str, log_content: str) -> bool

Determine if this plugin can parse the given log content.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_content str

Content of the rule's log file.

required

Returns:

Type Description
bool

True if this plugin can extract progress from this log.

Source code in snakesee/plugins/base.py
@abstractmethod
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """
    Decide whether this plugin understands the given log.

    Args:
        rule_name: Name of the Snakemake rule.
        log_content: Content of the rule's log file.

    Returns:
        True if this plugin can extract progress from this log.
    """
parse_progress abstractmethod
parse_progress(log_content: str) -> ToolProgress | None

Extract progress information from log content.

Parameters:

Name Type Description Default
log_content str

Content of the rule's log file.

required

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/base.py
@abstractmethod
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """
    Turn log text into progress information.

    Args:
        log_content: Content of the rule's log file.

    Returns:
        ToolProgress if progress could be extracted, None otherwise.
    """
parse_progress_from_file
parse_progress_from_file(log_path: Path) -> ToolProgress | None

Extract progress from a log file.

Parameters:

Name Type Description Default
log_path Path

Path to the log file.

required

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/base.py
def parse_progress_from_file(self, log_path: Path) -> ToolProgress | None:
    """
    Read a log file and parse progress from its content.

    Bytes that cannot be decoded are dropped while reading.

    Args:
        log_path: Path to the log file.

    Returns:
        ToolProgress if progress could be extracted, None otherwise
        (including when an OSError is raised, e.g. the file is unreadable).
    """
    try:
        return self.parse_progress(log_path.read_text(errors="ignore"))
    except OSError:
        return None

bwa

Plugin for BWA aligner progress parsing.

Classes

BWAPlugin

Bases: ToolProgressPlugin

Progress parser for BWA (Burrows-Wheeler Aligner).

Parses BWA mem/mem2 progress output which shows processed reads.

Example BWA output:

    [M::bwa_idx_load_from_disk] read 0 ALT contigs
    [M::process] read 10000 sequences (1500000 bp)...
    [M::mem_pestat] skip orientation FF as there are not enough pairs
    [M::mem_process_seqs] Processed 10000 reads in 1.234 CPU sec
    [M::main] Real time: 2.345 sec; CPU: 1.234 sec

Source code in snakesee/plugins/bwa.py
class BWAPlugin(ToolProgressPlugin):
    """
    Progress parser for BWA (Burrows-Wheeler Aligner).

    Counts the reads reported in BWA mem/mem2 log messages.

    Example BWA output::

        [M::bwa_idx_load_from_disk] read 0 ALT contigs
        [M::process] read 10000 sequences (1500000 bp)...
        [M::mem_pestat] skip orientation FF as there are not enough pairs
        [M::mem_process_seqs] Processed 10000 reads in 1.234 CPU sec
        [M::main] Real time: 2.345 sec; CPU: 1.234 sec
    """

    # Matches e.g. "[M::mem_process_seqs] Processed 10000 reads in 1.234 CPU sec"
    PROCESSED_PATTERN = re.compile(r"\[M::mem_process_seqs\]\s+Processed\s+(\d+)\s+reads")

    # Matches e.g. "[M::process] read 10000 sequences (1500000 bp)..."
    READ_PATTERN = re.compile(r"\[M::process\]\s+read\s+(\d+)\s+sequences")

    @property
    def tool_name(self) -> str:
        """Identifier of the tool handled by this plugin."""
        return "bwa"

    @property
    def tool_patterns(self) -> list[str]:
        """Command fragments that indicate a BWA invocation."""
        return ["bwa mem", "bwa-mem2", "bwa aln"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """Return True when the rule name or the log points to BWA."""
        if "bwa" in rule_name.lower():
            return True
        # Otherwise fall back to BWA's "[M::...]" message prefixes.
        return any(marker in log_content for marker in ("[M::bwa_", "[M::mem_"))

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """
        Sum the read counts reported in a BWA log.

        "Processed N reads" lines are summed first; only when they add up to
        zero are the "read N sequences" lines summed instead.

        Returns:
            ToolProgress with the summed count (total unknown), or None when
            no reads were reported.
        """
        read_count = sum(int(hit) for hit in self.PROCESSED_PATTERN.findall(log_content))

        if read_count == 0:
            read_count = sum(int(hit) for hit in self.READ_PATTERN.findall(log_content))

        if read_count <= 0:
            return None

        return ToolProgress(
            items_processed=read_count,
            items_total=None,  # BWA doesn't report total
            unit="reads",
        )
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like BWA output.

Source code in snakesee/plugins/bwa.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """Return True when the rule name or the log points to BWA."""
    if "bwa" in rule_name.lower():
        return True
    # Otherwise fall back to BWA's "[M::...]" message prefixes.
    return any(marker in log_content for marker in ("[M::bwa_", "[M::mem_"))
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

Extract progress from BWA log output.

Source code in snakesee/plugins/bwa.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """
    Sum the read counts reported in a BWA log.

    "Processed N reads" lines are summed first; only when they add up to
    zero are the "read N sequences" lines summed instead.

    Returns:
        ToolProgress with the summed count (total unknown), or None when no
        reads were reported.
    """
    read_count = sum(int(hit) for hit in self.PROCESSED_PATTERN.findall(log_content))

    if read_count == 0:
        read_count = sum(int(hit) for hit in self.READ_PATTERN.findall(log_content))

    if read_count <= 0:
        return None

    return ToolProgress(
        items_processed=read_count,
        items_total=None,  # BWA doesn't report total
        unit="reads",
    )

discovery

Entry point-based plugin discovery.

This module handles discovering plugins registered via setuptools entry points. Third-party packages can register plugins in their pyproject.toml.

Classes

Functions

clear_discovery_cache
clear_discovery_cache() -> None

Clear the cached entry point plugins, forcing a rediscovery on next access.

Source code in snakesee/plugins/discovery.py
def clear_discovery_cache() -> None:
    """Reset the entry point plugin cache and its stored version hash.

    The cached plugin list is set to None and the version hash to 0.
    """
    global _entry_point_plugins, _entry_point_version_hash
    _entry_point_plugins, _entry_point_version_hash = None, 0
discover_entry_point_plugins
discover_entry_point_plugins(force_reload: bool = False) -> list[ToolProgressPlugin]

Discover plugins registered via setuptools entry points.

Third-party packages can register plugins by adding an entry point in their pyproject.toml:

[project.entry-points."snakesee.plugins"]
my_tool = "my_package.plugins:MyToolPlugin"

Parameters:

Name Type Description Default
force_reload bool

If True, re-discover plugins even if cached.

False

Returns:

Type Description
list[ToolProgressPlugin]

List of discovered plugin instances.

Source code in snakesee/plugins/discovery.py
def discover_entry_point_plugins(
    force_reload: bool = False,
) -> list[ToolProgressPlugin]:
    """
    Discover plugins registered via setuptools entry points.

    Third-party packages can register plugins by adding an entry point
    in their pyproject.toml:

        [project.entry-points."snakesee.plugins"]
        my_tool = "my_package.plugins:MyToolPlugin"

    Results are cached at module level together with a version hash; the
    cache is reused until the hash changes or ``force_reload`` is set.
    A plugin that fails to load, instantiate, or validate is logged at
    debug level and skipped, so one broken plugin does not prevent the
    others from being discovered.

    Args:
        force_reload: If True, re-discover plugins even if cached.

    Returns:
        List of discovered plugin instances.
    """
    global _entry_point_plugins
    global _entry_point_version_hash

    version_hash = _compute_version_hash()

    # Return cached plugins if valid and not forcing reload
    if (
        _entry_point_plugins is not None
        and not force_reload
        and _entry_point_version_hash == version_hash
    ):
        return _entry_point_plugins

    plugins: list[ToolProgressPlugin] = []

    try:
        # Python 3.10+ style
        eps = entry_points(group=ENTRY_POINT_GROUP)
        for ep in eps:
            try:
                plugin_class = ep.load()
                if isinstance(plugin_class, type) and issubclass(plugin_class, ToolProgressPlugin):
                    plugin_instance = plugin_class()
                    # Validate the plugin before adding it
                    metadata = validate_plugin(plugin_instance, f"entry_point:{ep.name}")
                    if metadata is not None:
                        plugins.append(plugin_instance)
            except Exception as e:
                # Third-party code runs during load and instantiation and may
                # raise anything; isolate the failure to this entry point
                # instead of aborting discovery of the remaining plugins.
                logger.debug("Failed to load entry point plugin %s: %s", ep.name, e)
                continue
    except (TypeError, OSError) as e:
        logger.debug("Error discovering entry points: %s", e)

    _entry_point_plugins = plugins
    _entry_point_version_hash = version_hash
    return plugins

fastp

Plugin for fastp QC tool progress parsing.

Classes

FastpPlugin

Bases: ToolProgressPlugin

Progress parser for fastp (FASTQ preprocessor).

Parses fastp progress output which shows processed reads.

Example fastp output:

    Read1 before filtering:
    total reads: 10000000
    total bases: 1500000000
    ...
    Filtering result:
    reads passed filter: 9800000
    reads failed due to low quality: 100000

Source code in snakesee/plugins/fastp.py
class FastpPlugin(ToolProgressPlugin):
    """
    Progress parser for fastp (FASTQ preprocessor).

    Reads either an explicit percentage or the read counts that fastp
    writes to its log.

    Example fastp output::

        Read1 before filtering:
        total reads: 10000000
        total bases: 1500000000
        ...
        Filtering result:
        reads passed filter: 9800000
        reads failed due to low quality: 100000
    """

    # Matches e.g. "total reads: 10000000"
    TOTAL_READS_PATTERN = re.compile(r"total reads:\s+(\d+)")

    # Matches e.g. "reads passed filter: 9800000"
    PASSED_PATTERN = re.compile(r"reads passed filter:\s+(\d+)")

    # Matches e.g. "Processing 50.00% of reads"
    PROGRESS_PATTERN = re.compile(r"Processing\s+([\d.]+)%")

    @property
    def tool_name(self) -> str:
        """Identifier of the tool handled by this plugin."""
        return "fastp"

    @property
    def tool_patterns(self) -> list[str]:
        """Command fragments that indicate a fastp invocation."""
        return ["fastp"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """Return True when the rule name or the log points to fastp."""
        if "fastp" in rule_name.lower():
            return True
        if "Read1 before filtering" in log_content:
            return True
        return "fastp" in log_content.lower()

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """
        Extract progress from fastp log output.

        The last "Processing N%" message wins when one is present. Otherwise
        the first "total reads" and "reads passed filter" counts are used.

        Returns:
            ToolProgress built from the percentage or the read counts, or
            None when the log contains neither.
        """
        percentages = self.PROGRESS_PATTERN.findall(log_content)
        if percentages:
            latest = float(percentages[-1])
            return ToolProgress(
                items_processed=int(latest),
                items_total=100,
                unit="%",
                percent_complete=latest,
            )

        total_match = self.TOTAL_READS_PATTERN.search(log_content)
        if total_match is None:
            return None
        total = int(total_match.group(1))

        passed_match = self.PASSED_PATTERN.search(log_content)
        if passed_match is None:
            # Only the total is known: report it as the processed count.
            return ToolProgress(
                items_processed=total,
                items_total=None,
                unit="reads",
            )

        return ToolProgress(
            items_processed=int(passed_match.group(1)),
            items_total=total,
            unit="reads",
        )
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like fastp output.

Source code in snakesee/plugins/fastp.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """Return True when the rule name or the log points to fastp."""
    if "fastp" in rule_name.lower():
        return True
    if "Read1 before filtering" in log_content:
        return True
    return "fastp" in log_content.lower()
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

Extract progress from fastp log output.

Source code in snakesee/plugins/fastp.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """
    Extract progress from fastp log output.

    The last "Processing N%" message wins when one is present. Otherwise the
    first "total reads" and "reads passed filter" counts are used.

    Returns:
        ToolProgress built from the percentage or the read counts, or None
        when the log contains neither.
    """
    percentages = self.PROGRESS_PATTERN.findall(log_content)
    if percentages:
        latest = float(percentages[-1])
        return ToolProgress(
            items_processed=int(latest),
            items_total=100,
            unit="%",
            percent_complete=latest,
        )

    total_match = self.TOTAL_READS_PATTERN.search(log_content)
    if total_match is None:
        return None
    total = int(total_match.group(1))

    passed_match = self.PASSED_PATTERN.search(log_content)
    if passed_match is None:
        # Only the total is known: report it as the processed count.
        return ToolProgress(
            items_processed=total,
            items_total=None,
            unit="reads",
        )

    return ToolProgress(
        items_processed=int(passed_match.group(1)),
        items_total=total,
        unit="reads",
    )

fgbio

Plugin for fgbio tool progress parsing.

Classes

FgbioPlugin

Bases: ToolProgressPlugin

Progress parser for fgbio tools.

fgbio is a toolkit for working with genomic and high throughput sequencing data. It uses HTSJDK's ProgressLogger which outputs messages like:

[INFO] Processed 1,000,000 records. Elapsed time: 00:01:30s.
[progress] Read 5000000 records from BAM file.
[INFO] Grouped 1234567 records into 123456 read pairs.

This plugin detects fgbio by rule name or log content patterns.

Source code in snakesee/plugins/fgbio.py
class FgbioPlugin(ToolProgressPlugin):
    """
    Progress parser for fgbio tools.

    fgbio is a toolkit for working with genomic and high throughput sequencing data.
    It uses HTSJDK's ProgressLogger which outputs messages like:

        [INFO] Processed 1,000,000 records. Elapsed time: 00:01:30s.
        [progress] Read 5000000 records from BAM file.
        [INFO] Grouped 1234567 records into 123456 read pairs.

    This plugin detects fgbio by rule name or log content patterns.
    """

    # Every pattern captures (count, unit) and is case-insensitive.
    # The leading ``\b`` keeps a verb from matching inside a longer word
    # (e.g. "Thread 4 started" must not be read as "read 4 started"), and the
    # count must start with a digit so that it always converts to an int.

    # Pattern: "Processed 1,000,000 records" (HTSJDK ProgressLogger format)
    PROCESSED_PATTERN = re.compile(
        r"\bprocessed\s+(\d[\d,]*)\s+(\w+)",
        re.IGNORECASE,
    )

    # Pattern: "Read 5000000 records from BAM"
    READ_PATTERN = re.compile(
        r"\bread\s+(\d[\d,]*)\s+(\w+)",
        re.IGNORECASE,
    )

    # Pattern: "Grouped 1234567 records into X"
    GROUPED_PATTERN = re.compile(
        r"\bgrouped\s+(\d[\d,]*)\s+(\w+)",
        re.IGNORECASE,
    )

    # Pattern: "Wrote 1000000 records"
    WROTE_PATTERN = re.compile(
        r"\bwrote\s+(\d[\d,]*)\s+(\w+)",
        re.IGNORECASE,
    )

    # Pattern: "Finished. Processed X records in Y seconds."
    FINISHED_PATTERN = re.compile(
        r"\bfinished.*?\bprocessed\s+(\d[\d,]*)\s+(\w+)",
        re.IGNORECASE,
    )

    # Lowercased fgbio tool names commonly embedded in rule names.
    _FGBIO_TOOL_NAMES = (
        "callmolecularconsensusreads",
        "groupreadsbyumi",
        "filterconsensusreads",
        "callduplexconsensusreads",
        "annotatebamwithumi",
        "correctumis",
        "extractumisfrombam",
        "demuxfastqs",
        "trimfastq",
    )

    # Singular/plural spellings mapped to the unit label that is reported.
    _UNIT_ALIASES = {
        "record": "records",
        "records": "records",
        "read": "reads",
        "reads": "reads",
        "pair": "read pairs",
        "pairs": "read pairs",
    }

    @property
    def tool_name(self) -> str:
        """Identifier of the tool handled by this plugin."""
        return "fgbio"

    @property
    def tool_patterns(self) -> list[str]:
        """Command fragments that indicate an fgbio invocation."""
        return ["fgbio", "CallMolecularConsensusReads", "GroupReadsByUmi", "FilterConsensusReads"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """
        Check if this looks like fgbio output.

        Args:
            rule_name: Name of the Snakemake rule.
            log_content: Content of the rule's log file.

        Returns:
            True when the rule name mentions fgbio or one of its tools, or
            when the log carries an fgbio signature.
        """
        rule_lower = rule_name.lower()
        if "fgbio" in rule_lower:
            return True
        if any(tool in rule_lower for tool in self._FGBIO_TOOL_NAMES):
            return True
        # Check log content for fgbio signatures
        if "com.fulcrumgenomics" in log_content:
            return True
        return "fgbio" in log_content.lower() and "Processed" in log_content

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """
        Extract progress from fgbio log output.

        For each known message type the last occurrence in the log is
        inspected, and the largest count found across all message types is
        reported together with its unit.

        Args:
            log_content: Content of the rule's log file.

        Returns:
            ToolProgress with the largest count (total unknown), or None when
            no positive count was found.
        """
        message_patterns = (
            self.FINISHED_PATTERN,
            self.PROCESSED_PATTERN,
            self.READ_PATTERN,
            self.GROUPED_PATTERN,
            self.WROTE_PATTERN,
        )

        best_count = 0
        best_unit = "records"

        for pattern in message_patterns:
            matches = list(pattern.finditer(log_content))
            if not matches:
                continue

            # The last match is the most recent message of this type.
            last_match = matches[-1]
            count = int(last_match.group(1).replace(",", ""))
            raw_unit = last_match.group(2).lower()
            unit = self._UNIT_ALIASES.get(raw_unit, raw_unit)

            if count > best_count:
                best_count = count
                best_unit = unit

        if best_count > 0:
            return ToolProgress(
                items_processed=best_count,
                items_total=None,
                unit=best_unit,
            )

        return None
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like fgbio output.

Source code in snakesee/plugins/fgbio.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """
    Check if this looks like fgbio output.

    Args:
        rule_name: Name of the Snakemake rule.
        log_content: Content of the rule's log file.

    Returns:
        True when the rule name mentions fgbio or one of its tools, or when
        the log carries an fgbio signature.
    """
    rule_lower = rule_name.lower()
    if "fgbio" in rule_lower:
        return True
    # Common fgbio tool names in rules (lowercased for comparison)
    fgbio_tools = (
        "callmolecularconsensusreads",
        "groupreadsbyumi",
        "filterconsensusreads",
        "callduplexconsensusreads",
        "annotatebamwithumi",
        "correctumis",
        "extractumisfrombam",
        "demuxfastqs",
        "trimfastq",
    )
    if any(tool in rule_lower for tool in fgbio_tools):
        return True
    # Check log content for fgbio signatures
    if "com.fulcrumgenomics" in log_content:
        return True
    return "fgbio" in log_content.lower() and "Processed" in log_content
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

Extract progress from fgbio log output.

Source code in snakesee/plugins/fgbio.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """
    Extract progress from fgbio log output.

    For each known message type the last usable occurrence in the log is
    inspected, and the largest count found across all message types is
    reported together with its unit.

    Args:
        log_content: Content of the rule's log file.

    Returns:
        ToolProgress with the largest count (total unknown), or None when no
        positive count was found.
    """
    message_patterns = (
        self.FINISHED_PATTERN,
        self.PROCESSED_PATTERN,
        self.READ_PATTERN,
        self.GROUPED_PATTERN,
        self.WROTE_PATTERN,
    )
    # Singular/plural spellings mapped to the unit label that is reported.
    unit_aliases = {
        "record": "records",
        "records": "records",
        "read": "reads",
        "reads": "reads",
        "pair": "read pairs",
        "pairs": "read pairs",
    }

    best_count = 0
    best_unit = "records"

    for pattern in message_patterns:
        latest = None
        for match in pattern.finditer(log_content):
            digits = match.group(1).replace(",", "")
            # The count group also accepts commas, so it can match text with
            # no digits at all; such a match carries no count and is ignored.
            if digits:
                latest = (int(digits), match.group(2).lower())

        if latest is None:
            continue

        count, raw_unit = latest
        if count > best_count:
            best_count = count
            best_unit = unit_aliases.get(raw_unit, raw_unit)

    if best_count > 0:
        return ToolProgress(
            items_processed=best_count,
            items_total=None,
            unit=best_unit,
        )

    return None

loader

File-based plugin loading with security checks.

This module handles loading plugins from Python files in user directories. It includes security checks for symlinks and world-writable directories.

Classes

Functions

clear_plugin_cache
clear_plugin_cache() -> None

Clear the cached user plugins, forcing a reload on next access.

Source code in snakesee/plugins/loader.py
def clear_plugin_cache() -> None:
    """Reset the module-level user plugin cache to None."""
    global _user_plugins
    _user_plugins = None
load_user_plugins
load_user_plugins(plugin_dirs: list[Path] | None = None, force_reload: bool = False) -> list[ToolProgressPlugin]

Load custom user plugins from plugin directories.

User plugins are Python files in ~/.snakesee/plugins/ or ~/.config/snakesee/plugins/ that define classes inheriting from ToolProgressPlugin.

Parameters:

Name Type Description Default
plugin_dirs list[Path] | None

List of directories to search. Defaults to USER_PLUGIN_DIRS.

None
force_reload bool

If True, reload plugins even if already cached.

False

Returns:

Type Description
list[ToolProgressPlugin]

List of loaded user plugin instances.

Example plugin file (~/.snakesee/plugins/my_tool.py)::

from snakesee.plugins.base import ToolProgress, ToolProgressPlugin
import re

class MyToolPlugin(ToolProgressPlugin):
    @property
    def tool_name(self) -> str:
        return "mytool"

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        return "mytool" in rule_name.lower()

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        match = re.search(r"Processed (\d+) items", log_content)
        if match:
            return ToolProgress(items_processed=int(match.group(1)), unit="items")
        return None
Source code in snakesee/plugins/loader.py
def load_user_plugins(
    plugin_dirs: list[Path] | None = None,
    force_reload: bool = False,
) -> list[ToolProgressPlugin]:
    """
    Load custom user plugins from plugin directories.

    User plugins are Python files in ~/.snakesee/plugins/ or ~/.config/snakesee/plugins/
    that define classes inheriting from ToolProgressPlugin.

    Directories are visited in the order given, and the files inside each
    directory are loaded in sorted path order, so the resulting plugin order
    is deterministic across runs and filesystems. Files whose name starts
    with an underscore are skipped. A file that fails to load with
    ImportError, SyntaxError or OSError is logged at debug level and skipped.

    Note:
        The result is cached in a single module-level list that is not keyed
        by ``plugin_dirs``. Once a load has happened, later calls return the
        cached list regardless of ``plugin_dirs`` unless ``force_reload`` is
        True (or the cache has been reset to None).

    Args:
        plugin_dirs: List of directories to search. Defaults to USER_PLUGIN_DIRS.
        force_reload: If True, reload plugins even if already cached.

    Returns:
        List of loaded user plugin instances.

    Example plugin file (~/.snakesee/plugins/my_tool.py)::

        from snakesee.plugins.base import ToolProgress, ToolProgressPlugin
        import re

        class MyToolPlugin(ToolProgressPlugin):
            @property
            def tool_name(self) -> str:
                return "mytool"

            def can_parse(self, rule_name: str, log_content: str) -> bool:
                return "mytool" in rule_name.lower()

            def parse_progress(self, log_content: str) -> ToolProgress | None:
                match = re.search(r"Processed (\\d+) items", log_content)
                if match:
                    return ToolProgress(items_processed=int(match.group(1)), unit="items")
                return None
    """
    global _user_plugins

    if _user_plugins is not None and not force_reload:
        return _user_plugins

    if plugin_dirs is None:
        plugin_dirs = USER_PLUGIN_DIRS

    loaded_plugins: list[ToolProgressPlugin] = []

    for plugin_dir in plugin_dirs:
        # is_dir() is already False for paths that do not exist.
        if not plugin_dir.is_dir():
            continue

        # Security checks
        _check_plugin_dir_security(plugin_dir)

        # glob() yields entries in arbitrary filesystem order; sort so that
        # plugin precedence does not vary between machines or runs.
        for plugin_file in sorted(plugin_dir.glob("*.py")):
            if plugin_file.name.startswith("_"):
                continue  # Skip private modules

            try:
                plugins = _load_plugins_from_file(plugin_file)
                loaded_plugins.extend(plugins)
            except (ImportError, SyntaxError, OSError) as e:
                logger.debug("Failed to load plugin from %s: %s", plugin_file, e)

    _user_plugins = loaded_plugins
    return loaded_plugins
validate_plugin
validate_plugin(plugin: ToolProgressPlugin, source: str = 'unknown') -> PluginMetadata | None

Validate that a plugin instance is compatible and properly implemented.

Uses PluginMetadata for structured validation of plugin attributes.

Parameters:

Name Type Description Default
plugin ToolProgressPlugin

The plugin instance to validate.

required
source str

Description of where the plugin came from (for logging).

'unknown'

Returns:

Type Description
PluginMetadata | None

PluginMetadata if the plugin is valid and compatible, None otherwise.

Source code in snakesee/plugins/loader.py
def validate_plugin(plugin: ToolProgressPlugin, source: str = "unknown") -> PluginMetadata | None:
    """
    Check a plugin instance for interface and API-version compatibility.

    Three checks run in order, and the first failure logs a warning and
    returns None: the plugin must expose callable ``can_parse`` and
    ``parse_progress`` attributes, ``PluginMetadata.from_plugin`` must succeed
    without raising ValueError or AttributeError, and the resulting metadata
    must report compatibility with PLUGIN_API_VERSION.

    Args:
        plugin: The plugin instance to validate.
        source: Description of where the plugin came from (for logging).

    Returns:
        PluginMetadata if the plugin is valid and compatible, None otherwise.
    """
    # Interface check: a missing attribute resolves to None, which is not callable.
    for required in ("can_parse", "parse_progress"):
        if not callable(getattr(plugin, required, None)):
            logger.warning(
                "Plugin from %s is missing required method '%s'. Skipping.",
                source,
                required,
            )
            return None

    # Structured metadata validation
    try:
        info = PluginMetadata.from_plugin(plugin)
    except (ValueError, AttributeError) as exc:
        logger.warning(
            "Plugin from %s failed metadata validation: %s. Skipping.",
            source,
            exc,
        )
        return None

    # API version gate
    if info.is_compatible(PLUGIN_API_VERSION):
        return info

    logger.warning(
        "Plugin %s from %s requires API version %d, but current version is %d. Skipping.",
        info.name,
        source,
        info.api_version,
        PLUGIN_API_VERSION,
    )
    return None

registry

Plugin registry and lookup functions.

This module provides functions for finding and using plugins to parse tool-specific progress from log files.

Classes

Functions

find_plugin_for_log
find_plugin_for_log(rule_name: str, log_content: str, plugins: list[ToolProgressPlugin]) -> ToolProgressPlugin | None

Find a plugin that can parse the given log content.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_content str

Content of the rule's log file.

required
plugins list[ToolProgressPlugin]

List of plugins to search.

required

Returns:

Type Description
ToolProgressPlugin | None

A plugin that can parse this log, or None if no plugin matches.

Source code in snakesee/plugins/registry.py
def find_plugin_for_log(
    rule_name: str,
    log_content: str,
    plugins: list[ToolProgressPlugin],
) -> ToolProgressPlugin | None:
    """
    Return the first plugin that claims it can parse the given log.

    Plugins are asked in list order via ``can_parse(rule_name, log_content)``;
    the search stops at the first one that answers truthily, so earlier
    entries take precedence over later ones.

    Args:
        rule_name: Name of the Snakemake rule.
        log_content: Content of the rule's log file.
        plugins: List of plugins to search.

    Returns:
        A plugin that can parse this log, or None if no plugin matches.
    """
    candidates = (
        candidate for candidate in plugins if candidate.can_parse(rule_name, log_content)
    )
    return next(candidates, None)
get_all_plugins
get_all_plugins(builtin_plugins: list[ToolProgressPlugin], include_user: bool = True) -> list[ToolProgressPlugin]

Get all available plugins (built-in, user file-based, and entry points).

Parameters:

Name Type Description Default
builtin_plugins list[ToolProgressPlugin]

List of built-in plugin instances.

required
include_user bool

Whether to include user plugins (file-based and entry points).

True

Returns:

Type Description
list[ToolProgressPlugin]

Combined list of all plugins.

Source code in snakesee/plugins/registry.py
def get_all_plugins(
    builtin_plugins: list[ToolProgressPlugin],
    include_user: bool = True,
) -> list[ToolProgressPlugin]:
    """
    Assemble the full plugin list: built-ins first, then user plugins.

    The returned list is always a new list; ``builtin_plugins`` itself is not
    modified. When ``include_user`` is true, file-based user plugins are
    appended after the built-ins, followed by entry-point plugins.

    Args:
        builtin_plugins: List of built-in plugin instances.
        include_user: Whether to include user plugins (file-based and entry points).

    Returns:
        Combined list of all plugins.
    """
    # Imported inside the function, as in the rest of this module's design.
    from snakesee.plugins.discovery import discover_entry_point_plugins
    from snakesee.plugins.loader import load_user_plugins

    if not include_user:
        return list(builtin_plugins)

    return [
        *builtin_plugins,
        *load_user_plugins(),
        *discover_entry_point_plugins(),
    ]
parse_tool_progress
parse_tool_progress(rule_name: str, log_path: Path, plugins: list[ToolProgressPlugin]) -> ToolProgress | None

Parse progress from a rule's log file using available plugins.

Parameters:

Name Type Description Default
rule_name str

Name of the Snakemake rule.

required
log_path Path

Path to the rule's log file.

required
plugins list[ToolProgressPlugin]

List of plugins to use.

required

Returns:

Type Description
ToolProgress | None

ToolProgress if progress could be extracted, None otherwise.

Source code in snakesee/plugins/registry.py
def parse_tool_progress(
    rule_name: str,
    log_path: Path,
    plugins: list[ToolProgressPlugin],
) -> ToolProgress | None:
    """
    Read a rule's log file and extract progress with the first matching plugin.

    The whole file is read as text with undecodable bytes ignored. A missing
    file, an unreadable file (OSError), or the absence of a matching plugin
    all yield None rather than an exception.

    Args:
        rule_name: Name of the Snakemake rule.
        log_path: Path to the rule's log file.
        plugins: List of plugins to use.

    Returns:
        ToolProgress if progress could be extracted, None otherwise.
    """
    if not log_path.exists():
        return None

    try:
        log_text = log_path.read_text(errors="ignore")
    except OSError:
        return None

    matched = find_plugin_for_log(rule_name, log_text, plugins)
    return None if matched is None else matched.parse_progress(log_text)

samtools

Plugin for samtools progress parsing.

Classes

SamtoolsIndexPlugin

Bases: ToolProgressPlugin

Progress parser for samtools index.

samtools index doesn't provide progress, but we can detect completion.

Source code in snakesee/plugins/samtools.py
class SamtoolsIndexPlugin(ToolProgressPlugin):
    """
    Progress parser for samtools index.

    samtools index doesn't provide progress output, so ``parse_progress``
    always returns None. The plugin only recognises index rules by their
    rule name; the log content is not inspected.
    """

    @property
    def tool_name(self) -> str:
        return "samtools-index"

    @property
    def tool_patterns(self) -> list[str]:
        return ["samtools index"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """Return True when the rule name mentions "index" plus "sam" or "bam"."""
        lowered = rule_name.lower()
        if "index" not in lowered:
            return False
        return "sam" in lowered or "bam" in lowered

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """Always return None: samtools index doesn't report progress."""
        return None
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like samtools index.

Source code in snakesee/plugins/samtools.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """Return True when the rule name mentions "index" plus "sam" or "bam".

    The match is case-insensitive and based on the rule name only;
    ``log_content`` is not inspected.
    """
    lowered = rule_name.lower()
    if "index" not in lowered:
        return False
    return "sam" in lowered or "bam" in lowered
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

samtools index doesn't report progress.

Source code in snakesee/plugins/samtools.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """Always return None: samtools index doesn't report progress.

    ``log_content`` is accepted for interface compatibility and is ignored.
    """
    return None
SamtoolsSortPlugin

Bases: ToolProgressPlugin

Progress parser for samtools sort.

Parses samtools sort progress output which shows processed reads.

Example samtools sort output:

    [bam_sort_core] merging from 4 files and 1 in-memory blocks...
    [bam_sort_core] read 1000000 records...

Source code in snakesee/plugins/samtools.py
class SamtoolsSortPlugin(ToolProgressPlugin):
    """
    Progress parser for samtools sort.

    Scans the log for "read N records" lines and reports the largest N seen
    as the number of records processed. No total is reported.

    Example samtools sort output::

        [bam_sort_core] merging from 4 files and 1 in-memory blocks...
        [bam_sort_core] read 1000000 records...
    """

    # Pattern: read 1000000 records
    RECORDS_PATTERN = re.compile(r"read\s+(\d+)\s+records")

    @property
    def tool_name(self) -> str:
        return "samtools-sort"

    @property
    def tool_patterns(self) -> list[str]:
        return ["samtools sort", "samtools view"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """Match on a sort/sam rule name or a "[bam_sort_core]" log marker."""
        lowered = rule_name.lower()
        named_like_sort = "sort" in lowered and "sam" in lowered
        return named_like_sort or "[bam_sort_core]" in log_content

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """Return the highest record count found in the log, or None if there is none."""
        counts = (int(hit.group(1)) for hit in self.RECORDS_PATTERN.finditer(log_content))
        highest = max(counts, default=0)

        # A count of zero is treated the same as "no progress line found".
        if highest <= 0:
            return None

        return ToolProgress(
            items_processed=highest,
            items_total=None,
            unit="records",
        )
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like samtools sort output.

Source code in snakesee/plugins/samtools.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """Match on a sort/sam rule name or a "[bam_sort_core]" log marker.

    The rule-name test is case-insensitive and requires both "sort" and
    "sam"; the log marker test is an exact substring match.
    """
    lowered = rule_name.lower()
    named_like_sort = "sort" in lowered and "sam" in lowered
    return named_like_sort or "[bam_sort_core]" in log_content
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

Extract progress from samtools sort log output.

Source code in snakesee/plugins/samtools.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """Return the highest "read N records" count found in the log.

    Every match of ``RECORDS_PATTERN`` is considered and the largest count
    wins. Returns None when there is no match or the largest count is zero.
    """
    counts = (int(hit.group(1)) for hit in self.RECORDS_PATTERN.finditer(log_content))
    highest = max(counts, default=0)

    # A count of zero is treated the same as "no progress line found".
    if highest <= 0:
        return None

    return ToolProgress(
        items_processed=highest,
        items_total=None,
        unit="records",
    )

star

Plugin for STAR aligner progress parsing.

Classes

STARPlugin

Bases: ToolProgressPlugin

Progress parser for STAR (Spliced Transcripts Alignment to a Reference).

Parses STAR progress output which shows alignment progress.

Example STAR output:

    STAR version=2.7.10a
    ...
    Finished 10000000 paired reads
    Finished 20000000 paired reads
    ...
    Uniquely mapped reads % | 85.00%

Source code in snakesee/plugins/star.py
class STARPlugin(ToolProgressPlugin):
    """
    Progress parser for STAR (Spliced Transcripts Alignment to a Reference).

    Reports the read count from the last "Finished N reads" line in the log,
    falling back to the last "N reads processed" line when no "Finished"
    line is present. No total is reported.

    Example STAR output::

        STAR version=2.7.10a
        ...
        Finished 10000000 paired reads
        Finished 20000000 paired reads
        ...
        Uniquely mapped reads % | 85.00%
    """

    # Pattern: Finished 10000000 paired reads
    FINISHED_PATTERN = re.compile(r"Finished\s+(\d+)\s+(?:paired\s+)?reads")

    # Pattern from Log.progress.out: 10000000 reads processed
    PROGRESS_PATTERN = re.compile(r"(\d+)\s+reads?\s+processed")

    # Percentage from Log.final.out
    MAPPED_PATTERN = re.compile(r"Uniquely mapped reads %\s*\|\s*([\d.]+)%")

    @property
    def tool_name(self) -> str:
        return "star"

    @property
    def tool_patterns(self) -> list[str]:
        return ["STAR", "star"]

    def can_parse(self, rule_name: str, log_content: str) -> bool:
        """Match on "star" in the rule name or a STAR banner/command in the log."""
        return (
            "star" in rule_name.lower()
            or "STAR version" in log_content
            or "STAR --" in log_content
        )

    def parse_progress(self, log_content: str) -> ToolProgress | None:
        """Return the most recent read count reported in the log, if any."""
        # "Finished X reads" lines take priority over the progress-file format.
        # Each pattern has exactly one capturing group, so findall() yields
        # the count strings directly.
        for pattern in (self.FINISHED_PATTERN, self.PROGRESS_PATTERN):
            counts = pattern.findall(log_content)
            if counts:
                return ToolProgress(
                    items_processed=int(counts[-1]),
                    items_total=None,
                    unit="reads",
                )

        return None
Functions
can_parse
can_parse(rule_name: str, log_content: str) -> bool

Check if this looks like STAR output.

Source code in snakesee/plugins/star.py
def can_parse(self, rule_name: str, log_content: str) -> bool:
    """Match on "star" in the rule name or a STAR banner/command in the log.

    The rule-name test is case-insensitive; the log tests look for the exact
    substrings "STAR version" and "STAR --".
    """
    return (
        "star" in rule_name.lower()
        or "STAR version" in log_content
        or "STAR --" in log_content
    )
parse_progress
parse_progress(log_content: str) -> ToolProgress | None

Extract progress from STAR log output.

Source code in snakesee/plugins/star.py
def parse_progress(self, log_content: str) -> ToolProgress | None:
    """Return the most recent read count reported in the log, if any.

    The last "Finished N reads" match is used when one exists; otherwise the
    last "N reads processed" match is used. Returns None when neither pattern
    matches.
    """
    # "Finished X reads" lines take priority over the progress-file format.
    # Each pattern has exactly one capturing group, so findall() yields the
    # count strings directly.
    for pattern in (self.FINISHED_PATTERN, self.PROGRESS_PATTERN):
        counts = pattern.findall(log_content)
        if counts:
            return ToolProgress(
                items_processed=int(counts[-1]),
                items_total=None,
                unit="reads",
            )

    return None