Skip to content

log_handler_script

Log handler script for Snakemake 8.x --log-handler-script integration.

This script converts Snakemake log messages to the same event format used by the snakesee logger plugin (Snakemake 9+), enabling real-time monitoring with snakesee for Snakemake 8.x users.

Usage

snakemake --log-handler-script $(snakesee log-handler-path) --cores 4

Note on execution tracking

This script is optimized for local execution where jobs start immediately after submission. For cluster/cloud executors (SLURM, AWS Batch, etc.), jobs may be queued before running. Since Snakemake 8.x doesn't provide a reliable signal for when a queued job actually starts executing, we emit job_started immediately upon job_info. This means "running" jobs may actually still be queued on cluster systems.

For accurate queue vs running tracking on clusters, consider using Snakemake 9+ with the logger plugin (--logger snakesee).

Functions

log_handler

log_handler(msg: dict[str, Any]) -> None

Main log handler function called by Snakemake for every log message.

This function is invoked by Snakemake when using --log-handler-script. It converts log messages to snakesee events for real-time monitoring.

Parameters:

Name Type Description Default
msg dict[str, Any]

Dictionary containing log message data with keys like: - level: Log level (job_info, job_finished, error, progress, etc.) - jobid: Job identifier - name/rule: Rule name - wildcards: Job wildcards - threads: Thread count - resources: Resource requirements - input/output: File lists - done/total: Progress counts

required
Source code in snakesee/log_handler_script.py
def log_handler(msg: dict[str, Any]) -> None:
    """Main log handler function called by Snakemake for every log message.

    This function is invoked by Snakemake when using --log-handler-script.
    It converts log messages to snakesee events for real-time monitoring.

    Args:
        msg: Dictionary containing log message data with keys like:
            - level: Log level (job_info, job_finished, error, progress, etc.)
            - jobid: Job identifier
            - name/rule: Rule name
            - wildcards: Job wildcards
            - threads: Thread count
            - resources: Resource requirements
            - input/output: File lists
            - done/total: Progress counts
    """
    level = msg.get("level")
    if level is None:
        return

    timestamp = time.time()

    # Emit workflow_started on first message
    _ensure_workflow_started(timestamp)

    # Map log levels to event handlers
    if level == "job_info":
        _handle_job_info(msg, timestamp)
    elif level == "job_started":
        _handle_job_started(msg, timestamp)
    elif level == "job_finished":
        _handle_job_finished(msg, timestamp)
    elif level in ("job_error", "error"):
        _handle_job_error(msg, timestamp)
    elif level == "progress":
        _handle_progress(msg, timestamp)