Skip to content

paths

Centralized path management for Snakemake workflow directories.

This module provides the WorkflowPaths class which centralizes all path construction for Snakemake workflows, eliminating ad-hoc path construction scattered across multiple functions.

Classes

WorkflowPaths dataclass

Centralized path management for Snakemake workflow directories.

This frozen dataclass provides computed paths for all standard Snakemake directory locations, eliminating ad-hoc path construction.

Attributes:

Name Type Description
workflow_dir Path

Root directory of the workflow (contains .snakemake/).

Example

paths = WorkflowPaths(Path("/my/workflow"))

Access computed paths

if paths.metadata_dir.exists():
    for f in paths.get_metadata_files():
        process(f)

Find logs

latest = paths.find_latest_log()
all_logs = paths.find_all_logs()

Source code in snakesee/state/paths.py
@dataclass(frozen=True)
class WorkflowPaths:
    """Centralized path management for Snakemake workflow directories.

    This frozen dataclass provides computed paths for all standard
    Snakemake directory locations, eliminating ad-hoc path construction.

    Attributes:
        workflow_dir: Root directory of the workflow (contains .snakemake/).

    Example:
        paths = WorkflowPaths(Path("/my/workflow"))

        # Access computed paths
        if paths.metadata_dir.exists():
            for f in paths.get_metadata_files():
                process(f)

        # Find logs
        latest = paths.find_latest_log()
        all_logs = paths.find_all_logs()
    """

    workflow_dir: Path

    # =========================================================================
    # Core directory properties
    # =========================================================================

    @property
    def snakemake_dir(self) -> Path:
        """Path to .snakemake/ directory."""
        return self.workflow_dir / SNAKEMAKE_DIR

    @property
    def metadata_dir(self) -> Path:
        """Path to .snakemake/metadata/ directory."""
        return self.snakemake_dir / METADATA_DIR

    @property
    def log_dir(self) -> Path:
        """Path to .snakemake/log/ directory."""
        return self.snakemake_dir / LOG_DIR

    @property
    def incomplete_dir(self) -> Path:
        """Path to .snakemake/incomplete/ directory."""
        return self.snakemake_dir / INCOMPLETE_DIR

    @property
    def locks_dir(self) -> Path:
        """Path to .snakemake/locks/ directory."""
        return self.snakemake_dir / LOCKS_DIR

    @property
    def metadata_db(self) -> Path:
        """Path to Snakemake's SQLite metadata database."""
        return self.snakemake_dir / METADATA_DB_NAME

    # =========================================================================
    # Event and validation file paths
    # =========================================================================

    @property
    def events_file(self) -> Path:
        """Path to snakesee events file (.snakesee_events.jsonl)."""
        return self.workflow_dir / EVENT_FILE_NAME

    @property
    def validation_log(self) -> Path:
        """Path to validation log file (.snakesee_validation.log)."""
        return self.workflow_dir / VALIDATION_LOG_NAME

    @property
    def default_profile(self) -> Path:
        """Path to default profile file (.snakesee-profile.json)."""
        return self.workflow_dir / DEFAULT_PROFILE_NAME

    # =========================================================================
    # Existence checks
    # =========================================================================

    @property
    def exists(self) -> bool:
        """Check if this is a valid workflow directory."""
        return _cached_exists(self.snakemake_dir)

    @property
    def has_metadata(self) -> bool:
        """Check if metadata directory exists."""
        return _cached_exists(self.metadata_dir)

    @property
    def has_metadata_db(self) -> bool:
        """Whether a SQLite metadata database exists."""
        return _cached_exists(self.metadata_db)

    @property
    def has_logs(self) -> bool:
        """Check if log directory exists and contains logs."""
        if not _cached_exists(self.log_dir):
            return False
        return any(self.log_dir.glob(LOG_GLOB_PATTERN))

    @property
    def has_events(self) -> bool:
        """Check if events file exists and has content."""
        try:
            return self.events_file.stat().st_size > 0
        except OSError:
            return False

    @property
    def has_locks(self) -> bool:
        """Check if locks directory exists and contains files."""
        if not _cached_exists(self.locks_dir):
            return False
        try:
            return any(self.locks_dir.iterdir())
        except OSError:
            return False

    @property
    def has_incomplete(self) -> bool:
        """Check if incomplete directory exists and contains markers."""
        if not _cached_exists(self.incomplete_dir):
            return False
        try:
            return any(self.incomplete_dir.iterdir())
        except OSError:
            return False

    # =========================================================================
    # Log file discovery
    # =========================================================================

    def find_latest_log(self) -> Path | None:
        """Find the most recent snakemake log file.

        Returns:
            Path to the most recent log file, or None if no logs exist.
        """
        if not _cached_exists(self.log_dir):
            return None
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        if not logs:
            return None
        logs.sort(key=safe_mtime)
        return logs[-1]

    def find_all_logs(self) -> list[Path]:
        """Find all snakemake log files, sorted by modification time.

        Returns:
            List of paths sorted oldest to newest.
        """
        if not _cached_exists(self.log_dir):
            return []
        # Files from glob already exist at time of iteration; no need to re-check
        logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
        logs.sort(key=safe_mtime)
        return logs

    def find_logs_sorted_newest_first(self) -> list[Path]:
        """Find all snakemake log files, sorted newest first.

        Returns:
            List of paths sorted newest to oldest.
        """
        logs = self.find_all_logs()
        logs.reverse()
        return logs

    # =========================================================================
    # Metadata file discovery
    # =========================================================================

    def get_metadata_files(self) -> Iterator[Path]:
        """Iterate over all metadata files.

        Yields:
            Path to each metadata file.
        """
        if not _cached_exists(self.metadata_dir):
            return
        for f in self.metadata_dir.rglob("*"):
            if f.is_file():
                yield f

    def count_metadata_files(self) -> int:
        """Count the number of metadata files.

        Returns:
            Number of metadata files.
        """
        if not _cached_exists(self.metadata_dir):
            return 0
        return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())

    # =========================================================================
    # Incomplete marker handling
    # =========================================================================

    def get_incomplete_markers(self) -> Iterator[Path]:
        """Iterate over incomplete job markers.

        Yields:
            Path to each incomplete marker file.
        """
        if not _cached_exists(self.incomplete_dir):
            return
        for marker in self.incomplete_dir.rglob("*"):
            if marker.is_file() and marker.name != "migration_underway":
                yield marker

    def decode_incomplete_marker(self, marker: Path) -> Path | None:
        """Decode an incomplete marker filename to get the output path.

        Args:
            marker: Path to the marker file.

        Returns:
            Decoded output file path, or None if decoding fails.
        """
        try:
            decoded = base64.b64decode(marker.name).decode("utf-8")
            return Path(decoded)
        except (ValueError, UnicodeDecodeError):
            return None

    # =========================================================================
    # Job log discovery
    # =========================================================================

    def get_job_log(
        self,
        rule: str,
        wildcards: dict[str, str] | None = None,
        job_id: int | str | None = None,
    ) -> Path | None:
        """Find the log file for a specific job.

        Searches common log locations for a file matching the rule
        and optional wildcards/job_id.

        Args:
            rule: Name of the rule.
            wildcards: Optional wildcard values.
            job_id: Optional job ID.

        Returns:
            Path to the log file if found, None otherwise.
        """
        search_paths: list[Path] = []

        # .snakemake/log/ directory
        if _cached_exists(self.log_dir):
            search_paths.extend(self.log_dir.glob(f"*{rule}*"))
            if job_id is not None:
                search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))

        # logs/ directory (common convention)
        logs_dir = self.workflow_dir / "logs"
        search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))

        # log/ directory (another common convention)
        log_dir = self.workflow_dir / "log"
        search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))

        # Sort by modification time (newest first) and return first match
        # is_file() already confirms existence, no need for additional exists check
        existing_logs = [p for p in search_paths if p.is_file()]
        if existing_logs:
            existing_logs.sort(key=safe_mtime, reverse=True)
            return existing_logs[0]

        return None

    def _search_log_dir(
        self,
        log_dir: Path,
        rule: str,
        wildcards: dict[str, str] | None,
    ) -> list[Path]:
        """Search a log directory for matching logs."""
        paths: list[Path] = []
        if not _cached_exists(log_dir):
            return paths

        paths.extend(log_dir.glob(f"**/{rule}*"))

        rule_log_dir = log_dir / rule
        if _cached_exists(rule_log_dir):
            paths.extend(rule_log_dir.glob("*"))

        if wildcards:
            for wc_value in wildcards.values():
                if wc_value:
                    paths.extend(log_dir.glob(f"**/*{wc_value}*"))

        return paths

    # =========================================================================
    # Profile discovery
    # =========================================================================

    def find_profile(self, max_levels: int = 6) -> Path | None:
        """Search for a profile file in workflow and parent directories.

        Args:
            max_levels: Maximum parent levels to search (including current).

        Returns:
            Path to the found profile, or None if not found.
        """
        current = self.workflow_dir.resolve()
        for _ in range(max_levels):
            profile_path = current / DEFAULT_PROFILE_NAME
            if _cached_exists(profile_path):
                return profile_path
            if current.parent == current:
                break
            current = current.parent
        return None

    # =========================================================================
    # Validation
    # =========================================================================

    def validate(self) -> None:
        """Validate that this is a valid workflow directory.

        Raises:
            ValueError: If .snakemake directory doesn't exist.
        """
        if not _cached_exists(self.snakemake_dir):
            raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")

