Skip to content

persistence

Persistence backend abstraction for reading Snakemake metadata.

Supports both the filesystem-based backend (.snakemake/metadata/ directory) and the newer SQLite-based backend (.snakemake/metadata.db).

Classes

DbPersistence

SQLite-based persistence backend.

Reads job metadata, incomplete markers, and lock state from Snakemake's .snakemake/metadata.db SQLite database.

Opens the database read-only to avoid interfering with Snakemake.

Source code in snakesee/persistence/db.py
class DbPersistence:
    """SQLite-based persistence backend.

    Reads job metadata, incomplete markers, and lock state from
    Snakemake's .snakemake/metadata.db SQLite database.

    Opens the database read-only to avoid interfering with Snakemake. Each
    public method opens its own connection and closes it before returning
    (for generators: when iteration finishes or the generator is closed).
    """

    # WHERE clause shared by the COUNT(*) and the SELECT in iterate_metadata so
    # the progress total always describes exactly the rows being iterated.
    # Constant text only; the namespace is always bound as a query parameter.
    _COMPLETED_FILTER = (
        "namespace = ?"
        " AND (incomplete IS NULL OR incomplete = 0)"
        " AND rule IS NOT NULL"
        " AND record_format_version > 0"
    )

    def __init__(self, paths: WorkflowPaths) -> None:
        self._paths = paths
        # Every query filters rows by this key: the resolved .snakemake path.
        self._namespace = str(paths.snakemake_dir.resolve())

    def _connect(self) -> sqlite3.Connection | None:
        """Open a read-only connection to the metadata DB.

        Returns:
            A sqlite3 Connection (rows as ``sqlite3.Row``), or None if the DB
            doesn't exist or cannot be opened.
        """
        db_path = self._paths.metadata_db
        if not db_path.exists():
            return None

        try:
            # "mode=ro" in a file: URI asks SQLite to open the DB read-only.
            uri = f"{db_path.resolve().as_uri()}?mode=ro"
            conn = sqlite3.connect(uri, uri=True, timeout=10.0)
            conn.row_factory = sqlite3.Row
            return conn
        except (sqlite3.DatabaseError, OSError) as e:
            logger.debug("Failed to open metadata DB %s: %s", db_path, e)
            return None

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Iterate over completed job metadata from the SQLite DB.

        Skips rows with incomplete=1, NULL rule, or stub records
        (record_format_version=0).

        Args:
            progress_callback: Optional callback(current, total) for progress.
                ``current`` counts every matching row, including rows that
                ``_row_to_metadata_record`` turns into None.

        Yields:
            MetadataRecord for each completed job.
        """
        conn = self._connect()
        if conn is None:
            return

        params = (self._namespace,)
        try:
            total = 0
            if progress_callback is not None:
                row = conn.execute(
                    f"SELECT COUNT(*) FROM snakemake_metadata WHERE {self._COMPLETED_FILTER}",
                    params,
                ).fetchone()
                total = row[0] if row else 0

            cursor = conn.execute(
                "SELECT rule, starttime, endtime, code, input FROM snakemake_metadata"
                f" WHERE {self._COMPLETED_FILTER}",
                params,
            )

            for current, row in enumerate(cursor, start=1):
                record = self._row_to_metadata_record(row)
                if record is not None:
                    yield record
                if progress_callback is not None:
                    progress_callback(current, total)

        except sqlite3.DatabaseError as e:
            logger.warning("Error reading metadata from DB: %s", e)
        finally:
            conn.close()

    def has_locks(self) -> bool:
        """Check for lock rows matching our namespace.

        Returns:
            True if any locks exist for this namespace; False otherwise, or if
            the DB cannot be opened or queried.
        """
        conn = self._connect()
        if conn is None:
            return False

        try:
            row = conn.execute(
                "SELECT 1 FROM snakemake_locks WHERE namespace = ? LIMIT 1",
                (self._namespace,),
            ).fetchone()
            return row is not None
        except sqlite3.DatabaseError as e:
            logger.debug("Error checking locks in DB: %s", e)
            return False
        finally:
            conn.close()

    def has_incomplete_jobs(self) -> bool:
        """Check for incomplete=1 rows matching our namespace.

        Returns:
            True if any incomplete jobs exist for this namespace; False
            otherwise, or if the DB cannot be opened or queried.
        """
        conn = self._connect()
        if conn is None:
            return False

        try:
            row = conn.execute(
                """SELECT 1 FROM snakemake_metadata
                   WHERE namespace = ? AND incomplete = 1 LIMIT 1""",
                (self._namespace,),
            ).fetchone()
            return row is not None
        except sqlite3.DatabaseError as e:
            logger.debug("Error checking incomplete jobs in DB: %s", e)
            return False
        finally:
            conn.close()

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Iterate over incomplete job rows from the DB.

        Args:
            min_start_time: If set, only yield jobs started at or after this time.

        Yields:
            IncompleteJob for each in-progress job.
        """
        conn = self._connect()
        if conn is None:
            return

        # One query with an optional extra clause instead of two near-copies.
        query = (
            "SELECT target, rule, starttime, external_jobid FROM snakemake_metadata"
            " WHERE namespace = ? AND incomplete = 1"
        )
        params: tuple[object, ...] = (self._namespace,)
        if min_start_time is not None:
            query += " AND starttime >= ?"
            params += (min_start_time,)

        try:
            for row in conn.execute(query, params):
                target = row["target"]
                yield IncompleteJob(
                    start_time=row["starttime"],
                    output_file=Path(target) if target else None,
                    rule=row["rule"],
                    external_jobid=row["external_jobid"],
                )

        except sqlite3.DatabaseError as e:
            logger.warning("Error reading incomplete jobs from DB: %s", e)
        finally:
            conn.close()

    @staticmethod
    def _row_to_metadata_record(row: sqlite3.Row) -> MetadataRecord | None:
        """Convert a DB row to a MetadataRecord.

        The ``code`` column is whitespace-normalized and hashed (first 16 hex
        chars of SHA-256). The ``input`` column is parsed as JSON; if it holds
        a list, its size is computed via ``calculate_metadata_input_size``.

        Args:
            row: A sqlite3.Row with rule, starttime, endtime, code, input columns.

        Returns:
            A MetadataRecord, or None if the row has no rule.
        """
        rule = row["rule"]
        if rule is None:
            return None

        code_hash: str | None = None
        code = row["code"]
        if code:
            normalized_code = " ".join(code.split())
            code_hash = hashlib.sha256(normalized_code.encode()).hexdigest()[:16]

        input_size: int | None = None
        input_json = row["input"]
        if input_json:
            # Imported before the try so the except clause can always resolve
            # orjson.JSONDecodeError; a failed import inside the try would be
            # masked by an UnboundLocalError raised from the except clause.
            import orjson

            try:
                input_files = orjson.loads(input_json)
                if isinstance(input_files, list):
                    input_size = calculate_metadata_input_size(input_files)
            except (orjson.JSONDecodeError, TypeError) as e:
                logger.debug("Failed to parse input JSON for rule %s: %s", rule, e)

        return MetadataRecord(
            rule=rule,
            start_time=row["starttime"],
            end_time=row["endtime"],
            code_hash=code_hash,
            input_size=input_size,
        )

Functions

has_incomplete_jobs
has_incomplete_jobs() -> bool

Check for incomplete=1 rows matching our namespace.

Returns:

Type Description
bool

True if any incomplete jobs exist for this namespace.

Source code in snakesee/persistence/db.py
def has_incomplete_jobs(self) -> bool:
    """Tell whether this namespace has any row flagged ``incomplete = 1``.

    Returns:
        True if at least one matching row exists. False if none exists, if the
        metadata DB cannot be opened, or if the lookup raises
        ``sqlite3.DatabaseError`` (logged at debug level).
    """
    connection = self._connect()
    if connection is None:
        return False

    try:
        cursor = connection.execute(
            """SELECT 1 FROM snakemake_metadata
               WHERE namespace = ? AND incomplete = 1 LIMIT 1""",
            (self._namespace,),
        )
        found = cursor.fetchone() is not None
    except sqlite3.DatabaseError as e:
        logger.debug("Error checking incomplete jobs in DB: %s", e)
        found = False
    finally:
        connection.close()
    return found
has_locks
has_locks() -> bool

Check for lock rows matching our namespace.

Returns:

Type Description
bool

True if any locks exist for this namespace.

Source code in snakesee/persistence/db.py
def has_locks(self) -> bool:
    """Tell whether the ``snakemake_locks`` table has a row for this namespace.

    Returns:
        True when such a row exists. False when none exists, when the metadata
        DB cannot be opened, or when the query raises ``sqlite3.DatabaseError``
        (logged at debug level).
    """
    conn = self._connect()
    if conn is None:
        return False

    lock_query = "SELECT 1 FROM snakemake_locks WHERE namespace = ? LIMIT 1"
    try:
        first_lock = conn.execute(lock_query, (self._namespace,)).fetchone()
    except sqlite3.DatabaseError as e:
        logger.debug("Error checking locks in DB: %s", e)
        return False
    else:
        return first_lock is not None
    finally:
        conn.close()
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete job rows from the DB.

Parameters:

Name Type Description Default
min_start_time float | None

If set, only yield jobs started at or after this time.

None

Yields:

Type Description
IncompleteJob

IncompleteJob for each in-progress job.

Source code in snakesee/persistence/db.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Yield the rows of this namespace that are flagged ``incomplete = 1``.

    Args:
        min_start_time: Optional lower bound on ``starttime``; when given, only
            rows with ``starttime >= min_start_time`` are returned.

    Yields:
        IncompleteJob built from each row's starttime, target, rule and
        external_jobid columns (``output_file`` is None for an empty target).
    """
    conn = self._connect()
    if conn is None:
        return

    # Pick the statement up front; the SQL text matches the two original variants.
    if min_start_time is None:
        sql = """SELECT target, rule, starttime, external_jobid
                   FROM snakemake_metadata
                   WHERE namespace = ? AND incomplete = 1"""
        params = (self._namespace,)
    else:
        sql = """SELECT target, rule, starttime, external_jobid
                   FROM snakemake_metadata
                   WHERE namespace = ? AND incomplete = 1
                   AND starttime >= ?"""
        params = (self._namespace, min_start_time)

    try:
        for row in conn.execute(sql, params):
            target = row["target"]
            yield IncompleteJob(
                start_time=row["starttime"],
                output_file=Path(target) if target else None,
                rule=row["rule"],
                external_jobid=row["external_jobid"],
            )
    except sqlite3.DatabaseError as e:
        logger.warning("Error reading incomplete jobs from DB: %s", e)
    finally:
        conn.close()
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over completed job metadata from the SQLite DB.

Skips rows with incomplete=1, NULL rule, or stub records (record_format_version=0).

Parameters:

Name Type Description Default
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None

Yields:

Type Description
MetadataRecord

MetadataRecord for each completed job.

Source code in snakesee/persistence/db.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield metadata for every completed job of this namespace.

    A row qualifies when ``incomplete`` is NULL or 0, ``rule`` is not NULL and
    ``record_format_version`` is positive. Each qualifying row goes through
    ``self._row_to_metadata_record``; rows it maps to None are not yielded.

    Args:
        progress_callback: Optional ``callback(current, total)``. It is called
            once per qualifying row, with ``total`` taken from a preceding
            COUNT(*) query run only when a callback is supplied.

    Yields:
        MetadataRecord for each completed job.
    """
    conn = self._connect()
    if conn is None:
        return

    wants_progress = progress_callback is not None
    try:
        total = 0
        if wants_progress:
            count_row = conn.execute(
                """SELECT COUNT(*) FROM snakemake_metadata
                   WHERE namespace = ?
                   AND (incomplete IS NULL OR incomplete = 0)
                   AND rule IS NOT NULL
                   AND record_format_version > 0""",
                (self._namespace,),
            ).fetchone()
            if count_row:
                total = count_row[0]

        rows = conn.execute(
            """SELECT rule, starttime, endtime, code, input
               FROM snakemake_metadata
               WHERE namespace = ?
               AND (incomplete IS NULL OR incomplete = 0)
               AND rule IS NOT NULL
               AND record_format_version > 0""",
            (self._namespace,),
        )

        for seen, row in enumerate(rows, start=1):
            record = self._row_to_metadata_record(row)
            if record is not None:
                yield record
            if wants_progress:
                progress_callback(seen, total)

    except sqlite3.DatabaseError as e:
        logger.warning("Error reading metadata from DB: %s", e)
    finally:
        conn.close()

FsPersistence

Filesystem-based persistence backend.

Wraps the existing metadata file iteration, incomplete marker scanning, and lock file detection into the PersistenceBackend interface.

Source code in snakesee/persistence/fs.py
class FsPersistence:
    """Filesystem-based persistence backend.

    Wraps the existing metadata file iteration, incomplete marker scanning,
    and lock file detection into the PersistenceBackend interface.
    """

    def __init__(self, paths: WorkflowPaths) -> None:
        self._paths = paths

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Iterate over metadata files in .snakemake/metadata/.

        Yields nothing when ``self._paths.has_metadata`` is false; otherwise
        delegates to ``parse_metadata_files_full``.
        """
        if not self._paths.has_metadata:
            return
        yield from parse_metadata_files_full(self._paths.metadata_dir, progress_callback)

    def has_locks(self) -> bool:
        """Check for lock files in .snakemake/locks/.

        Returns:
            True if the locks directory has at least one entry; False if it is
            empty, missing, or cannot be listed.
        """
        locks_dir = self._paths.locks_dir
        if not locks_dir.exists():
            return False
        try:
            return any(locks_dir.iterdir())
        except OSError:
            return False

    def has_incomplete_jobs(self) -> bool:
        """Check for incomplete markers in .snakemake/incomplete/.

        Returns:
            True if any regular file other than ``migration_underway`` exists
            below the incomplete directory; False if none does, the directory
            is missing, or walking it raises OSError.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return False
        try:
            return any(
                marker.is_file() and marker.name != "migration_underway"
                for marker in inc_dir.rglob("*")
            )
        except OSError:
            return False

    @staticmethod
    def _decode_marker_path(marker: Path, incomplete_dir: Path) -> Path | None:
        """Recover the output path encoded by a marker's location.

        Snakemake names markers with the URL-safe base64 encoding of the output
        path and splits encodings that are too long for one filename into
        "@"-prefixed parent directories, so every path component below
        ``incomplete_dir`` contributes to the encoded name.

        Args:
            marker: Marker file located somewhere below ``incomplete_dir``.
            incomplete_dir: Root of the incomplete-marker tree.

        Returns:
            The decoded output path, or None (with a warning logged) if the
            location is not valid URL-safe base64 of UTF-8 text.
        """
        try:
            parts = marker.relative_to(incomplete_dir).parts
            # "@" never occurs in base64 output, so stripping it from each
            # component reassembles the full encoded name.
            encoded = "".join(part.removeprefix("@") for part in parts)
            return Path(base64.urlsafe_b64decode(encoded).decode("utf-8"))
        except (ValueError, UnicodeDecodeError) as e:
            logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)
            return None

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Iterate over incomplete markers in .snakemake/incomplete/.

        Each marker encodes an output file path (see ``_decode_marker_path``).
        The marker's mtime approximates when the job started.

        Args:
            min_start_time: If set, skip markers whose mtime is older than this.

        Yields:
            IncompleteJob per marker; ``output_file`` is None when the marker
            location cannot be decoded.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return

        try:
            for marker in inc_dir.rglob("*"):
                if not marker.is_file() or marker.name == "migration_underway":
                    continue

                try:
                    marker_mtime = marker.stat().st_mtime
                except OSError:
                    # Skip markers that can no longer be stat'ed.
                    continue

                if min_start_time is not None and marker_mtime < min_start_time:
                    continue

                yield IncompleteJob(
                    start_time=marker_mtime,
                    output_file=self._decode_marker_path(marker, inc_dir),
                )
        except OSError as e:
            # Treat a failed directory walk the way has_incomplete_jobs does:
            # stop quietly instead of propagating.
            logger.debug("Error scanning incomplete markers in %s: %s", inc_dir, e)

Functions

has_incomplete_jobs
has_incomplete_jobs() -> bool

Check for incomplete markers in .snakemake/incomplete/.

Source code in snakesee/persistence/fs.py
def has_incomplete_jobs(self) -> bool:
    """Tell whether .snakemake/incomplete/ holds any job marker.

    Any regular file anywhere below the incomplete directory counts, except a
    file named ``migration_underway``. A missing directory, or an OSError while
    walking it, yields False.
    """
    incomplete_root = self._paths.incomplete_dir
    if not incomplete_root.exists():
        return False

    try:
        for entry in incomplete_root.rglob("*"):
            if entry.is_file() and entry.name != "migration_underway":
                return True
    except OSError:
        return False
    return False
has_locks
has_locks() -> bool

Check for lock files in .snakemake/locks/.

Source code in snakesee/persistence/fs.py
def has_locks(self) -> bool:
    """Tell whether .snakemake/locks/ contains at least one entry.

    Returns False when the directory does not exist or cannot be listed.
    """
    lock_root = self._paths.locks_dir
    if not lock_root.exists():
        return False

    try:
        first_entry = next(lock_root.iterdir(), None)
    except OSError:
        return False
    return first_entry is not None
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete markers in .snakemake/incomplete/.

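Each marker's name is the URL-safe base64 encoding of an output file path; encodings too long for a single filename are split across "@"-prefixed parent directories. The marker's mtime approximates when the job started.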

Source code in snakesee/persistence/fs.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Iterate over incomplete markers in .snakemake/incomplete/.

    Snakemake names each marker with the URL-safe base64 encoding of the
    output file path. Encodings longer than one filename allows are split
    across "@"-prefixed parent directories. The marker's mtime approximates
    when the job started.

    Args:
        min_start_time: If set, skip markers whose mtime is older than this.

    Yields:
        IncompleteJob per marker; ``output_file`` is None if the marker
        location cannot be decoded.
    """
    inc_dir = self._paths.incomplete_dir
    if not inc_dir.exists():
        return

    try:
        for marker in inc_dir.rglob("*"):
            if not marker.is_file() or marker.name == "migration_underway":
                continue

            try:
                marker_mtime = marker.stat().st_mtime
            except OSError:
                # Skip markers that can no longer be stat'ed.
                continue

            if min_start_time is not None and marker_mtime < min_start_time:
                continue

            output_file: Path | None = None
            try:
                # "@" never occurs in base64 output, so stripping it from every
                # component below inc_dir reassembles the full encoded name.
                parts = marker.relative_to(inc_dir).parts
                encoded = "".join(part.removeprefix("@") for part in parts)
                output_file = Path(base64.urlsafe_b64decode(encoded).decode("utf-8"))
            except (ValueError, UnicodeDecodeError) as e:
                logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)

            yield IncompleteJob(
                start_time=marker_mtime,
                output_file=output_file,
            )
    except OSError as e:
        # A failed directory walk ends iteration quietly, mirroring how
        # has_incomplete_jobs treats OSError.
        logger.debug("Error scanning incomplete markers in %s: %s", inc_dir, e)
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over metadata files in .snakemake/metadata/.

Source code in snakesee/persistence/fs.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield records parsed from the files under .snakemake/metadata/.

    Yields nothing when ``self._paths.has_metadata`` is false; otherwise
    delegates to ``parse_metadata_files_full`` with ``progress_callback``.
    """
    workflow_paths = self._paths
    if workflow_paths.has_metadata:
        yield from parse_metadata_files_full(workflow_paths.metadata_dir, progress_callback)

IncompleteJob dataclass

A job marked as incomplete/in-progress.

Attributes:

Name Type Description
start_time float | None

Approximate start time (mtime for FS, starttime for DB).

output_file Path | None

Output file path if known.

rule str | None

Rule name if known (DB backend provides this; FS does not).

external_jobid str | None

External executor job ID if known.

Source code in snakesee/persistence/backend.py
@dataclass(frozen=True, slots=True)
class IncompleteJob:
    """A job marked as incomplete/in-progress.

    Immutable value object (``frozen=True``). Every field defaults to None so a
    backend can fill in only what it knows.

    Attributes:
        start_time: Approximate start time (mtime for FS, starttime for DB).
        output_file: Output file path if known.
        rule: Rule name if known (DB backend provides this; FS does not).
        external_jobid: External executor job ID if known.
    """

    # Field order is the positional constructor order; append new fields at
    # the end so existing positional callers keep working.
    start_time: float | None = None
    output_file: Path | None = None
    rule: str | None = None
    external_jobid: str | None = None

PersistenceBackend

Bases: Protocol

Protocol for reading Snakemake workflow metadata.

Implementations provide access to job metadata, incomplete markers, and lock state from either the filesystem or SQLite backends.

Source code in snakesee/persistence/backend.py
class PersistenceBackend(Protocol):
    """Structural interface for reading Snakemake workflow state.

    Any object that provides these four methods satisfies the protocol. The
    implementations here cover the filesystem layout (.snakemake/metadata/)
    and the SQLite database (.snakemake/metadata.db).
    """

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Yield the stored metadata of every completed job.

        Args:
            progress_callback: Optional hook invoked as
                ``callback(current, total)`` to report progress.

        Yields:
            MetadataRecord per completed job.
        """
        ...

    def has_incomplete_jobs(self) -> bool:
        """Report whether any job is currently flagged as incomplete.

        Returns:
            True if the backend sees at least one incomplete job.
        """
        ...

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Yield one entry per job the backend considers in progress.

        Args:
            min_start_time: Optional lower bound; when given, jobs whose start
                time is earlier are left out.

        Yields:
            IncompleteJob describing each in-progress job.
        """
        ...

    def has_locks(self) -> bool:
        """Report whether the workflow currently holds any lock.

        Returns:
            True if the backend sees at least one lock.
        """
        ...

Functions

has_incomplete_jobs
has_incomplete_jobs() -> bool

Check whether any jobs are currently marked incomplete.

Source code in snakesee/persistence/backend.py
def has_incomplete_jobs(self) -> bool:
    """Report whether any job is currently flagged as incomplete.

    Returns:
        True if the backend sees at least one incomplete job.
    """
    ...
has_locks
has_locks() -> bool

Check whether any workflow locks are held.

Source code in snakesee/persistence/backend.py
def has_locks(self) -> bool:
    """Report whether the workflow currently holds any lock.

    Returns:
        True if the backend sees at least one lock.
    """
    ...
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete (in-progress) job markers.

Parameters:

Name Type Description Default
min_start_time float | None

If set, only yield jobs started at or after this time.

None

Yields:

Type Description
IncompleteJob

IncompleteJob for each in-progress job.

Source code in snakesee/persistence/backend.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Yield one entry per job the backend considers in progress.

    Args:
        min_start_time: Optional lower bound; when given, jobs whose start
            time is earlier are left out.

    Yields:
        IncompleteJob describing each in-progress job.
    """
    ...
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over all job metadata records.

Parameters:

Name Type Description Default
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None

Yields:

Type Description
MetadataRecord

MetadataRecord for each completed job.

Source code in snakesee/persistence/backend.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield the stored metadata of every completed job.

    Args:
        progress_callback: Optional hook invoked as ``callback(current, total)``
            to report progress.

    Yields:
        MetadataRecord per completed job.
    """
    ...

Functions

detect_backend

detect_backend(workflow_dir: Path) -> PersistenceBackend

Auto-detect and return the appropriate persistence backend.

Prefers the SQLite DB backend when .snakemake/metadata.db exists. Falls back to filesystem backend otherwise.

Parameters:

Name Type Description Default
workflow_dir Path

Root workflow directory containing .snakemake/.

required

Returns:

Type Description
PersistenceBackend

A PersistenceBackend implementation.

Source code in snakesee/persistence/backend.py
def detect_backend(workflow_dir: Path) -> PersistenceBackend:
    """Pick the persistence backend that matches the workflow's .snakemake/ layout.

    The SQLite backend wins whenever .snakemake/metadata.db exists; in every
    other case the filesystem backend is used.

    Args:
        workflow_dir: Root workflow directory containing .snakemake/.

    Returns:
        A DbPersistence or FsPersistence bound to the workflow's paths.
    """
    from snakesee.state.paths import WorkflowPaths

    paths = WorkflowPaths(workflow_dir)
    if not paths.metadata_db.exists():
        from snakesee.persistence.fs import FsPersistence

        return FsPersistence(paths)

    from snakesee.persistence.db import DbPersistence

    return DbPersistence(paths)

Modules

backend

Persistence backend protocol and auto-detection.

Classes

IncompleteJob dataclass

A job marked as incomplete/in-progress.

Attributes:

Name Type Description
start_time float | None

Approximate start time (mtime for FS, starttime for DB).

output_file Path | None

Output file path if known.

rule str | None

Rule name if known (DB backend provides this; FS does not).

external_jobid str | None

External executor job ID if known.

Source code in snakesee/persistence/backend.py
@dataclass(frozen=True, slots=True)
class IncompleteJob:
    """A job marked as incomplete/in-progress.

    Immutable value object (``frozen=True``). Every field defaults to None so a
    backend can fill in only what it knows.

    Attributes:
        start_time: Approximate start time (mtime for FS, starttime for DB).
        output_file: Output file path if known.
        rule: Rule name if known (DB backend provides this; FS does not).
        external_jobid: External executor job ID if known.
    """

    # Field order is the positional constructor order; append new fields at
    # the end so existing positional callers keep working.
    start_time: float | None = None
    output_file: Path | None = None
    rule: str | None = None
    external_jobid: str | None = None
PersistenceBackend

Bases: Protocol

Protocol for reading Snakemake workflow metadata.

Implementations provide access to job metadata, incomplete markers, and lock state from either the filesystem or SQLite backends.

Source code in snakesee/persistence/backend.py
class PersistenceBackend(Protocol):
    """Structural interface for reading Snakemake workflow state.

    Any object that provides these four methods satisfies the protocol. The
    implementations here cover the filesystem layout (.snakemake/metadata/)
    and the SQLite database (.snakemake/metadata.db).
    """

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Yield the stored metadata of every completed job.

        Args:
            progress_callback: Optional hook invoked as
                ``callback(current, total)`` to report progress.

        Yields:
            MetadataRecord per completed job.
        """
        ...

    def has_incomplete_jobs(self) -> bool:
        """Report whether any job is currently flagged as incomplete.

        Returns:
            True if the backend sees at least one incomplete job.
        """
        ...

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Yield one entry per job the backend considers in progress.

        Args:
            min_start_time: Optional lower bound; when given, jobs whose start
                time is earlier are left out.

        Yields:
            IncompleteJob describing each in-progress job.
        """
        ...

    def has_locks(self) -> bool:
        """Report whether the workflow currently holds any lock.

        Returns:
            True if the backend sees at least one lock.
        """
        ...
Functions
has_incomplete_jobs
has_incomplete_jobs() -> bool

Check whether any jobs are currently marked incomplete.

Source code in snakesee/persistence/backend.py
def has_incomplete_jobs(self) -> bool:
    """Report whether any job is currently flagged as incomplete.

    Returns:
        True if the backend sees at least one incomplete job.
    """
    ...
has_locks
has_locks() -> bool

Check whether any workflow locks are held.

Source code in snakesee/persistence/backend.py
def has_locks(self) -> bool:
    """Report whether the workflow currently holds any lock.

    Returns:
        True if the backend sees at least one lock.
    """
    ...
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete (in-progress) job markers.

Parameters:

Name Type Description Default
min_start_time float | None

If set, only yield jobs started at or after this time.

None

Yields:

Type Description
IncompleteJob

IncompleteJob for each in-progress job.

Source code in snakesee/persistence/backend.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Yield one entry per job the backend considers in progress.

    Args:
        min_start_time: Optional lower bound; when given, jobs whose start
            time is earlier are left out.

    Yields:
        IncompleteJob describing each in-progress job.
    """
    ...
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over all job metadata records.

Parameters:

Name Type Description Default
progress_callback ProgressCallback | None

Optional callback(current, total) for progress.

None

Yields:

Type Description
MetadataRecord

MetadataRecord for each completed job.

Source code in snakesee/persistence/backend.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield the stored metadata of every completed job.

    Args:
        progress_callback: Optional hook invoked as ``callback(current, total)``
            to report progress.

    Yields:
        MetadataRecord per completed job.
    """
    ...

Functions

detect_backend
detect_backend(workflow_dir: Path) -> PersistenceBackend

Auto-detect and return the appropriate persistence backend.

Prefers the SQLite DB backend when .snakemake/metadata.db exists. Falls back to filesystem backend otherwise.

Parameters:

Name Type Description Default
workflow_dir Path

Root workflow directory containing .snakemake/.

required

Returns:

Type Description
PersistenceBackend

A PersistenceBackend implementation.

Source code in snakesee/persistence/backend.py
def detect_backend(workflow_dir: Path) -> PersistenceBackend:
    """Pick the persistence backend that matches the workflow's .snakemake/ layout.

    The SQLite backend wins whenever .snakemake/metadata.db exists; in every
    other case the filesystem backend is used.

    Args:
        workflow_dir: Root workflow directory containing .snakemake/.

    Returns:
        A DbPersistence or FsPersistence bound to the workflow's paths.
    """
    from snakesee.state.paths import WorkflowPaths

    paths = WorkflowPaths(workflow_dir)
    if not paths.metadata_db.exists():
        from snakesee.persistence.fs import FsPersistence

        return FsPersistence(paths)

    from snakesee.persistence.db import DbPersistence

    return DbPersistence(paths)

db

SQLite persistence backend.

Reads Snakemake metadata from the .snakemake/metadata.db SQLite database, which Snakemake introduced as an alternative persistence backend.

Classes

DbPersistence

SQLite-based persistence backend.

Reads job metadata, incomplete markers, and lock state from Snakemake's .snakemake/metadata.db SQLite database.

Opens the database read-only to avoid interfering with Snakemake.

Source code in snakesee/persistence/db.py
class DbPersistence:
    """SQLite-based persistence backend.

    Reads job metadata, incomplete markers, and lock state from
    Snakemake's .snakemake/metadata.db SQLite database.

    Opens the database read-only to avoid interfering with Snakemake.
    Each public method opens its own connection and closes it before
    returning (for the iterators: when iteration finishes or is abandoned).
    """

    # Predicate selecting completed, non-stub rows of one namespace. It is
    # shared by the COUNT used for progress reporting and by the row query,
    # so the reported total can never drift from the rows actually iterated.
    # Only this constant is ever interpolated into SQL; values are bound.
    _COMPLETED_WHERE = (
        "WHERE namespace = ? "
        "AND (incomplete IS NULL OR incomplete = 0) "
        "AND rule IS NOT NULL "
        "AND record_format_version > 0"
    )

    def __init__(self, paths: WorkflowPaths) -> None:
        """Create a backend bound to one workflow.

        Args:
            paths: Workflow paths. ``metadata_db`` locates the database, and
                the resolved ``snakemake_dir`` becomes the namespace that
                every query filters on.
        """
        self._paths = paths
        self._namespace = str(paths.snakemake_dir.resolve())

    def _connect(self) -> sqlite3.Connection | None:
        """Open a read-only connection to the metadata DB.

        Returns:
            A sqlite3 Connection, or None if the DB doesn't exist or is invalid.
        """
        db_path = self._paths.metadata_db
        if not db_path.exists():
            return None

        try:
            # ``mode=ro`` requires URI syntax; ``timeout`` bounds how long a
            # query waits on a lock held by another connection.
            uri = f"{db_path.resolve().as_uri()}?mode=ro"
            conn = sqlite3.connect(uri, uri=True, timeout=10.0)
            conn.row_factory = sqlite3.Row
            return conn
        except (sqlite3.DatabaseError, OSError) as e:
            logger.debug("Failed to open metadata DB %s: %s", db_path, e)
            return None

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Iterate over completed job metadata from the SQLite DB.

        Skips rows with incomplete=1, NULL rule, or stub records
        (record_format_version=0).

        Args:
            progress_callback: Optional callback(current, total) for progress.
                ``current`` counts every fetched row (including any the row
                converter drops); ``total`` is the matching-row count, which
                is only queried when a callback is supplied.

        Yields:
            MetadataRecord for each completed job.
        """
        conn = self._connect()
        if conn is None:
            return

        try:
            total = 0
            if progress_callback is not None:
                count_row = conn.execute(
                    f"SELECT COUNT(*) FROM snakemake_metadata {self._COMPLETED_WHERE}",
                    (self._namespace,),
                ).fetchone()
                total = count_row[0] if count_row else 0

            cursor = conn.execute(
                "SELECT rule, starttime, endtime, code, input "
                f"FROM snakemake_metadata {self._COMPLETED_WHERE}",
                (self._namespace,),
            )

            for current, row in enumerate(cursor, start=1):
                record = self._row_to_metadata_record(row)
                if record is not None:
                    yield record

                if progress_callback is not None:
                    progress_callback(current, total)

        except sqlite3.DatabaseError as e:
            logger.warning("Error reading metadata from DB: %s", e)
        finally:
            conn.close()

    def has_locks(self) -> bool:
        """Check for lock rows matching our namespace.

        Returns:
            True if any locks exist for this namespace; False when none exist,
            the DB is missing, or the query fails.
        """
        conn = self._connect()
        if conn is None:
            return False

        try:
            row = conn.execute(
                "SELECT 1 FROM snakemake_locks WHERE namespace = ? LIMIT 1",
                (self._namespace,),
            ).fetchone()
            return row is not None
        except sqlite3.DatabaseError as e:
            logger.debug("Error checking locks in DB: %s", e)
            return False
        finally:
            conn.close()

    def has_incomplete_jobs(self) -> bool:
        """Check for incomplete=1 rows matching our namespace.

        Returns:
            True if any incomplete jobs exist for this namespace; False when
            none exist, the DB is missing, or the query fails.
        """
        conn = self._connect()
        if conn is None:
            return False

        try:
            row = conn.execute(
                "SELECT 1 FROM snakemake_metadata "
                "WHERE namespace = ? AND incomplete = 1 LIMIT 1",
                (self._namespace,),
            ).fetchone()
            return row is not None
        except sqlite3.DatabaseError as e:
            logger.debug("Error checking incomplete jobs in DB: %s", e)
            return False
        finally:
            conn.close()

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Iterate over incomplete job rows from the DB.

        Args:
            min_start_time: If set, only yield jobs started at or after this
                time. Rows with a NULL starttime never satisfy the bound and
                are skipped in that case.

        Yields:
            IncompleteJob for each in-progress job.
        """
        # Build one query, appending the optional lower bound, instead of
        # maintaining two nearly identical statements.
        query = (
            "SELECT target, rule, starttime, external_jobid "
            "FROM snakemake_metadata "
            "WHERE namespace = ? AND incomplete = 1"
        )
        params: tuple[object, ...] = (self._namespace,)
        if min_start_time is not None:
            query += " AND starttime >= ?"
            params += (min_start_time,)

        conn = self._connect()
        if conn is None:
            return

        try:
            for row in conn.execute(query, params):
                yield IncompleteJob(
                    start_time=row["starttime"],
                    output_file=Path(row["target"]) if row["target"] else None,
                    rule=row["rule"],
                    external_jobid=row["external_jobid"],
                )

        except sqlite3.DatabaseError as e:
            logger.warning("Error reading incomplete jobs from DB: %s", e)
        finally:
            conn.close()

    @staticmethod
    def _row_to_metadata_record(row: sqlite3.Row) -> MetadataRecord | None:
        """Convert a DB row to a MetadataRecord.

        ``code_hash`` is the first 16 hex digits of the SHA-256 of the rule
        code after collapsing all whitespace runs to single spaces.
        ``input_size`` is derived from the JSON-decoded ``input`` column when
        it holds a list; it stays None if the column is empty, is not a list,
        or fails to parse.

        Args:
            row: A sqlite3.Row with rule, starttime, endtime, code, input columns.

        Returns:
            A MetadataRecord, or None if the row has no rule.
        """
        rule = row["rule"]
        if rule is None:
            return None

        code_hash: str | None = None
        code = row["code"]
        if code:
            normalized_code = " ".join(code.split())
            code_hash = hashlib.sha256(normalized_code.encode()).hexdigest()[:16]

        input_size: int | None = None
        input_json = row["input"]
        if input_json:
            # Imported outside the ``try`` so the ``except`` clause below can
            # always resolve ``orjson`` (a failed import inside the ``try``
            # would otherwise surface as UnboundLocalError).
            import orjson

            try:
                input_files = orjson.loads(input_json)
                if isinstance(input_files, list):
                    input_size = calculate_metadata_input_size(input_files)
            except (orjson.JSONDecodeError, TypeError) as e:
                logger.debug("Failed to parse input JSON for rule %s: %s", rule, e)

        return MetadataRecord(
            rule=rule,
            start_time=row["starttime"],
            end_time=row["endtime"],
            code_hash=code_hash,
            input_size=input_size,
        )
Functions
has_incomplete_jobs
has_incomplete_jobs() -> bool

Check for incomplete=1 rows matching our namespace.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if any incomplete jobs exist for this namespace. |

Source code in snakesee/persistence/db.py
def has_incomplete_jobs(self) -> bool:
    """Report whether this namespace has any row flagged ``incomplete = 1``.

    Returns:
        True when at least one such row exists; False when none exist, the
        database cannot be opened, or the query raises a DatabaseError.
    """
    connection = self._connect()
    if connection is None:
        return False

    found = False
    try:
        first_hit = connection.execute(
            """SELECT 1 FROM snakemake_metadata
               WHERE namespace = ? AND incomplete = 1 LIMIT 1""",
            (self._namespace,),
        ).fetchone()
        found = first_hit is not None
    except sqlite3.DatabaseError as e:
        logger.debug("Error checking incomplete jobs in DB: %s", e)
    finally:
        connection.close()
    return found
has_locks
has_locks() -> bool

Check for lock rows matching our namespace.

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if any locks exist for this namespace. |

Source code in snakesee/persistence/db.py
def has_locks(self) -> bool:
    """Report whether the ``snakemake_locks`` table holds a row for this namespace.

    Returns:
        True when a lock row exists; False when none exists, the database
        cannot be opened, or the query raises a DatabaseError.
    """
    connection = self._connect()
    if connection is None:
        return False

    locked = False
    try:
        first_lock = connection.execute(
            "SELECT 1 FROM snakemake_locks WHERE namespace = ? LIMIT 1",
            (self._namespace,),
        ).fetchone()
        locked = first_lock is not None
    except sqlite3.DatabaseError as e:
        logger.debug("Error checking locks in DB: %s", e)
    finally:
        connection.close()
    return locked
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete job rows from the DB.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `min_start_time` | `float \| None` | If set, only yield jobs started at or after this time. | `None` |

Yields:

| Type | Description |
|------|-------------|
| `IncompleteJob` | An `IncompleteJob` for each in-progress job. |

Source code in snakesee/persistence/db.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Iterate over incomplete job rows from the DB.

    Args:
        min_start_time: If set, only yield jobs started at or after this
            time. Rows with a NULL starttime never satisfy the bound and are
            skipped in that case.

    Yields:
        IncompleteJob for each in-progress job.
    """
    # Build a single query and append the optional lower bound, instead of
    # keeping two near-duplicate statements in sync.
    query = (
        "SELECT target, rule, starttime, external_jobid "
        "FROM snakemake_metadata "
        "WHERE namespace = ? AND incomplete = 1"
    )
    params: tuple[object, ...] = (self._namespace,)
    if min_start_time is not None:
        query += " AND starttime >= ?"
        params += (min_start_time,)

    conn = self._connect()
    if conn is None:
        return

    try:
        for row in conn.execute(query, params):
            yield IncompleteJob(
                start_time=row["starttime"],
                output_file=Path(row["target"]) if row["target"] else None,
                rule=row["rule"],
                external_jobid=row["external_jobid"],
            )

    except sqlite3.DatabaseError as e:
        logger.warning("Error reading incomplete jobs from DB: %s", e)
    finally:
        conn.close()
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over completed job metadata from the SQLite DB.

Skips rows with incomplete=1, NULL rule, or stub records (record_format_version=0).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `progress_callback` | `ProgressCallback \| None` | Optional `callback(current, total)` for progress. | `None` |

Yields:

| Type | Description |
|------|-------------|
| `MetadataRecord` | A `MetadataRecord` for each completed job. |

Source code in snakesee/persistence/db.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Iterate over completed job metadata from the SQLite DB.

    Skips rows with incomplete=1, NULL rule, or stub records
    (record_format_version=0).

    Args:
        progress_callback: Optional callback(current, total) for progress.
            ``current`` counts every fetched row (including any the row
            converter drops); ``total`` is the matching-row count, which is
            only queried when a callback is supplied.

    Yields:
        MetadataRecord for each completed job.
    """
    # One predicate shared by the COUNT and the row query, so the progress
    # total always describes exactly the rows that are iterated. Only this
    # constant is interpolated into SQL; the namespace is a bound parameter.
    completed_where = (
        "WHERE namespace = ? "
        "AND (incomplete IS NULL OR incomplete = 0) "
        "AND rule IS NOT NULL "
        "AND record_format_version > 0"
    )

    conn = self._connect()
    if conn is None:
        return

    params = (self._namespace,)
    try:
        total = 0
        if progress_callback is not None:
            count_row = conn.execute(
                f"SELECT COUNT(*) FROM snakemake_metadata {completed_where}",
                params,
            ).fetchone()
            total = count_row[0] if count_row else 0

        cursor = conn.execute(
            "SELECT rule, starttime, endtime, code, input "
            f"FROM snakemake_metadata {completed_where}",
            params,
        )

        for current, row in enumerate(cursor, start=1):
            record = self._row_to_metadata_record(row)
            if record is not None:
                yield record

            if progress_callback is not None:
                progress_callback(current, total)

    except sqlite3.DatabaseError as e:
        logger.warning("Error reading metadata from DB: %s", e)
    finally:
        conn.close()

Functions

fs

Filesystem persistence backend.

Reads Snakemake metadata from the traditional .snakemake/metadata/, .snakemake/incomplete/, and .snakemake/locks/ directory layout.

Classes

FsPersistence

Filesystem-based persistence backend.

Wraps the existing metadata file iteration, incomplete marker scanning, and lock file detection into the PersistenceBackend interface.

Source code in snakesee/persistence/fs.py
class FsPersistence:
    """Filesystem-based persistence backend.

    Wraps the existing metadata file iteration, incomplete marker scanning,
    and lock file detection into the PersistenceBackend interface.
    """

    # File that can appear in the incomplete directory but is never a job
    # marker; every incomplete-marker scan skips it.
    _MIGRATION_MARKER = "migration_underway"

    def __init__(self, paths: WorkflowPaths) -> None:
        self._paths = paths

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Iterate over metadata files in .snakemake/metadata/.

        Args:
            progress_callback: Optional callback(current, total), forwarded
                to ``parse_metadata_files_full``.

        Yields:
            MetadataRecord values produced by ``parse_metadata_files_full``;
            nothing when ``has_metadata`` is false.
        """
        if not self._paths.has_metadata:
            return
        yield from parse_metadata_files_full(self._paths.metadata_dir, progress_callback)

    def has_locks(self) -> bool:
        """Check for lock files in .snakemake/locks/.

        Returns:
            True if the locks directory exists and has at least one entry;
            False if it is missing, empty, or cannot be listed.
        """
        locks_dir = self._paths.locks_dir
        if not locks_dir.exists():
            return False
        try:
            return any(locks_dir.iterdir())
        except OSError:
            return False

    def has_incomplete_jobs(self) -> bool:
        """Check for incomplete markers in .snakemake/incomplete/.

        Returns:
            True if any marker file exists anywhere under the directory;
            False if it is missing, holds no markers, or cannot be walked.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return False
        try:
            return any(
                marker.is_file() and marker.name != self._MIGRATION_MARKER
                for marker in inc_dir.rglob("*")
            )
        except OSError:
            return False

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Iterate over incomplete markers in .snakemake/incomplete/.

        Snakemake names each marker with the URL-safe base64 encoding of the
        job's output path; encodings longer than the filesystem name limit
        are split across nested directories whose names start with ``@``.
        The marker's mtime approximates when the job started.

        Args:
            min_start_time: If set, skip markers whose mtime is earlier.

        Yields:
            IncompleteJob for each marker; ``output_file`` is None when the
            encoded path cannot be decoded.
        """
        inc_dir = self._paths.incomplete_dir
        if not inc_dir.exists():
            return

        try:
            # Walk eagerly inside the try: rglob itself can raise OSError
            # (e.g. a subdirectory removed mid-scan). This mirrors the
            # tolerance in has_incomplete_jobs instead of letting the error
            # escape the generator.
            markers = [
                marker
                for marker in inc_dir.rglob("*")
                if marker.is_file() and marker.name != self._MIGRATION_MARKER
            ]
        except OSError as e:
            logger.debug("Failed to scan incomplete markers in %s: %s", inc_dir, e)
            return

        for marker in markers:
            try:
                marker_mtime = marker.stat().st_mtime
            except OSError:
                continue

            if min_start_time is not None and marker_mtime < min_start_time:
                continue

            yield IncompleteJob(
                start_time=marker_mtime,
                output_file=self._decode_marker(marker, inc_dir),
            )

    @staticmethod
    def _decode_marker(marker: Path, inc_dir: Path) -> Path | None:
        """Recover the output path encoded by a marker's location.

        Joins any ``@``-prefixed directory chunks (prefix removed) with the
        file name, then decodes the result as URL-safe base64. The URL-safe
        decoder also accepts the standard ``+`` alphabet.

        Args:
            marker: Marker file located somewhere under ``inc_dir``.
            inc_dir: The incomplete-marker root directory.

        Returns:
            The decoded output path, or None if decoding fails.
        """
        encoded = "".join(
            part.removeprefix("@") for part in marker.relative_to(inc_dir).parts
        )
        try:
            return Path(base64.urlsafe_b64decode(encoded).decode("utf-8"))
        except (ValueError, UnicodeDecodeError) as e:
            logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)
            return None
Functions
has_incomplete_jobs
has_incomplete_jobs() -> bool

Check for incomplete markers in .snakemake/incomplete/.

Source code in snakesee/persistence/fs.py
def has_incomplete_jobs(self) -> bool:
    """Report whether any job marker file lives under ``.snakemake/incomplete/``.

    The ``migration_underway`` file is not counted as a marker.

    Returns:
        True as soon as one marker file is found; False if the directory is
        missing, holds no markers, or walking it raises OSError.
    """
    incomplete_root = self._paths.incomplete_dir
    if not incomplete_root.exists():
        return False

    try:
        for candidate in incomplete_root.rglob("*"):
            if candidate.is_file() and candidate.name != "migration_underway":
                return True
    except OSError:
        return False
    return False
has_locks
has_locks() -> bool

Check for lock files in .snakemake/locks/.

Source code in snakesee/persistence/fs.py
def has_locks(self) -> bool:
    """Report whether ``.snakemake/locks/`` contains any entry.

    Returns:
        True if the locks directory exists and is non-empty; False if it is
        missing, empty, or listing it raises OSError.
    """
    lock_root = self._paths.locks_dir
    if lock_root.exists():
        try:
            for _entry in lock_root.iterdir():
                return True
        except OSError:
            return False
    return False
iterate_incomplete_jobs
iterate_incomplete_jobs(min_start_time: float | None = None) -> Iterator[IncompleteJob]

Iterate over incomplete markers in .snakemake/incomplete/.

Each marker filename is a base64-encoded output file path. The marker's mtime approximates when the job started.

Source code in snakesee/persistence/fs.py
def iterate_incomplete_jobs(
    self,
    min_start_time: float | None = None,
) -> Iterator[IncompleteJob]:
    """Iterate over incomplete markers in .snakemake/incomplete/.

    Snakemake names each marker with the URL-safe base64 encoding of the
    job's output path; encodings longer than the filesystem name limit are
    split across nested directories whose names start with ``@``.
    The marker's mtime approximates when the job started.

    Args:
        min_start_time: If set, skip markers whose mtime is earlier.

    Yields:
        IncompleteJob for each marker; ``output_file`` is None when the
        encoded path cannot be decoded.
    """
    inc_dir = self._paths.incomplete_dir
    if not inc_dir.exists():
        return

    try:
        # Walk eagerly inside the try: rglob itself can raise OSError (e.g. a
        # subdirectory removed mid-scan) and must not escape the generator.
        markers = [
            marker
            for marker in inc_dir.rglob("*")
            if marker.is_file() and marker.name != "migration_underway"
        ]
    except OSError as e:
        logger.debug("Failed to scan incomplete markers in %s: %s", inc_dir, e)
        return

    for marker in markers:
        try:
            marker_mtime = marker.stat().st_mtime
        except OSError:
            continue

        if min_start_time is not None and marker_mtime < min_start_time:
            continue

        # Rebuild the full encoded string from any "@"-prefixed directory
        # chunks plus the file name, then decode with the URL-safe alphabet
        # (the standard decoder silently drops "-" and "_").
        encoded = "".join(
            part.removeprefix("@") for part in marker.relative_to(inc_dir).parts
        )
        output_file: Path | None = None
        try:
            output_file = Path(base64.urlsafe_b64decode(encoded).decode("utf-8"))
        except (ValueError, UnicodeDecodeError) as e:
            logger.warning("Failed to decode base64 marker filename %s: %s", marker.name, e)

        yield IncompleteJob(
            start_time=marker_mtime,
            output_file=output_file,
        )
iterate_metadata
iterate_metadata(progress_callback: ProgressCallback | None = None) -> Iterator[MetadataRecord]

Iterate over metadata files in .snakemake/metadata/.

Source code in snakesee/persistence/fs.py
def iterate_metadata(
    self,
    progress_callback: ProgressCallback | None = None,
) -> Iterator[MetadataRecord]:
    """Yield records parsed from the files in ``.snakemake/metadata/``.

    Delegates to ``parse_metadata_files_full``, forwarding the optional
    ``progress_callback``; yields nothing when ``has_metadata`` is false.
    """
    workflow_paths = self._paths
    if workflow_paths.has_metadata:
        yield from parse_metadata_files_full(workflow_paths.metadata_dir, progress_callback)

Functions