Centralized path management for Snakemake workflow directories.
This frozen dataclass provides computed paths for all standard
Snakemake directory locations, eliminating ad-hoc path construction.
Attributes:
| Name | Type | Description |
| --- | --- | --- |
| workflow_dir | Path | Root directory of the workflow (contains .snakemake/). |
Example:
    paths = WorkflowPaths(Path("/my/workflow"))
    # Access computed paths
    if paths.metadata_dir.exists():
        for f in paths.get_metadata_files():
            process(f)
    # Find logs
    latest = paths.find_latest_log()
    all_logs = paths.find_all_logs()
Source code in snakesee/state/paths.py
| @dataclass(frozen=True)
class WorkflowPaths:
"""Centralized path management for Snakemake workflow directories.
This frozen dataclass provides computed paths for all standard
Snakemake directory locations, eliminating ad-hoc path construction.
Attributes:
workflow_dir: Root directory of the workflow (contains .snakemake/).
Example:
paths = WorkflowPaths(Path("/my/workflow"))
# Access computed paths
if paths.metadata_dir.exists():
for f in paths.get_metadata_files():
process(f)
# Find logs
latest = paths.find_latest_log()
all_logs = paths.find_all_logs()
"""
workflow_dir: Path
# =========================================================================
# Core directory properties
# =========================================================================
@property
def snakemake_dir(self) -> Path:
"""Path to .snakemake/ directory."""
return self.workflow_dir / SNAKEMAKE_DIR
@property
def metadata_dir(self) -> Path:
"""Path to .snakemake/metadata/ directory."""
return self.snakemake_dir / METADATA_DIR
@property
def log_dir(self) -> Path:
"""Path to .snakemake/log/ directory."""
return self.snakemake_dir / LOG_DIR
@property
def incomplete_dir(self) -> Path:
"""Path to .snakemake/incomplete/ directory."""
return self.snakemake_dir / INCOMPLETE_DIR
@property
def locks_dir(self) -> Path:
"""Path to .snakemake/locks/ directory."""
return self.snakemake_dir / LOCKS_DIR
@property
def metadata_db(self) -> Path:
"""Path to Snakemake's SQLite metadata database."""
return self.snakemake_dir / METADATA_DB_NAME
# =========================================================================
# Event and validation file paths
# =========================================================================
@property
def events_file(self) -> Path:
"""Path to snakesee events file (.snakesee_events.jsonl)."""
return self.workflow_dir / EVENT_FILE_NAME
@property
def validation_log(self) -> Path:
"""Path to validation log file (.snakesee_validation.log)."""
return self.workflow_dir / VALIDATION_LOG_NAME
@property
def default_profile(self) -> Path:
"""Path to default profile file (.snakesee-profile.json)."""
return self.workflow_dir / DEFAULT_PROFILE_NAME
# =========================================================================
# Existence checks
# =========================================================================
@property
def exists(self) -> bool:
"""Check if this is a valid workflow directory."""
return _cached_exists(self.snakemake_dir)
@property
def has_metadata(self) -> bool:
"""Check if metadata directory exists."""
return _cached_exists(self.metadata_dir)
@property
def has_metadata_db(self) -> bool:
"""Whether a SQLite metadata database exists."""
return _cached_exists(self.metadata_db)
@property
def has_logs(self) -> bool:
"""Check if log directory exists and contains logs."""
if not _cached_exists(self.log_dir):
return False
return any(self.log_dir.glob(LOG_GLOB_PATTERN))
@property
def has_events(self) -> bool:
"""Check if events file exists and has content."""
try:
return self.events_file.stat().st_size > 0
except OSError:
return False
@property
def has_locks(self) -> bool:
"""Check if locks directory exists and contains files."""
if not _cached_exists(self.locks_dir):
return False
try:
return any(self.locks_dir.iterdir())
except OSError:
return False
@property
def has_incomplete(self) -> bool:
"""Check if incomplete directory exists and contains markers."""
if not _cached_exists(self.incomplete_dir):
return False
try:
return any(self.incomplete_dir.iterdir())
except OSError:
return False
# =========================================================================
# Log file discovery
# =========================================================================
def find_latest_log(self) -> Path | None:
"""Find the most recent snakemake log file.
Returns:
Path to the most recent log file, or None if no logs exist.
"""
if not _cached_exists(self.log_dir):
return None
# Files from glob already exist at time of iteration; no need to re-check
logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
if not logs:
return None
logs.sort(key=safe_mtime)
return logs[-1]
def find_all_logs(self) -> list[Path]:
"""Find all snakemake log files, sorted by modification time.
Returns:
List of paths sorted oldest to newest.
"""
if not _cached_exists(self.log_dir):
return []
# Files from glob already exist at time of iteration; no need to re-check
logs = list(self.log_dir.glob(LOG_GLOB_PATTERN))
logs.sort(key=safe_mtime)
return logs
def find_logs_sorted_newest_first(self) -> list[Path]:
"""Find all snakemake log files, sorted newest first.
Returns:
List of paths sorted newest to oldest.
"""
logs = self.find_all_logs()
logs.reverse()
return logs
# =========================================================================
# Metadata file discovery
# =========================================================================
def get_metadata_files(self) -> Iterator[Path]:
"""Iterate over all metadata files.
Yields:
Path to each metadata file.
"""
if not _cached_exists(self.metadata_dir):
return
for f in self.metadata_dir.rglob("*"):
if f.is_file():
yield f
def count_metadata_files(self) -> int:
"""Count the number of metadata files.
Returns:
Number of metadata files.
"""
if not _cached_exists(self.metadata_dir):
return 0
return sum(1 for f in self.metadata_dir.rglob("*") if f.is_file())
# =========================================================================
# Incomplete marker handling
# =========================================================================
def get_incomplete_markers(self) -> Iterator[Path]:
"""Iterate over incomplete job markers.
Yields:
Path to each incomplete marker file.
"""
if not _cached_exists(self.incomplete_dir):
return
for marker in self.incomplete_dir.rglob("*"):
if marker.is_file() and marker.name != "migration_underway":
yield marker
def decode_incomplete_marker(self, marker: Path) -> Path | None:
"""Decode an incomplete marker filename to get the output path.
Args:
marker: Path to the marker file.
Returns:
Decoded output file path, or None if decoding fails.
"""
try:
decoded = base64.b64decode(marker.name).decode("utf-8")
return Path(decoded)
except (ValueError, UnicodeDecodeError):
return None
# =========================================================================
# Job log discovery
# =========================================================================
def get_job_log(
self,
rule: str,
wildcards: dict[str, str] | None = None,
job_id: int | str | None = None,
) -> Path | None:
"""Find the log file for a specific job.
Searches common log locations for a file matching the rule
and optional wildcards/job_id.
Args:
rule: Name of the rule.
wildcards: Optional wildcard values.
job_id: Optional job ID.
Returns:
Path to the log file if found, None otherwise.
"""
search_paths: list[Path] = []
# .snakemake/log/ directory
if _cached_exists(self.log_dir):
search_paths.extend(self.log_dir.glob(f"*{rule}*"))
if job_id is not None:
search_paths.extend(self.log_dir.glob(f"*job{job_id}*"))
# logs/ directory (common convention)
logs_dir = self.workflow_dir / "logs"
search_paths.extend(self._search_log_dir(logs_dir, rule, wildcards))
# log/ directory (another common convention)
log_dir = self.workflow_dir / "log"
search_paths.extend(self._search_log_dir(log_dir, rule, wildcards))
# Sort by modification time (newest first) and return first match
# is_file() already confirms existence, no need for additional exists check
existing_logs = [p for p in search_paths if p.is_file()]
if existing_logs:
existing_logs.sort(key=safe_mtime, reverse=True)
return existing_logs[0]
return None
def _search_log_dir(
self,
log_dir: Path,
rule: str,
wildcards: dict[str, str] | None,
) -> list[Path]:
"""Search a log directory for matching logs."""
paths: list[Path] = []
if not _cached_exists(log_dir):
return paths
paths.extend(log_dir.glob(f"**/{rule}*"))
rule_log_dir = log_dir / rule
if _cached_exists(rule_log_dir):
paths.extend(rule_log_dir.glob("*"))
if wildcards:
for wc_value in wildcards.values():
if wc_value:
paths.extend(log_dir.glob(f"**/*{wc_value}*"))
return paths
# =========================================================================
# Profile discovery
# =========================================================================
def find_profile(self, max_levels: int = 6) -> Path | None:
"""Search for a profile file in workflow and parent directories.
Args:
max_levels: Maximum parent levels to search (including current).
Returns:
Path to the found profile, or None if not found.
"""
current = self.workflow_dir.resolve()
for _ in range(max_levels):
profile_path = current / DEFAULT_PROFILE_NAME
if _cached_exists(profile_path):
return profile_path
if current.parent == current:
break
current = current.parent
return None
# =========================================================================
# Validation
# =========================================================================
def validate(self) -> None:
"""Validate that this is a valid workflow directory.
Raises:
ValueError: If .snakemake directory doesn't exist.
"""
if not _cached_exists(self.snakemake_dir):
raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")
|
Attributes
default_profile
property
Path to default profile file (.snakesee-profile.json).
events_file
property
Path to snakesee events file (.snakesee_events.jsonl).
exists
property
Check if this is a valid workflow directory.
has_events
property
Check if events file exists and has content.
has_incomplete
property
Check if incomplete directory exists and contains markers.
has_locks
property
Check if locks directory exists and contains files.
has_logs
property
Check if log directory exists and contains logs.
has_metadata
property
Check if metadata directory exists.
has_metadata_db
property
Whether a SQLite metadata database exists.
incomplete_dir
property
Path to .snakemake/incomplete/ directory.
locks_dir
property
Path to .snakemake/locks/ directory.
log_dir
property
Path to .snakemake/log/ directory.
metadata_db
property
Path to Snakemake's SQLite metadata database.
metadata_dir
property
Path to .snakemake/metadata/ directory.
snakemake_dir
property
Path to .snakemake/ directory.
validation_log
property
Path to validation log file (.snakesee_validation.log).
Functions
count_metadata_files
count_metadata_files() -> int
Count the number of metadata files.
Returns:
| Type | Description |
| --- | --- |
| int | Number of metadata files. |
Source code in snakesee/state/paths.py
def count_metadata_files(self) -> int:
    """Count the number of metadata files.

    Returns:
        Number of metadata files.
    """
    # get_metadata_files() performs the same existence check and the same
    # recursive is_file() walk, and yields nothing when the directory is absent.
    return sum(1 for _ in self.get_metadata_files())
|
decode_incomplete_marker
decode_incomplete_marker(marker: Path) -> Path | None
Decode an incomplete marker filename to get the output path.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| marker | Path | Path to the marker file. | required |
Returns:
| Type | Description |
| --- | --- |
| `Path \| None` | Decoded output file path, or None if decoding fails. |
Source code in snakesee/state/paths.py
| def decode_incomplete_marker(self, marker: Path) -> Path | None:
"""Decode an incomplete marker filename to get the output path.
Args:
marker: Path to the marker file.
Returns:
Decoded output file path, or None if decoding fails.
"""
try:
decoded = base64.b64decode(marker.name).decode("utf-8")
return Path(decoded)
except (ValueError, UnicodeDecodeError):
return None
|
find_all_logs
find_all_logs() -> list[Path]
Find all snakemake log files, sorted by modification time.
Returns:
| Type | Description |
| --- | --- |
| list[Path] | List of paths sorted oldest to newest. |
Source code in snakesee/state/paths.py
def find_all_logs(self) -> list[Path]:
    """Find all snakemake log files, sorted by modification time.

    Returns:
        List of paths sorted oldest to newest.
    """
    if _cached_exists(self.log_dir):
        # Glob results existed when they were listed, so no re-check is needed.
        return sorted(self.log_dir.glob(LOG_GLOB_PATTERN), key=safe_mtime)
    return []