Attributes

default_profile property
default_profile: Path

Path to default profile file (.snakesee-profile.json).

events_file property
events_file: Path

Path to snakesee events file (.snakesee_events.jsonl).

exists property
exists: bool

Check if this is a valid workflow directory.

has_events property
has_events: bool

Check if events file exists and has content.

has_incomplete property
has_incomplete: bool

Check if incomplete directory exists and contains markers.

has_locks property
has_locks: bool

Check if locks directory exists and contains files.

has_logs property
has_logs: bool

Check if log directory exists and contains logs.

has_metadata property
has_metadata: bool

Check if metadata directory exists.

has_metadata_db property
has_metadata_db: bool

Whether a SQLite metadata database exists.

incomplete_dir property
incomplete_dir: Path

Path to .snakemake/incomplete/ directory.

locks_dir property
locks_dir: Path

Path to .snakemake/locks/ directory.

log_dir property
log_dir: Path

Path to .snakemake/log/ directory.

metadata_db property
metadata_db: Path

Path to Snakemake's SQLite metadata database.

metadata_dir property
metadata_dir: Path

Path to .snakemake/metadata/ directory.

snakemake_dir property
snakemake_dir: Path

Path to .snakemake/ directory.

validation_log property
validation_log: Path

Path to validation log file (.snakesee_validation.log).

