Skip to content

fs

Filesystem persistence backend.

Reads Snakemake metadata from the traditional .snakemake/metadata/, .snakemake/incomplete/, and .snakemake/locks/ directory layout.

Classes

FsPersistence

Filesystem-based persistence backend.

Wraps the existing metadata file iteration, incomplete marker scanning, and lock file detection into the PersistenceBackend interface.

Source code in snakesee/persistence/fs.py
class FsPersistence:
    """Filesystem-based persistence backend.

    Wraps the existing metadata file iteration, incomplete marker scanning,
    and lock file detection into the PersistenceBackend interface.
    """

    def __init__(self, paths: WorkflowPaths) -> None:
        """Store the path layout that all later queries resolve against."""
        self._paths = paths

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Iterate over metadata files in .snakemake/metadata/.

        Yields nothing when the paths object reports no metadata; otherwise
        delegates to ``parse_metadata_files_full``, forwarding
        ``progress_callback`` unchanged.
        """
        if not self._paths.has_metadata:
            return
        yield from parse_metadata_files_full(self._paths.metadata_dir, progress_callback)

    def has_locks(self) -> bool:
        """Check for lock files in .snakemake/locks/.

        Returns True if the locks directory contains at least one entry, and
        False if it is missing, empty, or cannot be listed.
        """
        locks_dir = self._paths.locks_dir
        if not locks_dir.exists():
            return False
        try:
            return any(locks_dir.iterdir())
        except OSError:
            # Best-effort check: an unreadable directory is reported as "no locks".
            return False

    def has_incomplete_jobs(self) -> bool:
        """Check for incomplete markers in .snakemake/incomplete/.

        Searches the directory recursively. A regular file counts as a marker
        unless it is named ``migration_underway``. Returns False if the
        directory is missing or the scan raises ``OSError``.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return False
        try:
            return any(
                marker.is_file() and marker.name != "migration_underway"
                for marker in inc_dir.rglob("*")
            )
        except OSError:
            return False

    @staticmethod
    def _decode_marker_name(name: str) -> Path | None:
        """Decode a base64 marker filename into the output path it encodes.

        Uses the URL-safe decoder because Snakemake's persistence layer writes
        marker names with the URL-safe alphabet (``-`` and ``_``). The
        standard-alphabet decoder silently drops those two characters, which
        yields a wrong path or a padding error. The URL-safe decoder still
        accepts ``+`` and ``/``, so standard-alphabet names keep working.

        Returns None (after logging a warning) if the name is not valid
        base64 or the decoded bytes are not valid UTF-8.
        """
        try:
            decoded = base64.urlsafe_b64decode(name).decode("utf-8")
        except (ValueError, UnicodeDecodeError) as e:
            # binascii.Error is a ValueError subclass, so bad padding lands here too.
            logger.warning("Failed to decode base64 marker filename %s: %s", name, e)
            return None
        return Path(decoded)

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Iterate over incomplete markers in .snakemake/incomplete/.

        Each marker filename is a base64-encoded output file path.
        The marker's mtime approximates when the job started.

        Args:
            min_start_time: When given, markers whose mtime is strictly
                earlier than this timestamp are skipped.

        Yields:
            One ``IncompleteJob`` per marker file. ``output_file`` is None
            when the marker name cannot be decoded.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return

        for marker in inc_dir.rglob("*"):
            if not marker.is_file() or marker.name == "migration_underway":
                continue

            try:
                marker_mtime = marker.stat().st_mtime
            except OSError:
                # The marker may disappear between listing and stat; skip it.
                continue

            if min_start_time is not None and marker_mtime < min_start_time:
                continue

            yield IncompleteJob(
                start_time=marker_mtime,
                output_file=self._decode_marker_name(marker.name),
            )

Functions

has_incomplete_jobs
has_incomplete_jobs() -> bool

Check for incomplete markers in .snakemake/incomplete/.

Source code in snakesee/persistence/fs.py
def has_incomplete_jobs(self) -> bool:
    """Report whether .snakemake/incomplete/ holds any incomplete marker.

    The directory is searched recursively. Any regular file counts as a
    marker except one named ``migration_underway``. A missing directory or
    an ``OSError`` during the scan is reported as False.
    """
    incomplete_root = self._paths.incomplete_dir
    if not incomplete_root.exists():
        return False
    try:
        for entry in incomplete_root.rglob("*"):
            if entry.is_file() and entry.name != "migration_underway":
                return True
    except OSError:
        return False
    return False
has_locks
has_locks() -> bool

Check for lock files in .snakemake/locks/.

Source code in snakesee/persistence/fs.py
def has_locks(self) -> bool:
    """Report whether .snakemake/locks/ contains at least one entry.

    A missing directory, an empty directory, or an ``OSError`` while
    listing it all produce False.
    """
    lock_root = self._paths.locks_dir
    if not lock_root.exists():
        return False
    try:
        for _entry in lock_root.iterdir():
            # The first entry is enough; no need to list the rest.
            return True
    except OSError:
        return False
    return False
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete markers in .snakemake/incomplete/.

Each marker filename is a base64-encoded output file path. The marker's mtime approximates when the job started.

Source code in snakesee/persistence/fs.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Iterate over incomplete markers in .snakemake/incomplete/.

    Each marker filename is a base64-encoded output file path.
    The marker's mtime approximates when the job started.

    Args:
        min_start_time: When given, markers whose mtime is strictly earlier
            than this timestamp are skipped.

    Yields:
        One ``IncompleteJob`` per marker file. ``output_file`` is None when
        the marker name cannot be decoded.
    """
    inc_dir = self._paths.incomplete_dir
    if not inc_dir.exists():
        return

    for marker in inc_dir.rglob("*"):
        if not marker.is_file() or marker.name == "migration_underway":
            continue

        try:
            marker_mtime = marker.stat().st_mtime
        except OSError:
            # The marker may disappear between listing and stat; skip it.
            continue

        if min_start_time is not None and marker_mtime < min_start_time:
            continue

        output_file: Path | None = None
        try:
            # Snakemake writes marker names with the URL-safe base64 alphabet.
            # The standard decoder silently drops "-" and "_", which corrupts
            # the path or breaks padding. The URL-safe decoder still accepts
            # "+" and "/", so standard-alphabet names keep working.
            decoded = base64.urlsafe_b64decode(marker.name).decode("utf-8")
            output_file = Path(decoded)
        except (ValueError, UnicodeDecodeError) as e:
            logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)

        yield IncompleteJob(
            start_time=marker_mtime,
            output_file=output_file,
        )
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over metadata files in .snakemake/metadata/.

Source code in snakesee/persistence/fs.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield the records parsed from .snakemake/metadata/.

    Produces nothing when the paths object reports no metadata. Otherwise
    every record from ``parse_metadata_files_full`` is passed through, with
    ``progress_callback`` forwarded unchanged.
    """
    paths = self._paths
    if paths.has_metadata:
        yield from parse_metadata_files_full(paths.metadata_dir, progress_callback)

Functions