|
find_latest_log
find_latest_log() -> Path | None
Find the most recent snakemake log file.
Returns:
| Type | Description |
| --- | --- |
| `Path \| None` | Path to the most recent log file, or None if no logs exist. |
Source code in snakesee/state/paths.py
def find_latest_log(self) -> Path | None:
    """Find the most recent snakemake log file.

    Returns:
        Path to the most recent log file, or None if no logs exist.
    """
    # find_all_logs() applies the same existence check, glob and
    # oldest-to-newest ordering, so the newest log is its last element.
    ordered = self.find_all_logs()
    return ordered[-1] if ordered else None
|
find_logs_sorted_newest_first
find_logs_sorted_newest_first() -> list[Path]
Find all snakemake log files, sorted newest first.
Returns:
| Type | Description |
| --- | --- |
| list[Path] | List of paths sorted newest to oldest. |
Source code in snakesee/state/paths.py
def find_logs_sorted_newest_first(self) -> list[Path]:
    """Find all snakemake log files, sorted newest first.

    Returns:
        List of paths sorted newest to oldest.
    """
    # find_all_logs() is oldest-first; flip it.
    oldest_first = self.find_all_logs()
    return list(reversed(oldest_first))
|
find_profile
find_profile(max_levels: int = 6) -> Path | None
Search for a profile file in workflow and parent directories.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_levels | int | Maximum parent levels to search (including current). | 6 |
Returns:
| Type | Description |
| --- | --- |
| `Path \| None` | Path to the found profile, or None if not found. |
Source code in snakesee/state/paths.py
def find_profile(self, max_levels: int = 6) -> Path | None:
    """Search for a profile file in workflow and parent directories.

    Args:
        max_levels: Maximum parent levels to search (including current).

    Returns:
        Path to the found profile, or None if not found.
    """
    start = self.workflow_dir.resolve()
    # The directory itself, then each ancestor up to the filesystem root.
    lineage = (start, *start.parents)
    # Clamp so a non-positive limit searches nothing, as range() would.
    for directory in lineage[: max(max_levels, 0)]:
        profile_path = directory / DEFAULT_PROFILE_NAME
        if _cached_exists(profile_path):
            return profile_path
    return None