Functions

count_metadata_files
count_metadata_files() -> int

Count the number of metadata files.

Returns:

Type Description
int

Number of metadata files.

Source code in snakesee/state/paths.py
def count_metadata_files(self) -> int:
    """Return how many regular files live under the metadata directory.

    Returns:
        Number of files found recursively, or 0 when the metadata
        directory is reported as missing.
    """
    if _cached_exists(self.metadata_dir):
        entries = self.metadata_dir.rglob("*")
        return len([entry for entry in entries if entry.is_file()])
    return 0
decode_incomplete_marker
decode_incomplete_marker(marker: Path) -> Path | None

Decode an incomplete marker filename to get the output path.

Parameters:

Name Type Description Default
marker Path

Path to the marker file.

required

Returns:

Type Description
Path | None

Decoded output file path, or None if decoding fails.

Source code in snakesee/state/paths.py
def decode_incomplete_marker(self, marker: Path) -> Path | None:
    """Decode an incomplete marker filename to get the output path.

    Args:
        marker: Path to the marker file.

    Returns:
        Decoded output file path, or None if decoding fails.
    """
    try:
        decoded = base64.b64decode(marker.name).decode("utf-8")
        return Path(decoded)
    except (ValueError, UnicodeDecodeError):
        return None
find_all_logs
find_all_logs() -> list[Path]

Find all snakemake log files, sorted by modification time.

Returns:

Type Description
list[Path]

List of paths sorted oldest to newest.

Source code in snakesee/state/paths.py
def find_all_logs(self) -> list[Path]:
    """Collect every snakemake log file, ordered by modification time.

    Returns:
        Paths ordered oldest to newest; empty when the log directory is
        reported as missing.
    """
    if _cached_exists(self.log_dir):
        # Glob results exist at iteration time, so no extra existence test.
        return sorted(self.log_dir.glob(LOG_GLOB_PATTERN), key=safe_mtime)
    return []
find_latest_log
find_latest_log() -> Path | None

Find the most recent snakemake log file.

Returns:

Type Description
Path | None

Path to the most recent log file, or None if no logs exist.

Source code in snakesee/state/paths.py
def find_latest_log(self) -> Path | None:
    """Locate the newest snakemake log file.

    Returns:
        The log with the greatest modification time, or None when the log
        directory is missing or holds no matching files.
    """
    if not _cached_exists(self.log_dir):
        return None
    # Glob results exist at iteration time, so no extra existence test.
    ordered = sorted(self.log_dir.glob(LOG_GLOB_PATTERN), key=safe_mtime)
    return ordered[-1] if ordered else None
find_logs_sorted_newest_first
find_logs_sorted_newest_first() -> list[Path]

Find all snakemake log files, sorted newest first.

Returns:

Type Description
list[Path]

List of paths sorted newest to oldest.

Source code in snakesee/state/paths.py
def find_logs_sorted_newest_first(self) -> list[Path]:
    """Collect every snakemake log file, most recent first.

    Returns:
        The result of ``find_all_logs()`` in reversed order, i.e. newest
        to oldest.
    """
    return self.find_all_logs()[::-1]
