Skip to content

app

Textual App for snakesee TUI.

Classes

LayoutMode

Bases: Enum

Available TUI layout modes.

Source code in snakesee/tui/app.py
class LayoutMode(Enum):
    """Available TUI layout modes."""

    FULL = "full"
    COMPACT = "compact"
    MINIMAL = "minimal"

SnakeseeApp

Bases: App[None]

Textual application for monitoring Snakemake workflows.

Source code in snakesee/tui/app.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
class SnakeseeApp(App[None]):
    """Textual application for monitoring Snakemake workflows."""

    CSS_PATH = "app.tcss"
    BINDINGS: ClassVar[list[BindingType]] = [
        Binding("q,ctrl+c", "quit", "Quit"),
        Binding("tab", "cycle_layout", "Layout", priority=True),
        Binding("s", "cycle_sort_forward", "Sort →", show=False),
        Binding("S", "cycle_sort_back", "Sort ←", show=False),
        Binding("1", "sort_column(0)", show=False),
        Binding("2", "sort_column(1)", show=False),
        Binding("3", "sort_column(2)", show=False),
        Binding("4", "sort_column(3)", show=False),
        Binding("question_mark", "show_help", "Help"),
        Binding("f", "easter_pending", show=False),
        Binding("g", "easter_complete", show=False),
        Binding("slash", "open_filter", "Filter"),
        Binding("n", "next_match", show=False),
        Binding("N", "prev_match", show=False),
        Binding("escape", "clear_filter", "Clear filter"),
        Binding("p", "toggle_pause", "Pause"),
        Binding("e", "toggle_estimation", "Estimation"),
        Binding("w", "toggle_wildcard", "Wildcard"),
        Binding("a", "toggle_accessibility", "Accessibility"),
        Binding("r", "force_refresh", "Refresh"),
        Binding("ctrl+r", "hard_refresh", show=False),
        Binding("plus,equal", "rate_inc(0.5)", show=False),
        Binding("minus", "rate_dec(0.5)", show=False),
        Binding("greater_than_sign,full_stop", "rate_inc(5.0)", show=False),
        Binding("less_than_sign,comma", "rate_dec(5.0)", show=False),
        Binding("0", "rate_reset", show=False),
        Binding("G", "rate_min", show=False),
        Binding("left_square_bracket", "log_older(1)", show=False),
        Binding("right_square_bracket", "log_newer(1)", show=False),
        Binding("left_curly_bracket", "log_older(5)", show=False),
        Binding("right_curly_bracket", "log_newer(5)", show=False),
    ]

    paused: reactive[bool] = reactive(False)
    layout_mode: reactive[LayoutMode] = reactive(LayoutMode.FULL)
    sort_table: reactive[SortTable | None] = reactive(None, init=False)
    sort_column: reactive[int] = reactive(0, init=False)
    sort_ascending: reactive[bool] = reactive(True, init=False)
    filter_text: reactive[str | None] = reactive(None, init=False)
    accessibility_mode: reactive[bool] = reactive(False, init=False)
    refresh_rate: reactive[float] = reactive(DEFAULT_REFRESH_RATE, init=False)
    current_log_index: reactive[int] = reactive(0, init=False)

    _easter_timer: Timer | None = None
    _easter_pending: bool = False
    _refresh_timer: Timer | None = None
    _last_poll: tuple[WorkflowProgress, TimeEstimate | None] | None = None

    @property
    def last_poll(self) -> tuple[WorkflowProgress, TimeEstimate | None] | None:
        """The most recent (progress, estimate) snapshot taken by the refresh cycle.

        Read-only accessor for external tooling (e.g. the docs screenshot
        generator) so it does not have to reach into the private attribute.
        Returns None until the first refresh has polled the data source.
        """
        return self._last_poll

    def action_cycle_layout(self) -> None:
        """Cycle to the next layout mode."""
        modes = list(LayoutMode)
        idx = modes.index(self.layout_mode)
        self.layout_mode = modes[(idx + 1) % len(modes)]

    def watch_layout_mode(self, old: LayoutMode, new: LayoutMode) -> None:
        """Swap the CSS class on the root when the layout mode changes."""
        for mode in LayoutMode:
            self.remove_class(f"-{mode.value}")
        self.add_class(f"-{new.value}")

    def action_cycle_sort_forward(self) -> None:
        """Cycle the sort target one step forward (None → running → … → stats → None)."""
        self._cycle_sort(direction=1)

    def action_cycle_sort_back(self) -> None:
        """Cycle the sort target one step backward (None → stats → … → running → None)."""
        self._cycle_sort(direction=-1)

    def _cycle_sort(self, direction: int) -> None:
        """Advance the sort target by ``direction`` steps and refresh once.

        With no per-attribute watchers on the sort reactives, plain assignment is
        just storage; we explicitly call ``_refresh_panels`` once after all three
        settle so a single keystroke triggers exactly one redraw.
        """
        i = _SORT_CYCLE.index(self.sort_table)
        self.sort_table = _SORT_CYCLE[(i + direction) % len(_SORT_CYCLE)]
        self.sort_column = 0
        self.sort_ascending = True
        self._refresh_panels(ignore_pause=True)

    def action_sort_column(self, col: int) -> None:
        """Set the sort column for the active sort target, or toggle direction if same column.

        Columns are 0-indexed.  Each table enforces its own maximum:
        running and stats support columns 0-3; completions 0-2; pending 0-1.

        Args:
            col: Zero-based column index to sort by.
        """
        if self.sort_table is None:
            return
        if col >= _SORT_MAX_COLS[self.sort_table]:
            return
        if col == self.sort_column:
            self.sort_ascending = not self.sort_ascending
        else:
            self.sort_column = col
            self.sort_ascending = True
        self._refresh_panels(ignore_pause=True)

    def action_show_help(self) -> None:
        """Push the modal HelpScreen overlay."""
        self.push_screen(HelpScreen())

    def action_easter_pending(self) -> None:
        """Start (or restart) the 2-second window for completing the f-then-g easter egg."""
        self._easter_pending = True
        if self._easter_timer is not None:
            self._easter_timer.stop()
        self._easter_timer = self.set_timer(2.0, self._clear_easter)

    def _clear_easter(self) -> None:
        """Reset the easter-egg pending state when the 2-second window elapses."""
        self._easter_pending = False
        self._easter_timer = None

    def action_easter_complete(self) -> None:
        """Push the EasterEggScreen if the f-then-g chord finished within the window."""
        if self._easter_pending:
            self._easter_pending = False
            if self._easter_timer is not None:
                self._easter_timer.stop()
                self._easter_timer = None
            self.push_screen(EasterEggScreen())

    def action_open_filter(self) -> None:
        """Reveal the filter Input widget and focus it for keyboard entry."""
        f = self.query_one("#filter", Input)
        f.can_focus = True
        f.add_class("-active")
        f.focus()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Apply the submitted filter text and hide the Input again."""
        if event.input.id != "filter":
            return
        self.filter_text = event.value or None
        event.input.remove_class("-active")
        event.input.value = ""
        self.set_focus(None)
        event.input.can_focus = False

    def action_clear_filter(self) -> None:
        """Hide the filter Input if focused, otherwise clear the filter and return to latest log."""
        focused = self.focused
        if isinstance(focused, Input) and focused.id == "filter":
            focused.remove_class("-active")
            focused.value = ""
            self.set_focus(None)
            focused.can_focus = False
            return
        self.filter_text = None
        self.current_log_index = 0

    def watch_filter_text(self, old: str | None, new: str | None) -> None:
        """Re-populate all panels when the filter text changes.

        Filter changes only come from user keystrokes, so the redraw bypasses
        the pause guard.
        """
        self._refresh_panels(ignore_pause=True)

    def action_next_match(self) -> None:
        """Move the cursor to the next row whose Rule column matches the filter."""
        self._jump_match(direction=1)

    def action_prev_match(self) -> None:
        """Move the cursor to the previous row whose Rule column matches the filter."""
        self._jump_match(direction=-1)

    def action_toggle_pause(self) -> None:
        """Toggle the paused state of the monitor and repaint immediately.

        The repaint bypasses the pause guard so the header's PAUSED indicator
        updates right away instead of waiting for the next interval tick.
        """
        self.paused = not self.paused
        self._refresh_panels(ignore_pause=True)

    def action_toggle_estimation(self) -> None:
        """Toggle time estimation and re-initialize the estimator in a worker thread."""
        self._data.use_estimation = not self._data.use_estimation
        self.run_worker(self._reinit_estimator, thread=True, exclusive=True)

    def action_toggle_wildcard(self) -> None:
        """Toggle wildcard conditioning and re-initialize the estimator in a worker thread."""
        self._data.use_wildcard_conditioning = not self._data.use_wildcard_conditioning
        self.run_worker(self._reinit_estimator, thread=True, exclusive=True)

    def action_toggle_accessibility(self) -> None:
        """Toggle accessibility mode, updating the accessibility config and refreshing panels.

        Toggling off restores the constructor-supplied config (which may be a
        custom override), not necessarily ``DEFAULT_CONFIG``.
        """
        self.accessibility_mode = not self.accessibility_mode
        self._accessibility_config = (
            ACCESSIBLE_CONFIG if self.accessibility_mode else self._base_accessibility_config
        )
        self._refresh_panels(ignore_pause=True)

    def action_force_refresh(self) -> None:
        """Force an immediate panel refresh, even when paused."""
        self._refresh_panels(ignore_pause=True)

    def action_hard_refresh(self) -> None:
        """Re-initialize the estimator in a worker thread and refresh panels."""
        self.run_worker(self._reinit_estimator, thread=True, exclusive=True)

    def watch_refresh_rate(self, old: float, new: float) -> None:
        """Restart the polling timer when the refresh rate changes."""
        if self._refresh_timer is not None:
            self._refresh_timer.stop()
        self._refresh_timer = self.set_interval(new, self._refresh_panels)
        self._data.refresh_rate = new
        self._data.update_cache_ttl()

    def action_rate_inc(self, delta: float) -> None:
        """Increase the refresh rate by ``delta`` seconds, clamped to MAX_REFRESH_RATE."""
        self.refresh_rate = min(MAX_REFRESH_RATE, self.refresh_rate + delta)

    def action_rate_dec(self, delta: float) -> None:
        """Decrease the refresh rate by ``delta`` seconds, clamped to MIN_REFRESH_RATE."""
        self.refresh_rate = max(MIN_REFRESH_RATE, self.refresh_rate - delta)

    def action_rate_reset(self) -> None:
        """Reset the refresh rate to the default value."""
        self.refresh_rate = DEFAULT_REFRESH_RATE

    def action_rate_min(self) -> None:
        """Set the refresh rate to the minimum value."""
        self.refresh_rate = MIN_REFRESH_RATE

    def watch_current_log_index(self, old: int, new: int) -> None:
        """Sync the log index to the data source and refresh panels when it changes.

        Log navigation only comes from user keystrokes, so the redraw bypasses
        the pause guard.
        """
        self._data.current_log_index = new
        self._refresh_panels(ignore_pause=True)

    def action_log_older(self, step: int) -> None:
        """Navigate to an older log file by ``step`` entries.

        Args:
            step: Number of log entries to step backward (toward older logs).
        """
        self._data.refresh_log_list()
        max_idx = self._data.available_log_count - 1
        if max_idx < 0:
            self.current_log_index = 0
            return
        self.current_log_index = min(max_idx, self.current_log_index + step)

    def action_log_newer(self, step: int) -> None:
        """Navigate to a newer log file by ``step`` entries.

        Args:
            step: Number of log entries to step forward (toward newer logs).
        """
        self.current_log_index = max(0, self.current_log_index - step)

    def _reinit_estimator(self) -> None:
        """Re-initialize the estimator (runs on a worker thread) then refresh panels.

        Loads silently: the Textual UI is already on screen, so the startup Rich
        progress spinner would corrupt the display.

        The refresh bypasses the pause guard: every path here is an explicit user
        action (Ctrl+R, estimation/wildcard toggles), so the result should render
        even while auto-refresh is paused.
        """
        self._data.init_estimator(show_progress=False)
        self.call_from_thread(lambda: self._refresh_panels(ignore_pause=True))

    def _sort_table_name(self) -> SortTableName | None:
        """Return the current sort target as the Literal alias the data source expects."""
        return self.sort_table.value if self.sort_table is not None else None

    def _jump_match(self, direction: int) -> None:
        """Jump the focused (or running) DataTable's cursor to the next/prev match.

        The filter matches against the table's Rule column, whose index varies by
        table (see ``_RULE_COLUMN_BY_TABLE``). Tables with no Rule column (e.g.
        ``#incomplete``) are skipped so a global n/N press can neither raise
        ``IndexError`` nor match against the wrong column.

        Args:
            direction: 1 to step forward, -1 to step backward.
        """
        if not self.filter_text:
            return
        focused = self.focused
        table = focused if isinstance(focused, DataTable) else self.query_one("#running", DataTable)
        rule_col = _RULE_COLUMN_BY_TABLE.get(table.id or "")
        if rule_col is None:
            return
        n = table.row_count
        if n == 0:
            return
        needle = self.filter_text.lower()
        start = (table.cursor_row + direction) % n
        i = start
        for _ in range(n):
            row = table.get_row_at(i)
            if rule_col < len(row) and needle in str(row[rule_col]).lower():
                table.move_cursor(row=i)
                return
            i = (i + direction) % n

    def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """Open the JobLogScreen for the selected job in the running/completions tables."""
        table_id = event.data_table.id
        if table_id not in {SortTable.RUNNING.value, SortTable.COMPLETIONS.value}:
            return
        # Reuse the latest poll snapshot — populated by _refresh_panels at most
        # `refresh_rate` seconds ago, which is also the data the table was rendered from.
        if self._last_poll is None:
            self._last_poll = self._data.poll_state()
        progress, _ = self._last_poll
        if table_id == SortTable.RUNNING.value:
            jobs = self._data.get_running_jobs_list(
                progress,
                filter_text=self.filter_text,
                sort_table=self._sort_table_name(),
                sort_column=self.sort_column,
                sort_ascending=self.sort_ascending,
            )
        else:  # completions
            jobs, _ = self._data.get_completions_list(
                progress,
                filter_text=self.filter_text,
                sort_table=self._sort_table_name(),
                sort_column=self.sort_column,
                sort_ascending=self.sort_ascending,
            )
        if event.cursor_row >= len(jobs):
            return
        job = jobs[event.cursor_row]

        # Remote jobs (e.g. AWS Batch) may have no local log file but still carry
        # an external id + console/CloudWatch links worth showing.
        from snakesee.tui.renderables import make_remote_job_info

        header_lines = make_remote_job_info(job)

        log_path = job.log_file
        if log_path is None:
            if not header_lines:
                return
            # No local log, but remote info is available — show just that.
            self.push_screen(JobLogScreen(None, [], header_lines=header_lines))
            return
        lines = self._data.read_log_tail(log_path, max_lines=500)
        self.push_screen(JobLogScreen(log_path, lines, header_lines=header_lines))

    def __init__(
        self,
        workflow_dir: Path,
        refresh_rate: float = DEFAULT_REFRESH_RATE,
        use_estimation: bool = True,
        profile_path: Path | None = None,
        use_wildcard_conditioning: bool = True,
        weighting_strategy: WeightingStrategy = "index",
        half_life_logs: int = 10,
        half_life_days: float = 7.0,
        accessibility_config: AccessibilityConfig | None = None,
    ) -> None:
        """Initialize the SnakeseeApp.

        Args:
            workflow_dir: Path to workflow directory containing ``.snakemake/``.
            refresh_rate: Refresh interval in seconds.
            use_estimation: Whether to enable time estimation.
            profile_path: Optional path to a timing profile for bootstrapping estimates.
            use_wildcard_conditioning: Whether to enable wildcard-conditioned estimates.
            weighting_strategy: Strategy for weighting historical data ("index" or "time").
            half_life_logs: Half-life in run count for index-based weighting.
            half_life_days: Half-life in days for time-based weighting.
            accessibility_config: Optional accessibility configuration override.
        """
        super().__init__()
        self._data = WorkflowDataSource(
            workflow_dir=workflow_dir,
            refresh_rate=refresh_rate,
            use_estimation=use_estimation,
            profile_path=profile_path,
            use_wildcard_conditioning=use_wildcard_conditioning,
            weighting_strategy=weighting_strategy,
            half_life_logs=half_life_logs,
            half_life_days=half_life_days,
        )
        # Keep the constructor-supplied config around so toggling accessibility
        # off restores it rather than falling back to DEFAULT_CONFIG.
        self._base_accessibility_config = accessibility_config or DEFAULT_CONFIG
        self._accessibility_config = self._base_accessibility_config
        # Resolve the workflow path once; the header truncates it per frame, so the
        # per-render cost stays a string slice rather than a filesystem resolve().
        self._resolved_workflow_dir = str(workflow_dir.resolve())
        # Seed the reactive without firing watch_refresh_rate. A plain assignment would
        # invoke the watcher (whenever refresh_rate differs from the reactive default),
        # which calls set_interval() before run() has started the event loop, raising
        # "RuntimeError: no running event loop". on_mount starts the timer once the loop
        # is running. The ignore works around Textual's stubs typing class-level reactive
        # access as the value type (float) rather than Reactive[float].
        self.set_reactive(SnakeseeApp.refresh_rate, refresh_rate)  # type: ignore[arg-type]
        # Whether the (lazily-added) Cost columns have been added to the
        # completions / stats tables. Once True they stay — the column and flag
        # must always move together to avoid an add_row cell-count mismatch.
        self._completions_cost_col: bool = False
        self._stats_cost_col: bool = False

    def compose(self) -> ComposeResult:
        """Compose the widget tree (header / progress / six tables / summary / footer)."""
        yield Static(id="header")
        yield Static(id="progress")
        with Container(id="body"):
            with Horizontal(id="left"):
                yield DataTable(id="running")
                yield DataTable(id="completions")
            with Horizontal(id="right"):
                yield DataTable(id="pending")
                yield DataTable(id="failed")
                yield DataTable(id="incomplete")
                yield DataTable(id="stats")
        yield Static(id="summary")
        filter_input = Input(placeholder="filter rules…", id="filter")
        filter_input.can_focus = False
        yield filter_input
        yield Footer()

    def on_mount(self) -> None:
        """Configure tables, populate panels, and start the refresh timer."""
        running = self.query_one("#running", DataTable)
        running.add_columns("#", "Rule", "Thr", "Started", "Elapsed", "Progress", "ETA")
        running.cursor_type = "row"
        completions = self.query_one("#completions", DataTable)
        completions.add_columns("#", "Rule", "Thr", "Duration", "Completed")
        completions.cursor_type = "row"
        self.query_one("#pending", DataTable).add_columns("Rule", "Est. Count")
        self.query_one("#failed", DataTable).add_columns("#", "Rule", "Job ID")
        self.query_one("#incomplete", DataTable).add_columns("Output File")
        self.query_one("#stats", DataTable).add_columns("Rule", "Thr", "Count", "Avg", "Std Dev")
        self._refresh_panels()
        # Start the polling timer now that the event loop is running. __init__ seeds
        # refresh_rate via set_reactive (no watcher), so this is the only timer created.
        self._refresh_timer = self.set_interval(self.refresh_rate, self._refresh_panels)
        self.add_class(f"-{self.layout_mode.value}")

    def _refresh_panels(self, ignore_pause: bool = False) -> None:
        """Poll the data source and update header/progress/summary/tables widgets.

        Args:
            ignore_pause: When True, refresh even if ``paused`` is set. Used by every
                explicit user-triggered redraw (force refresh, pause toggle, sort,
                filter, log nav, accessibility, estimator re-init) — pausing gates
                only the interval timer's automatic polling.
        """
        if self.paused and not ignore_pause:
            return
        progress, estimate = self._data.poll_state()
        self._last_poll = (progress, estimate)
        self.query_one("#header", Static).update(
            make_header(
                progress,
                self._resolved_workflow_dir,
                self.paused,
                self._data.event_reader,
                max_path_len=max(20, self.size.width - 80),
            )
        )
        self.query_one("#progress", Static).update(
            make_progress_panel(
                progress,
                estimate,
                self._data.use_estimation,
                self._accessibility_config,
            )
        )
        self.query_one("#summary", Static).update(make_summary_footer(progress))

        self._populate_running(progress)
        self._populate_completions(progress)
        self._populate_pending(progress)
        self._populate_failed(progress)
        self._populate_incomplete(progress)
        self._populate_stats()

    def _populate_running(self, progress: WorkflowProgress) -> None:
        """Populate the running-jobs table from the current workflow progress."""
        table = self.query_one("#running", DataTable)
        table.clear()
        jobs = self._data.get_running_jobs_list(
            progress,
            filter_text=self.filter_text,
            sort_table=self._sort_table_name(),
            sort_column=self.sort_column,
            sort_ascending=self.sort_ascending,
        )
        rows = running_rows(self._data.build_running_job_data(jobs))
        for idx, row in enumerate(rows):
            job = row.job
            elapsed_str = (
                format_duration(row.elapsed_seconds) if row.elapsed_seconds is not None else "?"
            )
            remaining_str = (
                f"~{format_duration(row.remaining_seconds)}"
                if row.remaining_seconds is not None
                else "?"
            )
            started_str = "?"
            if job.start_time is not None:
                started_str = datetime.fromtimestamp(job.start_time).strftime("%H:%M:%S")

            progress_str = "-"
            if row.tool_progress is not None:
                if row.tool_progress.percent_complete is not None:
                    progress_str = row.tool_progress.percent_str
                else:
                    progress_str = f"{row.tool_progress.items_processed:,} {row.tool_progress.unit}"

            threads_str = str(job.threads) if job.threads is not None else "-"
            job_id_str = str(job.job_id) if job.job_id else str(idx + 1)
            table.add_row(
                job_id_str,
                job.rule,
                threads_str,
                started_str,
                elapsed_str,
                progress_str,
                remaining_str,
            )

    def _populate_completions(self, progress: WorkflowProgress) -> None:
        """Populate the recent-completions table from the current workflow progress."""
        table = self.query_one("#completions", DataTable)
        table.clear()
        jobs, failed_job_ids = self._data.get_completions_list(
            progress,
            filter_text=self.filter_text,
            sort_table=self._sort_table_name(),
            sort_column=self.sort_column,
            sort_ascending=self.sort_ascending,
        )
        # Add a Cost column the first time any estimated cost is available, so
        # runs without cost estimation never get a blank column.
        if progress.total_cost_estimate is not None and not self._completions_cost_col:
            table.add_column("Cost", key="cost")
            self._completions_cost_col = True

        rows = completion_rows(jobs, failed_job_ids)
        for idx, row in enumerate(rows):
            job = row.job
            duration_str = format_duration(job.duration) if job.duration is not None else "?"
            threads_str = str(job.threads) if job.threads is not None else "-"
            completed_str = "?"
            if job.end_time is not None:
                completed_str = datetime.fromtimestamp(job.end_time).strftime("%H:%M:%S")
            job_id_str = str(job.job_id) if job.job_id else str(idx + 1)
            cells = [job_id_str, job.rule, threads_str, duration_str, completed_str]
            if self._completions_cost_col:
                cells.append(
                    format_cost(job.cost_estimate) if job.cost_estimate is not None else "-"
                )
            table.add_row(*cells)

    def _populate_pending(self, progress: WorkflowProgress) -> None:
        """Populate the pending-jobs table using inferred per-rule pending counts."""
        table = self.query_one("#pending", DataTable)
        table.clear()
        pending_rules = self._data.get_inferred_pending_rules(progress)
        if not pending_rules:
            return
        rows = pending_rows(pending_rules)
        if self.sort_table == SortTable.PENDING:
            rows = sort_rows(rows, self.sort_column, self.sort_ascending)
        for row in rows:
            table.add_row(row.rule, str(row.job_count))

    def _populate_failed(self, progress: WorkflowProgress) -> None:
        """Populate the failed-jobs table from ``progress.failed_jobs_list``."""
        table = self.query_one("#failed", DataTable)
        table.clear()
        rows = failed_rows(progress)
        for idx, row in enumerate(rows):
            job = row.job
            job_id_str = job.job_id if job.job_id else "-"
            table.add_row(str(idx + 1), job.rule, job_id_str)

    def _populate_incomplete(self, progress: WorkflowProgress) -> None:
        """Populate the incomplete-jobs table from ``progress.incomplete_jobs_list``."""
        table = self.query_one("#incomplete", DataTable)
        table.clear()
        for row in incomplete_rows(progress):
            table.add_row(row.display_path)

    def _populate_stats(self) -> None:
        """Populate the rule-statistics table from the data source's filtered stats."""
        table = self.query_one("#stats", DataTable)
        table.clear()
        if not self._data.use_estimation:
            return
        stats_list = self._data.get_filtered_stats()
        if not stats_list:
            return
        # Default ordering: most-frequently-run rules first.
        stats_list = sorted(stats_list, key=lambda s: s.count, reverse=True)
        rows = stats_rows(stats_list, self._data.thread_stats_dict())
        if self.sort_table == SortTable.STATS:
            rows = sort_stats_rows(rows, self.sort_column, self.sort_ascending)

        # Per-rule estimated cost: add a Cost column once any cost data exists.
        # Sourced from the live registry (the stats panel is itself registry-backed,
        # so this stays consistent with the rest of the stats frame).
        cost_by_rule = self._data.cost_by_rule()
        if cost_by_rule and not self._stats_cost_col:
            table.add_column("Cost", key="cost")
            self._stats_cost_col = True

        for row in rows:
            cells = [
                row.rule_display,
                row.threads,
                str(row.stats.count),
                format_duration(row.stats.mean_duration),
                format_duration(row.stats.std_dev) if row.stats.std_dev > 0 else "-",
            ]
            if self._stats_cost_col:
                rule_cost = cost_by_rule.get(row.stats.rule)
                cells.append(format_cost(rule_cost) if rule_cost is not None else "-")
            table.add_row(*cells)

Attributes

last_poll property
last_poll: tuple[WorkflowProgress, TimeEstimate | None] | None

The most recent (progress, estimate) snapshot taken by the refresh cycle.

Read-only accessor for external tooling (e.g. the docs screenshot generator) so it does not have to reach into the private attribute. Returns None until the first refresh has polled the data source.

Methods:

__init__
__init__(workflow_dir: Path, refresh_rate: float = DEFAULT_REFRESH_RATE, use_estimation: bool = True, profile_path: Path | None = None, use_wildcard_conditioning: bool = True, weighting_strategy: WeightingStrategy = 'index', half_life_logs: int = 10, half_life_days: float = 7.0, accessibility_config: AccessibilityConfig | None = None) -> None

Initialize the SnakeseeApp.

Parameters:

Name Type Description Default
workflow_dir Path

Path to workflow directory containing .snakemake/.

required
refresh_rate float

Refresh interval in seconds.

DEFAULT_REFRESH_RATE
use_estimation bool

Whether to enable time estimation.

True
profile_path Path | None

Optional path to a timing profile for bootstrapping estimates.

None
use_wildcard_conditioning bool

Whether to enable wildcard-conditioned estimates.

True
weighting_strategy WeightingStrategy

Strategy for weighting historical data ("index" or "time").

'index'
half_life_logs int

Half-life in run count for index-based weighting.

10
half_life_days float

Half-life in days for time-based weighting.

7.0
accessibility_config AccessibilityConfig | None

Optional accessibility configuration override.

None
Source code in snakesee/tui/app.py
def __init__(
    self,
    workflow_dir: Path,
    refresh_rate: float = DEFAULT_REFRESH_RATE,
    use_estimation: bool = True,
    profile_path: Path | None = None,
    use_wildcard_conditioning: bool = True,
    weighting_strategy: WeightingStrategy = "index",
    half_life_logs: int = 10,
    half_life_days: float = 7.0,
    accessibility_config: AccessibilityConfig | None = None,
) -> None:
    """Initialize the SnakeseeApp.

    Args:
        workflow_dir: Path to workflow directory containing ``.snakemake/``.
        refresh_rate: Refresh interval in seconds.
        use_estimation: Whether to enable time estimation.
        profile_path: Optional path to a timing profile for bootstrapping estimates.
        use_wildcard_conditioning: Whether to enable wildcard-conditioned estimates.
        weighting_strategy: Strategy for weighting historical data ("index" or "time").
        half_life_logs: Half-life in run count for index-based weighting.
        half_life_days: Half-life in days for time-based weighting.
        accessibility_config: Optional accessibility configuration override.
    """
    super().__init__()
    self._data = WorkflowDataSource(
        workflow_dir=workflow_dir,
        refresh_rate=refresh_rate,
        use_estimation=use_estimation,
        profile_path=profile_path,
        use_wildcard_conditioning=use_wildcard_conditioning,
        weighting_strategy=weighting_strategy,
        half_life_logs=half_life_logs,
        half_life_days=half_life_days,
    )
    # Keep the constructor-supplied config around so toggling accessibility
    # off restores it rather than falling back to DEFAULT_CONFIG.
    self._base_accessibility_config = accessibility_config or DEFAULT_CONFIG
    self._accessibility_config = self._base_accessibility_config
    # Resolve the workflow path once; the header truncates it per frame, so the
    # per-render cost stays a string slice rather than a filesystem resolve().
    self._resolved_workflow_dir = str(workflow_dir.resolve())
    # Seed the reactive without firing watch_refresh_rate. A plain assignment would
    # invoke the watcher (whenever refresh_rate differs from the reactive default),
    # which calls set_interval() before run() has started the event loop, raising
    # "RuntimeError: no running event loop". on_mount starts the timer once the loop
    # is running. The ignore works around Textual's stubs typing class-level reactive
    # access as the value type (float) rather than Reactive[float].
    self.set_reactive(SnakeseeApp.refresh_rate, refresh_rate)  # type: ignore[arg-type]
    # Whether the (lazily-added) Cost columns have been added to the
    # completions / stats tables. Once True they stay — the column and flag
    # must always move together to avoid an add_row cell-count mismatch.
    self._completions_cost_col: bool = False
    self._stats_cost_col: bool = False
action_clear_filter
action_clear_filter() -> None

Hide the filter Input if focused, otherwise clear the filter and return to latest log.

Source code in snakesee/tui/app.py
def action_clear_filter(self) -> None:
    """Hide the filter Input if focused, otherwise clear the filter and return to latest log."""
    focused = self.focused
    if isinstance(focused, Input) and focused.id == "filter":
        focused.remove_class("-active")
        focused.value = ""
        self.set_focus(None)
        focused.can_focus = False
        return
    self.filter_text = None
    self.current_log_index = 0
action_cycle_layout
action_cycle_layout() -> None

Cycle to the next layout mode.

Source code in snakesee/tui/app.py
def action_cycle_layout(self) -> None:
    """Cycle to the next layout mode."""
    modes = list(LayoutMode)
    idx = modes.index(self.layout_mode)
    self.layout_mode = modes[(idx + 1) % len(modes)]
action_cycle_sort_back
action_cycle_sort_back() -> None

Cycle the sort target one step backward (None → stats → … → running → None).

Source code in snakesee/tui/app.py
def action_cycle_sort_back(self) -> None:
    """Cycle the sort target one step backward (None → stats → … → running → None)."""
    self._cycle_sort(direction=-1)
action_cycle_sort_forward
action_cycle_sort_forward() -> None

Cycle the sort target one step forward (None → running → … → stats → None).

Source code in snakesee/tui/app.py
def action_cycle_sort_forward(self) -> None:
    """Cycle the sort target one step forward (None → running → … → stats → None)."""
    self._cycle_sort(direction=1)
action_easter_complete
action_easter_complete() -> None

Push the EasterEggScreen if the f-then-g chord finished within the window.

Source code in snakesee/tui/app.py
def action_easter_complete(self) -> None:
    """Push the EasterEggScreen if the f-then-g chord finished within the window."""
    if self._easter_pending:
        self._easter_pending = False
        if self._easter_timer is not None:
            self._easter_timer.stop()
            self._easter_timer = None
        self.push_screen(EasterEggScreen())
action_easter_pending
action_easter_pending() -> None

Start (or restart) the 2-second window for completing the f-then-g easter egg.

Source code in snakesee/tui/app.py
def action_easter_pending(self) -> None:
    """Start (or restart) the 2-second window for completing the f-then-g easter egg."""
    self._easter_pending = True
    if self._easter_timer is not None:
        self._easter_timer.stop()
    self._easter_timer = self.set_timer(2.0, self._clear_easter)
action_force_refresh
action_force_refresh() -> None

Force an immediate panel refresh, even when paused.

Source code in snakesee/tui/app.py
def action_force_refresh(self) -> None:
    """Force an immediate panel refresh, even when paused."""
    self._refresh_panels(ignore_pause=True)
action_hard_refresh
action_hard_refresh() -> None

Re-initialize the estimator in a worker thread and refresh panels.

Source code in snakesee/tui/app.py
def action_hard_refresh(self) -> None:
    """Re-initialize the estimator in a worker thread and refresh panels."""
    self.run_worker(self._reinit_estimator, thread=True, exclusive=True)
action_log_newer
action_log_newer(step: int) -> None

Navigate to a newer log file by step entries.

Parameters:

Name Type Description Default
step int

Number of log entries to step forward (toward newer logs).

required
Source code in snakesee/tui/app.py
def action_log_newer(self, step: int) -> None:
    """Navigate to a newer log file by ``step`` entries.

    Args:
        step: Number of log entries to step forward (toward newer logs).
    """
    self.current_log_index = max(0, self.current_log_index - step)
action_log_older
action_log_older(step: int) -> None

Navigate to an older log file by step entries.

Parameters:

Name Type Description Default
step int

Number of log entries to step backward (toward older logs).

required
Source code in snakesee/tui/app.py
def action_log_older(self, step: int) -> None:
    """Navigate to an older log file by ``step`` entries.

    Args:
        step: Number of log entries to step backward (toward older logs).
    """
    self._data.refresh_log_list()
    max_idx = self._data.available_log_count - 1
    if max_idx < 0:
        self.current_log_index = 0
        return
    self.current_log_index = min(max_idx, self.current_log_index + step)
action_next_match
action_next_match() -> None

Move the cursor to the next row whose Rule column matches the filter.

Source code in snakesee/tui/app.py
def action_next_match(self) -> None:
    """Move the cursor to the next row whose Rule column matches the filter."""
    self._jump_match(direction=1)
action_open_filter
action_open_filter() -> None

Reveal the filter Input widget and focus it for keyboard entry.

Source code in snakesee/tui/app.py
def action_open_filter(self) -> None:
    """Reveal the filter Input widget and focus it for keyboard entry."""
    f = self.query_one("#filter", Input)
    f.can_focus = True
    f.add_class("-active")
    f.focus()
action_prev_match
action_prev_match() -> None

Move the cursor to the previous row whose Rule column matches the filter.

Source code in snakesee/tui/app.py
def action_prev_match(self) -> None:
    """Move the cursor to the previous row whose Rule column matches the filter."""
    self._jump_match(direction=-1)
action_rate_dec
action_rate_dec(delta: float) -> None

Decrease the refresh rate by delta seconds, clamped to MIN_REFRESH_RATE.

Source code in snakesee/tui/app.py
def action_rate_dec(self, delta: float) -> None:
    """Decrease the refresh rate by ``delta`` seconds, clamped to MIN_REFRESH_RATE."""
    self.refresh_rate = max(MIN_REFRESH_RATE, self.refresh_rate - delta)
action_rate_inc
action_rate_inc(delta: float) -> None

Increase the refresh rate by delta seconds, clamped to MAX_REFRESH_RATE.

Source code in snakesee/tui/app.py
def action_rate_inc(self, delta: float) -> None:
    """Increase the refresh rate by ``delta`` seconds, clamped to MAX_REFRESH_RATE."""
    self.refresh_rate = min(MAX_REFRESH_RATE, self.refresh_rate + delta)
action_rate_min
action_rate_min() -> None

Set the refresh rate to the minimum value.

Source code in snakesee/tui/app.py
def action_rate_min(self) -> None:
    """Set the refresh rate to the minimum value."""
    self.refresh_rate = MIN_REFRESH_RATE
action_rate_reset
action_rate_reset() -> None

Reset the refresh rate to the default value.

Source code in snakesee/tui/app.py
def action_rate_reset(self) -> None:
    """Reset the refresh rate to the default value."""
    self.refresh_rate = DEFAULT_REFRESH_RATE
action_show_help
action_show_help() -> None

Push the modal HelpScreen overlay.

Source code in snakesee/tui/app.py
def action_show_help(self) -> None:
    """Push the modal HelpScreen overlay."""
    self.push_screen(HelpScreen())
action_sort_column
action_sort_column(col: int) -> None

Set the sort column for the active sort target, or toggle direction if same column.

Columns are 0-indexed. Each table enforces its own maximum: running and stats support columns 0-3; completions 0-2; pending 0-1.

Parameters:

Name Type Description Default
col int

Zero-based column index to sort by.

required
Source code in snakesee/tui/app.py
def action_sort_column(self, col: int) -> None:
    """Set the sort column for the active sort target, or toggle direction if same column.

    Columns are 0-indexed.  Each table enforces its own maximum:
    running and stats support columns 0-3; completions 0-2; pending 0-1.

    Args:
        col: Zero-based column index to sort by.
    """
    if self.sort_table is None:
        return
    if col >= _SORT_MAX_COLS[self.sort_table]:
        return
    if col == self.sort_column:
        self.sort_ascending = not self.sort_ascending
    else:
        self.sort_column = col
        self.sort_ascending = True
    self._refresh_panels(ignore_pause=True)
action_toggle_accessibility
action_toggle_accessibility() -> None

Toggle accessibility mode, updating the accessibility config and refreshing panels.

Toggling off restores the constructor-supplied config (which may be a custom override), not necessarily DEFAULT_CONFIG.

Source code in snakesee/tui/app.py
def action_toggle_accessibility(self) -> None:
    """Toggle accessibility mode, updating the accessibility config and refreshing panels.

    Toggling off restores the constructor-supplied config (which may be a
    custom override), not necessarily ``DEFAULT_CONFIG``.
    """
    self.accessibility_mode = not self.accessibility_mode
    self._accessibility_config = (
        ACCESSIBLE_CONFIG if self.accessibility_mode else self._base_accessibility_config
    )
    self._refresh_panels(ignore_pause=True)
action_toggle_estimation
action_toggle_estimation() -> None

Toggle time estimation and re-initialize the estimator in a worker thread.

Source code in snakesee/tui/app.py
def action_toggle_estimation(self) -> None:
    """Toggle time estimation and re-initialize the estimator in a worker thread."""
    self._data.use_estimation = not self._data.use_estimation
    self.run_worker(self._reinit_estimator, thread=True, exclusive=True)
action_toggle_pause
action_toggle_pause() -> None

Toggle the paused state of the monitor and repaint immediately.

The repaint bypasses the pause guard so the header's PAUSED indicator updates right away instead of waiting for the next interval tick.

Source code in snakesee/tui/app.py
def action_toggle_pause(self) -> None:
    """Toggle the paused state of the monitor and repaint immediately.

    The repaint bypasses the pause guard so the header's PAUSED indicator
    updates right away instead of waiting for the next interval tick.
    """
    self.paused = not self.paused
    self._refresh_panels(ignore_pause=True)
action_toggle_wildcard
action_toggle_wildcard() -> None

Toggle wildcard conditioning and re-initialize the estimator in a worker thread.

Source code in snakesee/tui/app.py
def action_toggle_wildcard(self) -> None:
    """Toggle wildcard conditioning and re-initialize the estimator in a worker thread."""
    self._data.use_wildcard_conditioning = not self._data.use_wildcard_conditioning
    self.run_worker(self._reinit_estimator, thread=True, exclusive=True)
compose
compose() -> ComposeResult

Compose the widget tree (header / progress / six tables / summary / footer).

Source code in snakesee/tui/app.py
def compose(self) -> ComposeResult:
    """Compose the widget tree (header / progress / six tables / summary / footer)."""
    yield Static(id="header")
    yield Static(id="progress")
    with Container(id="body"):
        with Horizontal(id="left"):
            yield DataTable(id="running")
            yield DataTable(id="completions")
        with Horizontal(id="right"):
            yield DataTable(id="pending")
            yield DataTable(id="failed")
            yield DataTable(id="incomplete")
            yield DataTable(id="stats")
    yield Static(id="summary")
    filter_input = Input(placeholder="filter rules…", id="filter")
    filter_input.can_focus = False
    yield filter_input
    yield Footer()
on_data_table_row_selected
on_data_table_row_selected(event: RowSelected) -> None

Open the JobLogScreen for the selected job in the running/completions tables.

Source code in snakesee/tui/app.py
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
    """Open the JobLogScreen for the selected job in the running/completions tables."""
    table_id = event.data_table.id
    if table_id not in {SortTable.RUNNING.value, SortTable.COMPLETIONS.value}:
        return
    # Reuse the latest poll snapshot — populated by _refresh_panels at most
    # `refresh_rate` seconds ago, which is also the data the table was rendered from.
    if self._last_poll is None:
        self._last_poll = self._data.poll_state()
    progress, _ = self._last_poll
    if table_id == SortTable.RUNNING.value:
        jobs = self._data.get_running_jobs_list(
            progress,
            filter_text=self.filter_text,
            sort_table=self._sort_table_name(),
            sort_column=self.sort_column,
            sort_ascending=self.sort_ascending,
        )
    else:  # completions
        jobs, _ = self._data.get_completions_list(
            progress,
            filter_text=self.filter_text,
            sort_table=self._sort_table_name(),
            sort_column=self.sort_column,
            sort_ascending=self.sort_ascending,
        )
    if event.cursor_row >= len(jobs):
        return
    job = jobs[event.cursor_row]

    # Remote jobs (e.g. AWS Batch) may have no local log file but still carry
    # an external id + console/CloudWatch links worth showing.
    from snakesee.tui.renderables import make_remote_job_info

    header_lines = make_remote_job_info(job)

    log_path = job.log_file
    if log_path is None:
        if not header_lines:
            return
        # No local log, but remote info is available — show just that.
        self.push_screen(JobLogScreen(None, [], header_lines=header_lines))
        return
    lines = self._data.read_log_tail(log_path, max_lines=500)
    self.push_screen(JobLogScreen(log_path, lines, header_lines=header_lines))
on_input_submitted
on_input_submitted(event: Submitted) -> None

Apply the submitted filter text and hide the Input again.

Source code in snakesee/tui/app.py
def on_input_submitted(self, event: Input.Submitted) -> None:
    """Apply the submitted filter text and hide the Input again."""
    if event.input.id != "filter":
        return
    self.filter_text = event.value or None
    event.input.remove_class("-active")
    event.input.value = ""
    self.set_focus(None)
    event.input.can_focus = False
on_mount
on_mount() -> None

Configure tables, populate panels, and start the refresh timer.

Source code in snakesee/tui/app.py
def on_mount(self) -> None:
    """Configure tables, populate panels, and start the refresh timer."""
    running = self.query_one("#running", DataTable)
    running.add_columns("#", "Rule", "Thr", "Started", "Elapsed", "Progress", "ETA")
    running.cursor_type = "row"
    completions = self.query_one("#completions", DataTable)
    completions.add_columns("#", "Rule", "Thr", "Duration", "Completed")
    completions.cursor_type = "row"
    self.query_one("#pending", DataTable).add_columns("Rule", "Est. Count")
    self.query_one("#failed", DataTable).add_columns("#", "Rule", "Job ID")
    self.query_one("#incomplete", DataTable).add_columns("Output File")
    self.query_one("#stats", DataTable).add_columns("Rule", "Thr", "Count", "Avg", "Std Dev")
    self._refresh_panels()
    # Start the polling timer now that the event loop is running. __init__ seeds
    # refresh_rate via set_reactive (no watcher), so this is the only timer created.
    self._refresh_timer = self.set_interval(self.refresh_rate, self._refresh_panels)
    self.add_class(f"-{self.layout_mode.value}")
watch_current_log_index
watch_current_log_index(old: int, new: int) -> None

Sync the log index to the data source and refresh panels when it changes.

Log navigation only comes from user keystrokes, so the redraw bypasses the pause guard.

Source code in snakesee/tui/app.py
def watch_current_log_index(self, old: int, new: int) -> None:
    """Sync the log index to the data source and refresh panels when it changes.

    Log navigation only comes from user keystrokes, so the redraw bypasses
    the pause guard.
    """
    self._data.current_log_index = new
    self._refresh_panels(ignore_pause=True)
watch_filter_text
watch_filter_text(old: str | None, new: str | None) -> None

Re-populate all panels when the filter text changes.

Filter changes only come from user keystrokes, so the redraw bypasses the pause guard.

Source code in snakesee/tui/app.py
def watch_filter_text(self, old: str | None, new: str | None) -> None:
    """Re-populate all panels when the filter text changes.

    Filter changes only come from user keystrokes, so the redraw bypasses
    the pause guard.
    """
    self._refresh_panels(ignore_pause=True)
watch_layout_mode
watch_layout_mode(old: LayoutMode, new: LayoutMode) -> None

Swap the CSS class on the root when the layout mode changes.

Source code in snakesee/tui/app.py
def watch_layout_mode(self, old: LayoutMode, new: LayoutMode) -> None:
    """Swap the CSS class on the root when the layout mode changes."""
    for mode in LayoutMode:
        self.remove_class(f"-{mode.value}")
    self.add_class(f"-{new.value}")
watch_refresh_rate
watch_refresh_rate(old: float, new: float) -> None

Restart the polling timer when the refresh rate changes.

Source code in snakesee/tui/app.py
def watch_refresh_rate(self, old: float, new: float) -> None:
    """Restart the polling timer when the refresh rate changes."""
    if self._refresh_timer is not None:
        self._refresh_timer.stop()
    self._refresh_timer = self.set_interval(new, self._refresh_panels)
    self._data.refresh_rate = new
    self._data.update_cache_ttl()

SortTable

Bases: StrEnum

Sortable DataTable identifiers; values match the widget IDs in compose().

Source code in snakesee/tui/app.py
class SortTable(StrEnum):
    """Sortable DataTable identifiers; values match the widget IDs in compose()."""

    RUNNING = "running"
    COMPLETIONS = "completions"
    PENDING = "pending"
    STATS = "stats"

Functions: