Skip to content

job_registry

Job registry for centralized job state management.

This module provides a single source of truth for all job state in a workflow, replacing the scattered job tracking across multiple components.

Classes

Job dataclass

Mutable job state container.

Unlike the frozen JobInfo dataclass, this class allows state updates as jobs progress through their lifecycle.

Attributes:

Name Type Description
key str

Unique key for this job (rule + output hash or job_id).

rule str

The Snakemake rule name.

status JobStatus

Current job status.

job_id str | None

Snakemake job ID (may be None initially).

start_time float | None

Unix timestamp when job started.

end_time float | None

Unix timestamp when job completed.

wildcards dict[str, str]

Dictionary of wildcard values.

threads int | None

Number of threads allocated.

input_size int | None

Total input file size in bytes.

log_file Path | None

Path to job's log file.

Source code in snakesee/state/job_registry.py
@dataclass
class Job:
    """Mutable job state container.

    Unlike the frozen JobInfo dataclass, this class allows state updates
    as jobs progress through their lifecycle.

    Attributes:
        key: Unique key for this job (rule + output hash or job_id).
        rule: The Snakemake rule name.
        status: Current job status.
        job_id: Snakemake job ID (may be None initially).
        start_time: Unix timestamp when job started.
        end_time: Unix timestamp when job completed.
        wildcards: Dictionary of wildcard values.
        threads: Number of threads allocated.
        input_size: Total input file size in bytes.
        log_file: Path to job's log file.
    """

    key: str
    rule: str
    status: JobStatus = JobStatus.PENDING
    job_id: str | None = None
    start_time: float | None = None
    end_time: float | None = None
    wildcards: dict[str, str] = field(default_factory=dict)
    threads: int | None = None
    input_size: int | None = None
    log_file: Path | None = None
    stats_recorded: bool = False

    @property
    def elapsed(self) -> float | None:
        """Elapsed time in seconds since job started."""
        if self.start_time is None:
            return None
        if self.end_time is not None:
            return self.end_time - self.start_time
        from snakesee.state.clock import get_clock

        return get_clock().now() - self.start_time

    @property
    def duration(self) -> float | None:
        """Total duration for completed jobs."""
        if self.start_time is not None and self.end_time is not None:
            return self.end_time - self.start_time
        return None

    def to_job_info(self) -> JobInfo:
        """Convert to immutable JobInfo for backward compatibility."""
        from snakesee.models import JobInfo

        return JobInfo(
            rule=self.rule,
            job_id=self.job_id,
            wildcards=self.wildcards if self.wildcards else None,
            start_time=self.start_time,
            end_time=self.end_time,
            input_size=self.input_size,
            threads=self.threads,
            log_file=self.log_file,
        )

    @classmethod
    def from_job_info(cls, job_info: JobInfo, key: str | None = None) -> Job:
        """Create a Job from a JobInfo instance."""
        # Determine status based on JobInfo fields
        if job_info.end_time is not None:
            status = JobStatus.COMPLETED
        elif job_info.start_time is not None:
            status = JobStatus.RUNNING
        else:
            status = JobStatus.PENDING

        # Generate key if not provided
        if key is None:
            # Use job_id if available, otherwise use deterministic hash
            # This matches apply_job_info() to avoid duplicate entries
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        return cls(
            key=key,
            rule=job_info.rule,
            status=status,
            job_id=job_info.job_id,
            start_time=job_info.start_time,
            end_time=job_info.end_time,
            wildcards=dict(job_info.wildcards) if job_info.wildcards else {},
            threads=job_info.threads,
            input_size=job_info.input_size,
            log_file=job_info.log_file,
        )

Attributes

duration property
duration: float | None

Total duration for completed jobs.

elapsed property
elapsed: float | None

Elapsed time in seconds since job started.

Functions

from_job_info classmethod
from_job_info(job_info: JobInfo, key: str | None = None) -> Job

Create a Job from a JobInfo instance.

Source code in snakesee/state/job_registry.py
@classmethod
def from_job_info(cls, job_info: JobInfo, key: str | None = None) -> Job:
    """Create a Job from a JobInfo instance."""
    # Determine status based on JobInfo fields
    if job_info.end_time is not None:
        status = JobStatus.COMPLETED
    elif job_info.start_time is not None:
        status = JobStatus.RUNNING
    else:
        status = JobStatus.PENDING

    # Generate key if not provided
    if key is None:
        # Use job_id if available, otherwise use deterministic hash
        # This matches apply_job_info() to avoid duplicate entries
        key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

    return cls(
        key=key,
        rule=job_info.rule,
        status=status,
        job_id=job_info.job_id,
        start_time=job_info.start_time,
        end_time=job_info.end_time,
        wildcards=dict(job_info.wildcards) if job_info.wildcards else {},
        threads=job_info.threads,
        input_size=job_info.input_size,
        log_file=job_info.log_file,
    )
to_job_info
to_job_info() -> JobInfo

Convert to immutable JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def to_job_info(self) -> JobInfo:
    """Convert to immutable JobInfo for backward compatibility."""
    from snakesee.models import JobInfo

    return JobInfo(
        rule=self.rule,
        job_id=self.job_id,
        wildcards=self.wildcards if self.wildcards else None,
        start_time=self.start_time,
        end_time=self.end_time,
        input_size=self.input_size,
        threads=self.threads,
        log_file=self.log_file,
    )

JobRegistry

Central registry for all job state.

Provides O(1) lookup by job key or job_id, and efficient iteration over jobs by status.

Example

registry = JobRegistry() job = registry.get_or_create("job_1", "align") job.status = JobStatus.RUNNING job.start_time = time.time() registry.update_indexes(job) running = registry.running() # [job]

Source code in snakesee/state/job_registry.py
class JobRegistry:
    """Central registry for all job state.

    Provides O(1) lookup by job key or job_id, and efficient iteration
    over jobs by status.

    Example:
        >>> registry = JobRegistry()
        >>> job = registry.get_or_create("job_1", "align")
        >>> job.status = JobStatus.RUNNING
        >>> job.start_time = time.time()
        >>> registry.update_indexes(job)
        >>> running = registry.running()  # [job]
    """

    def __init__(self) -> None:
        """Initialize empty registry."""
        self._lock = threading.RLock()
        self._jobs: dict[str, Job] = {}
        self._by_job_id: dict[str, str] = {}  # job_id -> key
        self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
        self._by_rule: dict[str, set[str]] = {}

    def __len__(self) -> int:
        """Return number of jobs in registry."""
        with self._lock:
            return len(self._jobs)

    def __contains__(self, key: str) -> bool:
        """Check if job exists by key."""
        with self._lock:
            return key in self._jobs

    def get(self, key: str) -> Job | None:
        """Get job by key."""
        with self._lock:
            return self._jobs.get(key)

    def get_by_job_id(self, job_id: str) -> Job | None:
        """Get job by Snakemake job_id."""
        with self._lock:
            key = self._by_job_id.get(job_id)
            return self._jobs.get(key) if key else None

    def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
        """Get existing job or create a new one.

        Args:
            key: Unique job key.
            rule: Rule name for new job.

        Returns:
            Tuple of (job, created) where created is True if job was new.
        """
        with self._lock:
            if key in self._jobs:
                return self._jobs[key], False

            job = Job(key=key, rule=rule)
            self._add_unlocked(job)
            return job, True

    def add(self, job: Job) -> None:
        """Add a job to the registry."""
        with self._lock:
            self._add_unlocked(job)

    def _add_unlocked(self, job: Job) -> None:
        """Internal add with index updates (caller must hold lock)."""
        self._jobs[job.key] = job
        self._by_status[job.status].add(job.key)

        if job.job_id is not None:
            self._by_job_id[job.job_id] = job.key

        if job.rule not in self._by_rule:
            self._by_rule[job.rule] = set()
        self._by_rule[job.rule].add(job.key)

    def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes after job state change.

        Args:
            job: The job that was updated.
            old_status: Previous status if status changed, for index update.
        """
        with self._lock:
            self._update_indexes_unlocked(job, old_status)

    def _update_indexes_unlocked(self, job: Job, old_status: JobStatus | None = None) -> None:
        """Update indexes without acquiring lock (caller must hold lock)."""
        # Update job_id index if needed
        if job.job_id is not None and job.job_id not in self._by_job_id:
            self._by_job_id[job.job_id] = job.key

        # Update status index if status changed
        if old_status is not None and old_status != job.status:
            self._by_status[old_status].discard(job.key)
            self._by_status[job.status].add(job.key)

    def set_status(self, job: Job, status: JobStatus) -> None:
        """Update job status and indexes."""
        with self._lock:
            old_status = job.status
            job.status = status
            self._update_indexes_unlocked(job, old_status)

    def running(self) -> list[Job]:
        """Get all running jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]

    def completed(self) -> list[Job]:
        """Get all completed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]

    def failed(self) -> list[Job]:
        """Get all failed jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]

    def incomplete(self) -> list[Job]:
        """Get all incomplete jobs."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]

    def submitted(self) -> list[Job]:
        """Get all submitted jobs that haven't started yet."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]

    def pending(self) -> list[Job]:
        """Get all pending jobs (not yet submitted)."""
        with self._lock:
            return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]

    def by_rule(self, rule: str) -> list[Job]:
        """Get all jobs for a specific rule."""
        with self._lock:
            keys = self._by_rule.get(rule, set())
            return [self._jobs[key] for key in keys]

    def all_jobs(self) -> list[Job]:
        """Get all jobs in the registry."""
        with self._lock:
            return list(self._jobs.values())

    def running_job_infos(self) -> list[JobInfo]:
        """Get running jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.running()]

    def completed_job_infos(self) -> list[JobInfo]:
        """Get completed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.completed()]

    def failed_job_infos(self) -> list[JobInfo]:
        """Get failed jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.failed()]

    def submitted_job_infos(self) -> list[JobInfo]:
        """Get submitted jobs as JobInfo for backward compatibility."""
        return [job.to_job_info() for job in self.submitted()]

    def clear(self) -> None:
        """Clear all jobs from the registry."""
        with self._lock:
            self._jobs.clear()
            self._by_job_id.clear()
            for status_set in self._by_status.values():
                status_set.clear()
            self._by_rule.clear()

    def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
        """Add or update a job from a JobInfo.

        Args:
            job_info: JobInfo to apply.
            key: Optional key. If None, uses job_id or generates one.

        Returns:
            The created or updated Job.
        """
        with self._lock:
            # Determine key
            if key is None:
                key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

            # Look up existing job without additional lock acquisition
            existing = self._jobs.get(key)
            if existing is None and job_info.job_id:
                existing_key = self._by_job_id.get(job_info.job_id)
                if existing_key:
                    existing = self._jobs.get(existing_key)

            if existing:
                # Update existing job
                old_status = existing.status
                if job_info.start_time is not None:
                    existing.start_time = job_info.start_time
                if job_info.end_time is not None:
                    existing.end_time = job_info.end_time
                    existing.status = JobStatus.COMPLETED
                elif job_info.start_time is not None:
                    existing.status = JobStatus.RUNNING
                if job_info.threads is not None:
                    existing.threads = job_info.threads
                if job_info.wildcards:
                    existing.wildcards = dict(job_info.wildcards)
                if job_info.input_size is not None:
                    existing.input_size = job_info.input_size
                if job_info.log_file is not None:
                    existing.log_file = job_info.log_file
                if job_info.job_id is not None and existing.job_id is None:
                    existing.job_id = job_info.job_id
                self._update_indexes_unlocked(existing, old_status)
                return existing
            else:
                # Create new job
                job = Job.from_job_info(job_info, key)
                self._add_unlocked(job)
                return job

    def apply_event(self, event: SnakeseeEvent) -> Job | None:
        """Apply a snakesee event to update job state.

        Args:
            event: The event to apply.

        Returns:
            The updated Job, or None if event couldn't be applied.
        """
        from snakesee.events import EventType

        # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
        # These don't have job_id/rule_name and would create synthetic "unknown" jobs
        if event.event_type not in {
            EventType.JOB_SUBMITTED,
            EventType.JOB_STARTED,
            EventType.JOB_FINISHED,
            EventType.JOB_ERROR,
        }:
            return None

        with self._lock:
            # Get rule name from event
            rule_name = event.rule_name or "unknown"

            # Use job_id as key if available (convert int to str for key)
            job_id_str = str(event.job_id) if event.job_id is not None else None
            key = job_id_str or f"{rule_name}:{event.timestamp}"

            # Look up job without additional lock acquisition
            job = self._jobs.get(key)
            if job is None and job_id_str:
                existing_key = self._by_job_id.get(job_id_str)
                if existing_key:
                    job = self._jobs.get(existing_key)

            if job is None:
                # Create new job from event
                job = Job(key=key, rule=rule_name)
                self._add_unlocked(job)
                if job_id_str:
                    job.job_id = job_id_str
                    self._by_job_id[job_id_str] = key
                if event.wildcards:
                    job.wildcards = dict(event.wildcards)
                if event.threads:
                    job.threads = event.threads

            old_status = job.status

            # Update based on event type
            if event.event_type == EventType.JOB_SUBMITTED:
                job.status = JobStatus.SUBMITTED
            elif event.event_type == EventType.JOB_STARTED:
                job.status = JobStatus.RUNNING
                job.start_time = event.timestamp
                if event.threads:
                    job.threads = event.threads
            elif event.event_type == EventType.JOB_FINISHED:
                job.status = JobStatus.COMPLETED
                job.end_time = event.timestamp
            elif event.event_type == EventType.JOB_ERROR:
                job.status = JobStatus.FAILED
                job.end_time = event.timestamp

            self._update_indexes_unlocked(job, old_status)
            return job

    def store_threads(self, job_id: str, threads: int) -> None:
        """Store thread count for a job by job_id.

        This is used when thread info comes from events before the job
        is fully tracked.
        """
        with self._lock:
            key = self._by_job_id.get(job_id)
            if key:
                job = self._jobs.get(key)
                if job:
                    job.threads = threads

Functions

__contains__
__contains__(key: str) -> bool

Check if job exists by key.

Source code in snakesee/state/job_registry.py
def __contains__(self, key: str) -> bool:
    """Check if job exists by key."""
    with self._lock:
        return key in self._jobs
__init__
__init__() -> None

Initialize empty registry.

Source code in snakesee/state/job_registry.py
def __init__(self) -> None:
    """Initialize empty registry."""
    self._lock = threading.RLock()
    self._jobs: dict[str, Job] = {}
    self._by_job_id: dict[str, str] = {}  # job_id -> key
    self._by_status: dict[JobStatus, set[str]] = {status: set() for status in JobStatus}
    self._by_rule: dict[str, set[str]] = {}
__len__
__len__() -> int

Return number of jobs in registry.

Source code in snakesee/state/job_registry.py
def __len__(self) -> int:
    """Return number of jobs in registry."""
    with self._lock:
        return len(self._jobs)
add
add(job: Job) -> None

Add a job to the registry.

Source code in snakesee/state/job_registry.py
def add(self, job: Job) -> None:
    """Add a job to the registry."""
    with self._lock:
        self._add_unlocked(job)
all_jobs
all_jobs() -> list[Job]

Get all jobs in the registry.

Source code in snakesee/state/job_registry.py
def all_jobs(self) -> list[Job]:
    """Get all jobs in the registry."""
    with self._lock:
        return list(self._jobs.values())
apply_event
apply_event(event: SnakeseeEvent) -> Job | None

Apply a snakesee event to update job state.

Parameters:

Name Type Description Default
event SnakeseeEvent

The event to apply.

required

Returns:

Type Description
Job | None

The updated Job, or None if event couldn't be applied.

Source code in snakesee/state/job_registry.py
def apply_event(self, event: SnakeseeEvent) -> Job | None:
    """Apply a snakesee event to update job state.

    Args:
        event: The event to apply.

    Returns:
        The updated Job, or None if event couldn't be applied.
    """
    from snakesee.events import EventType

    # Ignore non-job events (e.g., WORKFLOW_STARTED, PROGRESS)
    # These don't have job_id/rule_name and would create synthetic "unknown" jobs
    if event.event_type not in {
        EventType.JOB_SUBMITTED,
        EventType.JOB_STARTED,
        EventType.JOB_FINISHED,
        EventType.JOB_ERROR,
    }:
        return None

    with self._lock:
        # Get rule name from event
        rule_name = event.rule_name or "unknown"

        # Use job_id as key if available (convert int to str for key)
        job_id_str = str(event.job_id) if event.job_id is not None else None
        key = job_id_str or f"{rule_name}:{event.timestamp}"

        # Look up job without additional lock acquisition
        job = self._jobs.get(key)
        if job is None and job_id_str:
            existing_key = self._by_job_id.get(job_id_str)
            if existing_key:
                job = self._jobs.get(existing_key)

        if job is None:
            # Create new job from event
            job = Job(key=key, rule=rule_name)
            self._add_unlocked(job)
            if job_id_str:
                job.job_id = job_id_str
                self._by_job_id[job_id_str] = key
            if event.wildcards:
                job.wildcards = dict(event.wildcards)
            if event.threads:
                job.threads = event.threads

        old_status = job.status

        # Update based on event type
        if event.event_type == EventType.JOB_SUBMITTED:
            job.status = JobStatus.SUBMITTED
        elif event.event_type == EventType.JOB_STARTED:
            job.status = JobStatus.RUNNING
            job.start_time = event.timestamp
            if event.threads:
                job.threads = event.threads
        elif event.event_type == EventType.JOB_FINISHED:
            job.status = JobStatus.COMPLETED
            job.end_time = event.timestamp
        elif event.event_type == EventType.JOB_ERROR:
            job.status = JobStatus.FAILED
            job.end_time = event.timestamp

        self._update_indexes_unlocked(job, old_status)
        return job
apply_job_info
apply_job_info(job_info: JobInfo, key: str | None = None) -> Job

Add or update a job from a JobInfo.

Parameters:

Name Type Description Default
job_info JobInfo

JobInfo to apply.

required
key str | None

Optional key. If None, uses job_id or generates one.

None

Returns:

Type Description
Job

The created or updated Job.

Source code in snakesee/state/job_registry.py
def apply_job_info(self, job_info: JobInfo, key: str | None = None) -> Job:
    """Add or update a job from a JobInfo.

    Args:
        job_info: JobInfo to apply.
        key: Optional key. If None, uses job_id or generates one.

    Returns:
        The created or updated Job.
    """
    with self._lock:
        # Determine key
        if key is None:
            key = job_info.job_id or f"{job_info.rule}:{hash(job_info)}"

        # Look up existing job without additional lock acquisition
        existing = self._jobs.get(key)
        if existing is None and job_info.job_id:
            existing_key = self._by_job_id.get(job_info.job_id)
            if existing_key:
                existing = self._jobs.get(existing_key)

        if existing:
            # Update existing job
            old_status = existing.status
            if job_info.start_time is not None:
                existing.start_time = job_info.start_time
            if job_info.end_time is not None:
                existing.end_time = job_info.end_time
                existing.status = JobStatus.COMPLETED
            elif job_info.start_time is not None:
                existing.status = JobStatus.RUNNING
            if job_info.threads is not None:
                existing.threads = job_info.threads
            if job_info.wildcards:
                existing.wildcards = dict(job_info.wildcards)
            if job_info.input_size is not None:
                existing.input_size = job_info.input_size
            if job_info.log_file is not None:
                existing.log_file = job_info.log_file
            if job_info.job_id is not None and existing.job_id is None:
                existing.job_id = job_info.job_id
            self._update_indexes_unlocked(existing, old_status)
            return existing
        else:
            # Create new job
            job = Job.from_job_info(job_info, key)
            self._add_unlocked(job)
            return job
by_rule
by_rule(rule: str) -> list[Job]

Get all jobs for a specific rule.

Source code in snakesee/state/job_registry.py
def by_rule(self, rule: str) -> list[Job]:
    """Get all jobs for a specific rule."""
    with self._lock:
        keys = self._by_rule.get(rule, set())
        return [self._jobs[key] for key in keys]
clear
clear() -> None

Clear all jobs from the registry.

Source code in snakesee/state/job_registry.py
def clear(self) -> None:
    """Clear all jobs from the registry."""
    with self._lock:
        self._jobs.clear()
        self._by_job_id.clear()
        for status_set in self._by_status.values():
            status_set.clear()
        self._by_rule.clear()
completed
completed() -> list[Job]

Get all completed jobs.

Source code in snakesee/state/job_registry.py
def completed(self) -> list[Job]:
    """Get all completed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.COMPLETED]]
completed_job_infos
completed_job_infos() -> list[JobInfo]

Get completed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def completed_job_infos(self) -> list[JobInfo]:
    """Get completed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.completed()]
failed
failed() -> list[Job]

Get all failed jobs.

Source code in snakesee/state/job_registry.py
def failed(self) -> list[Job]:
    """Get all failed jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.FAILED]]
failed_job_infos
failed_job_infos() -> list[JobInfo]

Get failed jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def failed_job_infos(self) -> list[JobInfo]:
    """Get failed jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.failed()]
get
get(key: str) -> Job | None

Get job by key.

Source code in snakesee/state/job_registry.py
def get(self, key: str) -> Job | None:
    """Get job by key."""
    with self._lock:
        return self._jobs.get(key)
get_by_job_id
get_by_job_id(job_id: str) -> Job | None

Get job by Snakemake job_id.

Source code in snakesee/state/job_registry.py
def get_by_job_id(self, job_id: str) -> Job | None:
    """Get job by Snakemake job_id."""
    with self._lock:
        key = self._by_job_id.get(job_id)
        return self._jobs.get(key) if key else None
get_or_create
get_or_create(key: str, rule: str) -> tuple[Job, bool]

Get existing job or create a new one.

Parameters:

Name Type Description Default
key str

Unique job key.

required
rule str

Rule name for new job.

required

Returns:

Type Description
tuple[Job, bool]

Tuple of (job, created) where created is True if job was new.

Source code in snakesee/state/job_registry.py
def get_or_create(self, key: str, rule: str) -> tuple[Job, bool]:
    """Get existing job or create a new one.

    Args:
        key: Unique job key.
        rule: Rule name for new job.

    Returns:
        Tuple of (job, created) where created is True if job was new.
    """
    with self._lock:
        if key in self._jobs:
            return self._jobs[key], False

        job = Job(key=key, rule=rule)
        self._add_unlocked(job)
        return job, True
incomplete
incomplete() -> list[Job]

Get all incomplete jobs.

Source code in snakesee/state/job_registry.py
def incomplete(self) -> list[Job]:
    """Get all incomplete jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.INCOMPLETE]]
pending
pending() -> list[Job]

Get all pending jobs (not yet submitted).

Source code in snakesee/state/job_registry.py
def pending(self) -> list[Job]:
    """Get all pending jobs (not yet submitted)."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.PENDING]]
running
running() -> list[Job]

Get all running jobs.

Source code in snakesee/state/job_registry.py
def running(self) -> list[Job]:
    """Get all running jobs."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.RUNNING]]
running_job_infos
running_job_infos() -> list[JobInfo]

Get running jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def running_job_infos(self) -> list[JobInfo]:
    """Get running jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.running()]
set_status
set_status(job: Job, status: JobStatus) -> None

Update job status and indexes.

Source code in snakesee/state/job_registry.py
def set_status(self, job: Job, status: JobStatus) -> None:
    """Update job status and indexes."""
    with self._lock:
        old_status = job.status
        job.status = status
        self._update_indexes_unlocked(job, old_status)
store_threads
store_threads(job_id: str, threads: int) -> None

Store thread count for a job by job_id.

This is used when thread info comes from events before the job is fully tracked.

Source code in snakesee/state/job_registry.py
def store_threads(self, job_id: str, threads: int) -> None:
    """Store thread count for a job by job_id.

    This is used when thread info comes from events before the job
    is fully tracked.
    """
    with self._lock:
        key = self._by_job_id.get(job_id)
        if key:
            job = self._jobs.get(key)
            if job:
                job.threads = threads
submitted
submitted() -> list[Job]

Get all submitted jobs that haven't started yet.

Source code in snakesee/state/job_registry.py
def submitted(self) -> list[Job]:
    """Get all submitted jobs that haven't started yet."""
    with self._lock:
        return [self._jobs[key] for key in self._by_status[JobStatus.SUBMITTED]]
submitted_job_infos
submitted_job_infos() -> list[JobInfo]

Get submitted jobs as JobInfo for backward compatibility.

Source code in snakesee/state/job_registry.py
def submitted_job_infos(self) -> list[JobInfo]:
    """Get submitted jobs as JobInfo for backward compatibility."""
    return [job.to_job_info() for job in self.submitted()]
update_indexes
update_indexes(job: Job, old_status: JobStatus | None = None) -> None

Update indexes after job state change.

Parameters:

Name Type Description Default
job Job

The job that was updated.

required
old_status JobStatus | None

Previous status if status changed, for index update.

None
Source code in snakesee/state/job_registry.py
def update_indexes(self, job: Job, old_status: JobStatus | None = None) -> None:
    """Update indexes after job state change.

    Args:
        job: The job that was updated.
        old_status: Previous status if status changed, for index update.
    """
    with self._lock:
        self._update_indexes_unlocked(job, old_status)

JobStatus

Bases: Enum

Status of a job in the workflow.

Source code in snakesee/state/job_registry.py
class JobStatus(Enum):
    """Status of a job in the workflow."""

    PENDING = "pending"
    SUBMITTED = "submitted"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    INCOMPLETE = "incomplete"