|
get_incomplete_markers
get_incomplete_markers() -> Iterator[Path]
Iterate over incomplete job markers.
Yields:
| Type | Description |
| --- | --- |
| Path | Path to each incomplete marker file. |
Source code in snakesee/state/paths.py
def get_incomplete_markers(self) -> Iterator[Path]:
    """Iterate over incomplete job markers.

    Yields:
        Path to each incomplete marker file.
    """
    if _cached_exists(self.incomplete_dir):
        for entry in self.incomplete_dir.rglob("*"):
            # Skip directories and the "migration_underway" file.
            if not entry.is_file() or entry.name == "migration_underway":
                continue
            yield entry
|
get_job_log
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None
Find the log file for a specific job.
Searches common log locations for a file matching the rule
and optional wildcards/job_id.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rule | str | Name of the rule. | required |
| wildcards | `dict[str, str] \| None` | Optional wildcard values. | None |
| job_id | `int \| str \| None` | Optional job ID. | None |
Returns:
| Type | Description |
| --- | --- |
| `Path \| None` | Path to the log file if found, None otherwise. |
Source code in snakesee/state/paths.py
def get_job_log(
    self,
    rule: str,
    wildcards: dict[str, str] | None = None,
    job_id: int | str | None = None,
) -> Path | None:
    """Find the log file for a specific job.

    Searches common log locations for a file matching the rule
    and optional wildcards/job_id. In the .snakemake/log/ search the rule
    name and job id are matched literally (glob metacharacters are escaped).

    Args:
        rule: Name of the rule.
        wildcards: Optional wildcard values.
        job_id: Optional job ID.

    Returns:
        Path to the log file if found, None otherwise. When several
        files match, the one with the newest modification time wins.
    """
    from glob import escape as glob_escape

    candidates: list[Path] = []

    # .snakemake/log/ directory
    if _cached_exists(self.log_dir):
        candidates.extend(self.log_dir.glob(f"*{glob_escape(rule)}*"))
        if job_id is not None:
            candidates.extend(
                self.log_dir.glob(f"*job{glob_escape(str(job_id))}*")
            )

    # logs/ and log/ directories (common conventions)
    for dirname in ("logs", "log"):
        candidates.extend(
            self._search_log_dir(self.workflow_dir / dirname, rule, wildcards)
        )

    # is_file() already confirms existence; max() keeps the first of any
    # candidates tied on modification time.
    existing_logs = [p for p in candidates if p.is_file()]
    return max(existing_logs, key=safe_mtime, default=None)
|
get_metadata_files
get_metadata_files() -> Iterator[Path]
Iterate over all metadata files.
Yields:
| Type | Description |
| --- | --- |
| Path | Path to each metadata file. |
Source code in snakesee/state/paths.py
def get_metadata_files(self) -> Iterator[Path]:
    """Iterate over all metadata files.

    Yields:
        Path to each metadata file.
    """
    if _cached_exists(self.metadata_dir):
        # Recursive walk; directories are filtered out.
        yield from (
            entry for entry in self.metadata_dir.rglob("*") if entry.is_file()
        )
|
validate
validate() -> None
Validate that this is a valid workflow directory.
Raises:
| Type | Description |
| --- | --- |
| ValueError | If .snakemake directory doesn't exist. |
Source code in snakesee/state/paths.py
def validate(self) -> None:
    """Validate that this is a valid workflow directory.

    Raises:
        ValueError: If .snakemake directory doesn't exist.
    """
    if _cached_exists(self.snakemake_dir):
        return
    raise ValueError(f"No .snakemake directory found in {self.workflow_dir}")
|