find_profile
find_profile(max_levels: int = 6) -> Path | None

Search for a profile file in workflow and parent directories.

Parameters:

Name Type Description Default
max_levels int

Maximum parent levels to search (including current).

6

Returns:

Type Description
Path | None

Path to the found profile, or None if not found.

Source code in snakesee/state/paths.py
def find_profile(self, max_levels: int = 6) -> Path | None:
    """Look for a profile file in the workflow directory or its ancestors.

    Args:
        max_levels: How many directories to inspect at most, counting the
            workflow directory itself.

    Returns:
        Path of the first profile found while walking upward, or None.
    """
    start = self.workflow_dir.resolve()
    # The directory itself followed by each ancestor up to the root.
    lineage = (start, *start.parents)
    # Clamp so a negative limit inspects nothing instead of slicing
    # from the end.
    for directory in lineage[: max(max_levels, 0)]:
        candidate = directory / DEFAULT_PROFILE_NAME
        if _cached_exists(candidate):
            return candidate
    return None
get_incomplete_markers
get_incomplete_markers() -> Iterator[Path]

Iterate over incomplete job markers.

Yields:

Type Description
Path

Path to each incomplete marker file.

Source code in snakesee/state/paths.py
def get_incomplete_markers(self) -> Iterator[Path]:
    """Yield the marker files found under the incomplete directory.

    The directory is walked recursively. Non-files and entries named
    ``migration_underway`` are skipped.

    Yields:
        Path to each incomplete marker file.
    """
    if _cached_exists(self.incomplete_dir):
        for entry in self.incomplete_dir.rglob("*"):
            if not entry.is_file() or entry.name == "migration_underway":
                continue
            yield entry
get_job_log
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None

Find the log file for a specific job.

Searches common log locations for a file matching the rule and optional wildcards/job_id.

Parameters:

Name Type Description Default
rule str

Name of the rule.

required
wildcards dict[str, str] | None

Optional wildcard values.

None
job_id int | str | None

Optional job ID.

None

Returns:

Type Description
Path | None

Path to the log file if found, None otherwise.

Source code in snakesee/state/paths.py
def get_job_log(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    job_id: int | str | None = None,
) -> Path | None:
    """Locate the log file belonging to one job.

    Candidates are gathered from ``.snakemake/log/`` and from the
    conventional ``logs/`` and ``log/`` directories under the workflow
    directory. The most recently modified regular file is returned.

    Args:
        rule: Name of the rule.
        wildcards: Optional wildcard values.
        job_id: Optional job ID.

    Returns:
        Path to the newest matching log file, or None when nothing matches.
    """
    candidates: list[Path] = []

    # Snakemake's own log directory: match on rule name and, if given, job id.
    snakemake_logs = self.log_dir
    if _cached_exists(snakemake_logs):
        candidates += snakemake_logs.glob(f"*{rule}*")
        if job_id is not None:
            candidates += snakemake_logs.glob(f"*job{job_id}*")

    # Two common user-level conventions, searched in this order.
    for dirname in ("logs", "log"):
        candidates += self._search_log_dir(self.workflow_dir / dirname, rule, wildcards)

    # is_file() doubles as the existence check.
    files = [found for found in candidates if found.is_file()]
    if not files:
        return None
    return sorted(files, key=safe_mtime, reverse=True)[0]
get_metadata_files
get_metadata_files() -> Iterator[Path]

Iterate over all metadata files.

Yields:

Type Description
Path

Path to each metadata file.

Source code in snakesee/state/paths.py
def get_metadata_files(self) -> Iterator[Path]:
    """Yield every regular file found under the metadata directory.

    The directory is walked recursively; nothing is yielded when it is
    reported as missing.

    Yields:
        Path to each metadata file.
    """
    if _cached_exists(self.metadata_dir):
        yield from (
            entry for entry in self.metadata_dir.rglob("*") if entry.is_file()
        )
validate
validate() -> None

Validate that this is a valid workflow directory.

Raises:

Type Description
ValueError

If .snakemake directory doesn't exist.

Source code in snakesee/state/paths.py
def validate(self) -> None:
    """Ensure the workflow directory contains a .snakemake directory.

    Raises:
        ValueError: If .snakemake directory doesn't exist.
    """
    if _cached_exists(self.snakemake_dir):
        return
    raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")

Functions

clear_exists_cache

clear_exists_cache() -> None

Clear the filesystem existence cache.

Useful for testing or after significant filesystem operations.

Source code in snakesee/state/paths.py
def clear_exists_cache() -> None:
    """Empty the module-level filesystem existence cache.

    Call this in tests, or after filesystem changes that cached answers
    would not reflect.
    """
    _exists_cache.clear()