state
Centralized state management for snakesee.
This module provides a unified architecture for managing workflow state, replacing scattered state across multiple components with a single source of truth.
Key classes:

- Clock: Injectable time source for testability
- EstimationConfig: Centralized configuration for estimation
- WorkflowPaths: Centralized path resolution
- JobRegistry: Single source of truth for job state (Phase 6)
- RuleRegistry: Centralized rule statistics (Phase 7)
- WorkflowState: Top-level state container (Phase 8)
Classes
Clock
Bases: Protocol
Protocol for injectable time sources.
This enables deterministic testing by allowing tests to provide a controlled time source instead of using real wall-clock time.
Source code in snakesee/state/clock.py
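As a rough illustration of the pattern, the sketch below defines a structural protocol with `now()` and `monotonic()` (the two methods the clock implementations on this page expose; their exact snakesee signatures are assumed) plus a function that accepts any conforming clock. `TimeSource`, `RealClock`, `FakeClock`, and `seconds_since` are hypothetical names, not part of snakesee.

```python
import time
from typing import Protocol


class TimeSource(Protocol):
    """Hypothetical stand-in for snakesee's Clock protocol (signatures assumed)."""

    def now(self) -> float: ...

    def monotonic(self) -> float: ...


class RealClock:
    """Delegates to the standard library, as a system clock would."""

    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


class FakeClock:
    """Returns fixed values so tests are deterministic."""

    def __init__(self, t: float) -> None:
        self.t = t

    def now(self) -> float:
        return self.t

    def monotonic(self) -> float:
        return 0.0


def seconds_since(start: float, clock: TimeSource) -> float:
    # Any object with matching now()/monotonic() methods satisfies the protocol.
    return clock.now() - start
```

Because `Protocol` uses structural typing, `FakeClock` needs no base class; a type checker accepts it wherever `TimeSource` is expected.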
ConfidenceThresholds (dataclass)
Thresholds for confidence-based decisions.
Attributes:
| Name | Type | Description |
|---|---|---|
| `size_scaling_min` | `float` | Minimum confidence required to use a size-scaled estimate. |
| `high_confidence` | `float` | Show a simple estimate without a range. |
| `medium_confidence` | `float` | Show a range. |
| `low_confidence` | `float` | Show a "rough" caveat. |
| `max_confidence` | `float` | Cap to avoid overconfidence. |
| `bootstrap_confidence` | `float` | Confidence used when there is no progress yet. |
Source code in snakesee/state/config.py
ConfidenceWeights (dataclass)
Weights for computing the overall confidence score.
Used in _estimate_weighted() to combine multiple factors. All weights must sum to 1.0.
Attributes:
| Name | Type | Description |
|---|---|---|
| `sample_size` | `float` | Weight for historical sample count. |
| `recency` | `float` | Weight for data recency. |
| `consistency` | `float` | Weight for recent run consistency. |
| `data_coverage` | `float` | Weight for fraction of rules with data. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If weights do not sum to 1.0. |
Source code in snakesee/state/config.py
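A minimal sketch of how weights like these might be validated and combined, assuming each factor has already been turned into a score in [0, 1]. The class name `Weights`, its default values, and the `combine` helper are illustrative assumptions; snakesee's real defaults and `_estimate_weighted()` logic may differ.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Weights:
    # Illustrative defaults only; not snakesee's real values.
    sample_size: float = 0.4
    recency: float = 0.2
    consistency: float = 0.2
    data_coverage: float = 0.2

    def __post_init__(self) -> None:
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        # Compare with a tolerance: floating-point sums rarely equal 1.0 exactly.
        if not math.isclose(total, 1.0, abs_tol=1e-9):
            raise ValueError(f"weights must sum to 1.0, got {total}")


def combine(w: Weights, sample: float, recency: float, consistency: float, coverage: float) -> float:
    """Weighted sum of per-factor scores, each assumed to lie in [0, 1]."""
    return (
        w.sample_size * sample
        + w.recency * recency
        + w.consistency * consistency
        + w.data_coverage * coverage
    )
```

Using `math.isclose` rather than `==` avoids rejecting weight sets that add up to 1.0 only up to rounding error.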
EstimationConfig (dataclass)
Central configuration for time estimation.
This class consolidates estimation-related configuration that was previously scattered across multiple files as magic numbers.
Attributes:
| Name | Type | Description |
|---|---|---|
| `weighting_strategy` | `WeightingStrategy` | How to weight historical data (`"time"` or `"index"`). |
| `half_life_days` | `float` | Half-life in days for time-based weighting. |
| `half_life_logs` | `int` | Half-life in run count for index-based weighting. |
| `variance` | `VarianceMultipliers` | Variance multiplier settings. |
| `confidence_weights` | `ConfidenceWeights` | Weights for confidence calculation. |
| `confidence_thresholds` | `ConfidenceThresholds` | Decision thresholds based on confidence. |
| `time` | `TimeConstants` | Time-related constants. |
| `min_samples_for_conditioning` | `int` | Minimum samples for wildcard conditioning. |
| `min_pairs_for_size_scaling` | `int` | Minimum pairs for size correlation. |
| `high_variance_cv` | `float` | Coefficient of variation threshold for "high variance". |
| `default_global_mean` | `float` | Fallback duration when no data is available. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If any parameter is invalid (e.g., non-positive half_life). |
Source code in snakesee/state/config.py
Functions
__post_init__
Validate configuration parameters.
Source code in snakesee/state/config.py
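The two weighting strategies can be read as exponential decay with a half-life: an observation one half-life old counts half as much as a fresh one. The sketch below assumes weights of the form `0.5 ** (age / half_life)`, with age measured in days for `"time"` weighting and in runs-ago for `"index"` weighting; the function names are hypothetical and snakesee's exact formula is not shown on this page.

```python
def decay_weight(age: float, half_life: float) -> float:
    """Weight that halves every `half_life` units of age."""
    if half_life <= 0:
        raise ValueError("half_life must be positive")
    return 0.5 ** (age / half_life)


def weighted_mean(durations: list[float], ages: list[float], half_life: float) -> float:
    """Exponentially weighted mean of durations.

    For "time" weighting, ages are days since each run (cf. half_life_days);
    for "index" weighting, ages are run indices counted back from the newest
    run (cf. half_life_logs).
    """
    if not durations or len(durations) != len(ages):
        raise ValueError("need matching, non-empty durations and ages")
    weights = [decay_weight(a, half_life) for a in ages]
    return sum(w * d for w, d in zip(weights, durations)) / sum(weights)
```

For example, with a 7-day half-life a 100 s run from today and a 200 s run from a week ago get weights 1.0 and 0.5, giving a mean of about 133 s rather than the unweighted 150 s.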
FrozenClock
Clock frozen at a specific time for testing.
Useful for testing time-dependent logic without flakiness.
Attributes:
| Name | Description |
|---|---|
| `frozen_time` | The frozen Unix timestamp. |
| `frozen_monotonic` | The frozen monotonic value. |
Example
```python
from snakesee.state import FrozenClock

clock = FrozenClock(1700000000.0)
assert clock.now() == 1700000000.0

clock.advance(60.0)  # Advance by 1 minute
assert clock.now() == 1700000060.0
```
Source code in snakesee/state/clock.py
Functions
__init__
Initialize with specific frozen times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `frozen_time` | `float \| None` | Unix timestamp to freeze at. Defaults to current time. | `None` |
| `frozen_monotonic` | `float \| None` | Monotonic value to freeze at. Defaults to 0.0. | `None` |
Source code in snakesee/state/clock.py
advance
Advance the frozen time by the given number of seconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `seconds` | `float` | Number of seconds to advance (can be negative). | required |
monotonic
now
set_monotonic
Set the frozen monotonic value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `float` | Monotonic value to set. | required |
set_time
Set the frozen time to a specific timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `timestamp` | `float` | Unix timestamp to set. | required |
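For readers who want to see the mechanics, here is a minimal frozen clock with the same method names as `FrozenClock`. It is a hypothetical re-implementation for illustration (`MiniFrozenClock`), not snakesee's source. Whether snakesee's `advance()` also moves the monotonic value is not stated here; this sketch advances both as an assumption.

```python
from __future__ import annotations

import time


class MiniFrozenClock:
    """Hypothetical frozen clock mirroring FrozenClock's documented methods."""

    def __init__(self, frozen_time: float | None = None, frozen_monotonic: float | None = None) -> None:
        # Defaults follow the documented behavior: current time and 0.0.
        self.frozen_time = time.time() if frozen_time is None else frozen_time
        self.frozen_monotonic = 0.0 if frozen_monotonic is None else frozen_monotonic

    def now(self) -> float:
        return self.frozen_time

    def monotonic(self) -> float:
        return self.frozen_monotonic

    def advance(self, seconds: float) -> None:
        # Assumption: wall-clock and monotonic values move together.
        self.frozen_time += seconds
        self.frozen_monotonic += seconds

    def set_time(self, timestamp: float) -> None:
        self.frozen_time = timestamp

    def set_monotonic(self, value: float) -> None:
        self.frozen_monotonic = value
```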
Job (dataclass)
Mutable job state container.
Unlike the frozen JobInfo dataclass, this class allows state updates as jobs progress through their lifecycle.
Attributes:
| Name | Type | Description |
|---|---|---|
| `key` | `str` | Unique key for this job (rule + output hash or job_id). |
| `rule` | `str` | The Snakemake rule name. |
| `status` | `JobStatus` | Current job status. |
| `job_id` | `str \| None` | Snakemake job ID (may be None initially). |
| `start_time` | `float \| None` | Unix timestamp when the job started. |
| `end_time` | `float \| None` | Unix timestamp when the job completed. |
| `wildcards` | `dict[str, str]` | Dictionary of wildcard values. |
| `threads` | `int \| None` | Number of threads allocated. |
| `input_size` | `int \| None` | Total input file size in bytes. |
| `log_file` | `Path \| None` | Path to the job's log file. |
Source code in snakesee/state/job_registry.py
Functions
from_job_info (classmethod)
Create a Job from a JobInfo instance.
Source code in snakesee/state/job_registry.py
to_job_info
to_job_info() -> JobInfo
Convert to immutable JobInfo for backward compatibility.
Source code in snakesee/state/job_registry.py
JobRegistry
Central registry for all job state.
Provides O(1) lookup by job key or job_id, and efficient iteration over jobs by status.
Example
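```python
import time

from snakesee.state import JobRegistry, JobStatus

registry = JobRegistry()
job, created = registry.get_or_create("job_1", "align")  # returns (job, created)
job.status = JobStatus.RUNNING
job.start_time = time.time()
registry.update_indexes(job)
running = registry.running()  # [job]
```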
Source code in snakesee/state/job_registry.py
Functions
__contains__
__init__
Initialize empty registry.
Source code in snakesee/state/job_registry.py
__len__
apply_event
apply_event(event: SnakeseeEvent) -> Job | None
Apply a snakesee event to update job state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `SnakeseeEvent` | The event to apply. | required |
Returns:
| Type | Description |
|---|---|
| `Job \| None` | The updated Job, or None if the event couldn't be applied. |
Source code in snakesee/state/job_registry.py
apply_job_info
Add or update a job from a JobInfo.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job_info` | `JobInfo` | JobInfo to apply. | required |
| `key` | `str \| None` | Optional key. If None, uses job_id or generates one. | `None` |
Returns:
| Type | Description |
|---|---|
| `Job` | The created or updated Job. |
Source code in snakesee/state/job_registry.py
clear
Clear all jobs from the registry.
get_or_create
get_or_create(key: str, rule: str) -> tuple[Job, bool]
Get an existing job or create a new one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Unique job key. | required |
| `rule` | `str` | Rule name for a new job. | required |
Returns:
| Type | Description |
|---|---|
| `tuple[Job, bool]` | Tuple of `(job, created)`, where `created` is True if the job was new. |
Source code in snakesee/state/job_registry.py
set_status
Update job status and indexes.
store_threads
Store thread count for a job by job_id.
This is used when thread info comes from events before the job is fully tracked.
Source code in snakesee/state/job_registry.py
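The O(1) lookups described for JobRegistry are typically achieved with a primary dict keyed by job key plus secondary indexes. The sketch below is a hypothetical, simplified model (`MiniRegistry`, `MiniJob`, and the `Status` members are assumptions, not snakesee's definitions) showing `get_or_create` returning a `(job, created)` tuple and a per-status index that is updated whenever a job's status changes.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    # Hypothetical members; snakesee's JobStatus values are not listed here.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class MiniJob:
    key: str
    rule: str
    status: Status = Status.PENDING
    job_id: str | None = None


class MiniRegistry:
    def __init__(self) -> None:
        self._by_key: dict[str, MiniJob] = {}
        self._by_job_id: dict[str, str] = {}  # job_id -> key
        self._by_status: dict[Status, set[str]] = {s: set() for s in Status}

    def get_or_create(self, key: str, rule: str) -> tuple[MiniJob, bool]:
        job = self._by_key.get(key)
        if job is not None:
            return job, False
        job = MiniJob(key=key, rule=rule)
        self._by_key[key] = job
        self._by_status[job.status].add(key)
        return job, True

    def set_status(self, job: MiniJob, status: Status) -> None:
        # Move the key between status buckets so iteration by status stays cheap.
        self._by_status[job.status].discard(job.key)
        job.status = status
        self._by_status[status].add(job.key)

    def set_job_id(self, job: MiniJob, job_id: str) -> None:
        job.job_id = job_id
        self._by_job_id[job_id] = job.key

    def by_job_id(self, job_id: str) -> MiniJob | None:
        key = self._by_job_id.get(job_id)
        return None if key is None else self._by_key[key]

    def with_status(self, status: Status) -> list[MiniJob]:
        return [self._by_key[k] for k in self._by_status[status]]
```

The design trade-off is that every status change must go through one method (here `set_status`) so the indexes never drift from the jobs themselves.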
JobStatus
Bases: Enum
Status of a job in the workflow.
Source code in snakesee/state/job_registry.py
OffsetClock
Clock with a fixed offset from system time.
Useful for simulating time shifts without fully freezing time.
Attributes:
| Name | Description |
|---|---|
| `offset` | Seconds to add to system time. |
Source code in snakesee/state/clock.py
RuleRegistry
Central registry for all rule timing statistics.
Consolidates what was previously scattered across TimeEstimator's rule_stats, thread_stats, and wildcard_stats dictionaries.
Example
```python
import time

from snakesee.state import RuleRegistry

registry = RuleRegistry()
stats = registry.get_or_create("align")
stats.record_completion(duration=100.0, timestamp=time.time(), threads=4)
mean, var = registry.get_estimate("align", threads=4)
```
Source code in snakesee/state/rule_registry.py
Functions
__contains__
__init__
__init__(config: EstimationConfig | None = None) -> None
Initialize empty registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `EstimationConfig \| None` | Optional estimation configuration. | `None` |
Source code in snakesee/state/rule_registry.py
__len__
all_rules
clear
get
get(rule: str) -> RuleStatistics | None
get_combination_stats
get_combination_stats(rule: str, wildcards: dict[str, str] | None, threads: int | None, min_samples: int | None = None) -> RuleTimingStats | None
Get timing stats for a specific wildcard+threads combination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule` | `str` | Rule name. | required |
| `wildcards` | `dict[str, str] \| None` | Wildcard values dict. | required |
| `threads` | `int \| None` | Thread count. | required |
| `min_samples` | `int \| None` | Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING. Set to 1 to get stats even with a single historical run. | `None` |
Returns:
| Type | Description |
|---|---|
| `RuleTimingStats \| None` | RuleTimingStats for the combination, or None if not found or if there are insufficient samples. |
Source code in snakesee/state/rule_registry.py
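A sketch of how a wildcard+threads combination lookup with a minimum-sample guard could work. The key format (sorted `name=value` pairs plus a `threads=` suffix), the `combo_key` and `lookup` helpers, and the default minimum of 3 are all hypothetical; snakesee's real key format and MIN_SAMPLES_FOR_CONDITIONING value are not documented on this page.

```python
from __future__ import annotations

DEFAULT_MIN_SAMPLES = 3  # hypothetical stand-in for MIN_SAMPLES_FOR_CONDITIONING


def combo_key(wildcards: dict[str, str] | None, threads: int | None) -> str:
    # Sort wildcard items so {"a": "1", "b": "2"} and {"b": "2", "a": "1"} map to one key.
    parts = [f"{k}={v}" for k, v in sorted((wildcards or {}).items())]
    parts.append(f"threads={threads}")
    return ",".join(parts)


def lookup(
    table: dict[str, list[float]],
    wildcards: dict[str, str] | None,
    threads: int | None,
    min_samples: int | None = None,
) -> list[float] | None:
    """Return recorded durations for the combination, or None if too few."""
    needed = DEFAULT_MIN_SAMPLES if min_samples is None else min_samples
    samples = table.get(combo_key(wildcards, threads))
    if samples is None or len(samples) < needed:
        return None
    return samples
```

Passing `min_samples=1` mirrors the documented escape hatch of accepting a single historical run.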
get_or_create
get_or_create(rule: str) -> RuleStatistics
Get existing statistics or create new ones.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule` | `str` | Rule name. | required |
Returns:
| Type | Description |
|---|---|
| `RuleStatistics` | RuleStatistics for the rule. |
Source code in snakesee/state/rule_registry.py
global_mean_duration
Get the global mean duration across all rules.
Returns:
| Type | Description |
|---|---|
| `float` | Mean duration in seconds, or the config default if no data is available. |
Source code in snakesee/state/rule_registry.py
load_from_rule_stats
load_from_rule_stats(rule_stats: dict[str, RuleTimingStats], thread_stats: dict[str, ThreadTimingStats] | None = None, wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None) -> None
Load from existing stats dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule_stats` | `dict[str, RuleTimingStats]` | Aggregate rule timing stats. | required |
| `thread_stats` | `dict[str, ThreadTimingStats] \| None` | Thread-specific stats. | `None` |
| `wildcard_stats` | `dict[str, dict[str, WildcardTimingStats]] \| None` | Wildcard-specific stats. | `None` |
Source code in snakesee/state/rule_registry.py
record_completion
record_completion(rule: str, duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None
Record a job completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule` | `str` | Rule name. | required |
| `duration` | `float` | Job duration in seconds. | required |
| `timestamp` | `float` | Completion timestamp. | required |
| `threads` | `int \| None` | Number of threads used. | `None` |
| `wildcards` | `dict[str, str] \| None` | Wildcard values. | `None` |
| `input_size` | `int \| None` | Input file size in bytes. | `None` |
Source code in snakesee/state/rule_registry.py
record_job_completion
record_job_completion(job: Job) -> None
Record completion from a Job object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `Job` | Completed job with timing data. | required |
Source code in snakesee/state/rule_registry.py
rule_count
set_expected_counts
Set expected job counts for rules.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `counts` | `dict[str, int]` | Dictionary mapping rule names to expected counts. | required |
Source code in snakesee/state/rule_registry.py
to_rule_stats_dict
to_rule_stats_dict() -> dict[str, RuleTimingStats]
Convert to dict for backward compatibility with TimeEstimator.
to_thread_stats_dict
to_thread_stats_dict() -> dict[str, ThreadTimingStats]
Convert to thread stats dict for backward compatibility.
Source code in snakesee/state/rule_registry.py
to_wildcard_stats_dict
to_wildcard_stats_dict() -> dict[str, dict[str, WildcardTimingStats]]
Convert to wildcard stats dict for backward compatibility.
Source code in snakesee/state/rule_registry.py
total_sample_count
typical_threads
Get the typical thread count for a rule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule` | `str` | Rule name. | required |
Returns:
| Type | Description |
|---|---|
| `float` | Weighted average thread count, or 1.0 if no thread data. |
Source code in snakesee/state/rule_registry.py
RuleStatistics (dataclass)
Complete timing statistics for a single rule.
Combines aggregate, thread-level, and wildcard-level statistics in a single container.
Attributes:
| Name | Type | Description |
|---|---|---|
| `rule` | `str` | The rule name. |
| `aggregate` | `RuleTimingStats` | Overall timing statistics for the rule. |
| `by_threads` | `ThreadTimingStats \| None` | Thread-specific timing statistics. |
| `by_wildcard` | `dict[str, WildcardTimingStats]` | Wildcard-specific timing statistics. |
| `by_combination` | `dict[str, RuleTimingStats]` | Full wildcard+threads combination statistics. |
| `expected_count` | `int \| None` | Expected number of jobs for this rule (if known). |
Source code in snakesee/state/rule_registry.py
Attributes
typical_threads (property)
Weighted average of thread counts from historical data.
Uses the number of observations at each thread count as weights. Defaults to 1.0 if no thread data is available.
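The weighted average described above can be written out directly: each thread count is weighted by how many times it was observed. A minimal sketch, assuming the observations are available as a `{threads: count}` mapping (a hypothetical input shape; snakesee stores them inside ThreadTimingStats):

```python
def typical_threads(observations: dict[int, int]) -> float:
    """Observation-weighted mean thread count; 1.0 when there is no data."""
    total = sum(observations.values())
    if total == 0:
        return 1.0
    return sum(threads * n for threads, n in observations.items()) / total
```

For instance, three runs at 4 threads and one at 8 threads give (3 * 4 + 1 * 8) / 4 = 5.0.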
Functions
__post_init__
Ensure aggregate has correct rule name.
Source code in snakesee/state/rule_registry.py
record_completion
record_completion(duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None
Record a job completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `duration` | `float` | Job duration in seconds. | required |
| `timestamp` | `float` | Completion timestamp. | required |
| `threads` | `int \| None` | Number of threads used. | `None` |
| `wildcards` | `dict[str, str] \| None` | Wildcard values for the job. | `None` |
| `input_size` | `int \| None` | Input file size in bytes. | `None` |
Source code in snakesee/state/rule_registry.py
SystemClock
Default clock implementation using system time.
This is the production implementation that delegates to the standard library's time module.
Source code in snakesee/state/clock.py
TimeConstants (dataclass)
Time-related constants used throughout the codebase.
Attributes:
| Name | Type | Description |
|---|---|---|
| `seconds_per_day` | `int` | Number of seconds in a day. |
| `seconds_per_hour` | `int` | Number of seconds in an hour. |
| `seconds_per_minute` | `int` | Number of seconds in a minute. |
| `stale_workflow_threshold` | `float` | Seconds before the workflow is considered stale. |
| `timing_mismatch_tolerance` | `float` | Tolerance for validation timing comparisons. |
Source code in snakesee/state/config.py
VarianceMultipliers (dataclass)
Variance multipliers for different estimation scenarios.
When only one observation exists, the variance is computed as (mean * multiplier)^2. Lower multipliers give tighter confidence bounds.
Attributes:
| Name | Type | Description |
|---|---|---|
| `exact_thread_match` | `float` | High confidence: exact thread count match. |
| `exact_wildcard_match` | `float` | High confidence: exact wildcard match. |
| `size_scaled` | `float` | Medium confidence: input-size scaling. |
| `rule_fallback` | `float` | Standard: rule-level fallback. |
| `aggregate_fallback` | `float` | Standard: aggregate across thread counts. |
| `fuzzy_match` | `float` | Lower confidence: fuzzy rule matching. |
| `global_fallback` | `float` | Lowest confidence: no rule data. |
| `bootstrap` | `float` | No progress yet: initial estimate. |
Source code in snakesee/state/config.py
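To make the single-observation rule concrete: with a mean of 100 s and a multiplier of 0.5, the variance is (100 * 0.5)^2 = 2500 s^2, i.e. a standard deviation of 50 s. The helper below simply encodes that formula; its name is hypothetical.

```python
import math


def single_observation_variance(mean: float, multiplier: float) -> float:
    """Variance used when only one observation exists: (mean * multiplier) ** 2."""
    return (mean * multiplier) ** 2


# A smaller multiplier yields a smaller standard deviation, i.e. tighter bounds.
tight = math.sqrt(single_observation_variance(100.0, 0.2))
loose = math.sqrt(single_observation_variance(100.0, 0.5))
```

Because the standard deviation is proportional to the mean, the resulting bounds scale with the job's expected duration rather than being a fixed number of seconds.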
WorkflowPaths (dataclass)
Centralized path management for Snakemake workflow directories.
This frozen dataclass provides computed paths for all standard Snakemake directory locations, eliminating ad-hoc path construction.
Attributes:
| Name | Type | Description |
|---|---|---|
| `workflow_dir` | `Path` | Root directory of the workflow (contains `.snakemake/`). |
Example
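```python
from pathlib import Path

from snakesee.state import WorkflowPaths

paths = WorkflowPaths(Path("/my/workflow"))

# Access computed paths
if paths.metadata_dir.exists():
    for f in paths.get_metadata_files():
        print(f)  # replace with your own per-file processing

# Find logs
latest = paths.find_latest_log()
all_logs = paths.find_all_logs()
```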
Source code in snakesee/state/paths.py
Attributes
default_profile (property)
Path to the default profile file (`.snakesee-profile.json`).
has_incomplete (property)
Check if the incomplete directory exists and contains markers.
validation_log (property)
Path to the validation log file (`.snakesee_validation.log`).
Functions
count_metadata_files
Count the number of metadata files.
Returns:
| Type | Description |
|---|---|
| `int` | Number of metadata files. |
Source code in snakesee/state/paths.py
decode_incomplete_marker
Decode an incomplete marker filename to get the output path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `marker` | `Path` | Path to the marker file. | required |
Returns:
| Type | Description |
|---|---|
| `Path \| None` | Decoded output file path, or None if decoding fails. |
Source code in snakesee/state/paths.py
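Snakemake names files under `.snakemake/incomplete/` with a URL-safe base64 encoding of the output path. The sketch below assumes exactly that scheme and ignores details such as how very long names may be split into subdirectories. `decode_marker_name` is a hypothetical helper, not snakesee's implementation.

```python
from __future__ import annotations

import base64
from pathlib import Path


def decode_marker_name(marker: Path) -> Path | None:
    """Decode a marker file name back into the output path it encodes."""
    try:
        raw = base64.urlsafe_b64decode(marker.name)
        return Path(raw.decode("utf-8"))
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
        return None
```

Returning None instead of raising matches the documented contract, so a single malformed marker does not abort a scan of the whole directory.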
find_all_logs
Find all Snakemake log files, sorted by modification time.
Returns:
| Type | Description |
|---|---|
| `list[Path]` | List of paths sorted oldest to newest. |
Source code in snakesee/state/paths.py
find_latest_log
Find the most recent Snakemake log file.
Returns:
| Type | Description |
|---|---|
| `Path \| None` | Path to the most recent log file, or None if no logs exist. |
Source code in snakesee/state/paths.py
find_logs_sorted_newest_first
Find all Snakemake log files, sorted newest first.
Returns:
| Type | Description |
|---|---|
| `list[Path]` | List of paths sorted newest to oldest. |
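Sorting log files by modification time is straightforward with `pathlib`. The sketch assumes logs live in `.snakemake/log/` and end in `.snakemake.log`, which matches Snakemake's usual layout but is an assumption here; `sorted_logs` and `latest_log` are hypothetical helpers, not snakesee's methods.

```python
from __future__ import annotations

from pathlib import Path


def sorted_logs(workflow_dir: Path, newest_first: bool = False) -> list[Path]:
    """List Snakemake run logs ordered by modification time."""
    log_dir = workflow_dir / ".snakemake" / "log"
    if not log_dir.is_dir():
        return []
    logs = [p for p in log_dir.glob("*.snakemake.log") if p.is_file()]
    return sorted(logs, key=lambda p: p.stat().st_mtime, reverse=newest_first)


def latest_log(workflow_dir: Path) -> Path | None:
    logs = sorted_logs(workflow_dir, newest_first=True)
    return logs[0] if logs else None
```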
find_profile
Search for a profile file in the workflow directory and its parent directories.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_levels` | `int` | Maximum parent levels to search (including current). | `6` |
Returns:
| Type | Description |
|---|---|
| `Path \| None` | Path to the found profile, or None if not found. |
Source code in snakesee/state/paths.py
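The parent-directory search can be sketched as a bounded walk up the tree. This assumes the profile file is named `.snakesee-profile.json` (the name given for `default_profile`) and that `max_levels` counts the starting directory as level one; `find_upwards` is a hypothetical helper, not snakesee's code.

```python
from __future__ import annotations

from pathlib import Path

PROFILE_NAME = ".snakesee-profile.json"


def find_upwards(start: Path, max_levels: int = 6) -> Path | None:
    """Return the first PROFILE_NAME found in start or its ancestors."""
    directory = start.resolve()
    for _ in range(max_levels):
        candidate = directory / PROFILE_NAME
        if candidate.is_file():
            return candidate
        if directory.parent == directory:  # reached the filesystem root
            break
        directory = directory.parent
    return None
```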
get_incomplete_markers
Iterate over incomplete job markers.
Yields:
| Type | Description |
|---|---|
| `Path` | Path to each incomplete marker file. |
Source code in snakesee/state/paths.py
get_job_log
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None
Find the log file for a specific job.
Searches common log locations for a file matching the rule and optional wildcards/job_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rule` | `str` | Name of the rule. | required |
| `wildcards` | `dict[str, str] \| None` | Optional wildcard values. | `None` |
| `job_id` | `int \| str \| None` | Optional job ID. | `None` |
Returns:
| Type | Description |
|---|---|
| `Path \| None` | Path to the log file if found, None otherwise. |
Source code in snakesee/state/paths.py
get_metadata_files
Iterate over all metadata files.
Yields:
| Type | Description |
|---|---|
| `Path` | Path to each metadata file. |
Source code in snakesee/state/paths.py
validate
Validate that this is a valid workflow directory.
Raises:
| Type | Description |
|---|---|
| `ValueError` | If the `.snakemake` directory doesn't exist. |
Source code in snakesee/state/paths.py
WorkflowState (dataclass)
Unified container for all workflow state.
This is the single source of truth for workflow state, consolidating what was previously scattered across IncrementalLogReader, WorkflowProgress, TUI, EventAccumulator, and TimeEstimator.
Attributes:
| Name | Type | Description |
|---|---|---|
| `paths` | `WorkflowPaths` | Centralized path resolution. |
| `jobs` | `JobRegistry` | Registry of all job state. |
| `rules` | `RuleRegistry` | Registry of all rule timing statistics. |
| `clock` | `Clock` | Injectable time source. |
| `config` | `EstimationConfig` | Estimation configuration. |
| `status` | `WorkflowStatus` | Overall workflow status. |
| `total_jobs` | `int \| None` | Total number of jobs in the workflow (if known). |
| `start_time` | `float \| None` | Workflow start timestamp. |
| `current_log` | `Path \| None` | Path to the current log file being monitored. |
Source code in snakesee/state/workflow_state.py
Functions
clear
Clear all state.
Source code in snakesee/state/workflow_state.py
create (classmethod)
create(workflow_dir: Path, clock: Clock | None = None, config: EstimationConfig | None = None) -> WorkflowState
Create a new WorkflowState for a workflow directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workflow_dir` | `Path` | Path to the workflow directory. | required |
| `clock` | `Clock \| None` | Optional clock for time operations. | `None` |
| `config` | `EstimationConfig \| None` | Optional estimation configuration. | `None` |
Returns:
| Type | Description |
|---|---|
| `WorkflowState` | New WorkflowState instance. |
Source code in snakesee/state/workflow_state.py
record_job_completion
Record a job completion and update rule statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job_key` | `str` | Key of the completed job. | required |
Source code in snakesee/state/workflow_state.py
to_progress
to_progress() -> WorkflowProgress
Convert to WorkflowProgress for backward compatibility.
Returns:
| Type | Description |
|---|---|
| `WorkflowProgress` | WorkflowProgress snapshot of the current state. |
Source code in snakesee/state/workflow_state.py
update_status
Update workflow status based on current state.
Source code in snakesee/state/workflow_state.py
Functions
get_clock
get_clock() -> Clock
Get the current default clock.
Returns:
| Type | Description |
|---|---|
| `Clock` | The currently configured clock instance. |
reset_clock
set_clock
set_clock(clock: Clock) -> None
Set the default clock (primarily for testing).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `clock` | `Clock` | Clock instance to use as default. | required |
Modules
clock
Injectable clock for testable time handling.
This module provides a Clock protocol and implementations that allow time-dependent code to be tested deterministically.
Example usage
```python
# Production code
from snakesee.state import get_clock


def calculate_elapsed(start_time: float) -> float:
    return get_clock().now() - start_time


# Test code
from snakesee.state import FrozenClock, reset_clock, set_clock


def test_elapsed():
    clock = FrozenClock(1000.0)
    set_clock(clock)
    try:
        assert calculate_elapsed(900.0) == 100.0

        clock.advance(50.0)

        assert calculate_elapsed(900.0) == 150.0
    finally:
        reset_clock()  # restore the default clock so other tests are unaffected
```
Classes
ClockUtils
Centralized utilities for clock and duration handling.
This class provides consistent handling of duration calculations, including validation and clock skew detection. It centralizes logic that was previously duplicated in JobInfo.elapsed(), JobInfo.duration(), and RuleTimingStats._time_weighted_mean().
Example
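```python
import logging

from snakesee.state.clock import ClockUtils

logger = logging.getLogger(__name__)


def job_duration(start_time: float, end_time: float) -> float:
    # Get validated duration
    result = ClockUtils.calculate_duration(start_time, end_time, "job foo")
    if not result.is_valid:
        logger.warning("Clock skew detected for %s", result.context)
    return result.value


def job_elapsed(start_time: float) -> float:
    # Or use the elapsed time helper (it also returns a DurationResult)
    return ClockUtils.elapsed_since(start_time, context="job bar").value
```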
Source code in snakesee/state/clock.py
Functions
age_seconds (staticmethod)
age_seconds(timestamp: float, clock: Clock | None = None) -> float
Calculate age in seconds (time since timestamp).
This is a convenience method for calculating how old a timestamp is. The result is always non-negative.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `timestamp` | `float` | Unix timestamp to calculate age for. | required |
| `clock` | `Clock \| None` | Optional clock instance (defaults to global clock). | `None` |
Returns:
| Type | Description |
|---|---|
| `float` | Age in seconds (>= 0). |
Source code in snakesee/state/clock.py
calculate_duration (staticmethod)
calculate_duration(start_time: float, end_time: float, context: str | None = None) -> DurationResult
Calculate duration between two timestamps with validation.
This is the primary method for calculating durations. It handles clock skew by clamping negative durations to zero.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_time` | `float` | Start timestamp (Unix epoch seconds). | required |
| `end_time` | `float` | End timestamp (Unix epoch seconds). | required |
| `context` | `str \| None` | Optional description for logging (e.g., "job foo"). | `None` |
Returns:
| Type | Description |
|---|---|
| `DurationResult` | DurationResult with validated duration and status. |
Source code in snakesee/state/clock.py
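The clamping behavior of `calculate_duration` can be sketched with a small result type. `Duration` and `duration_between` are hypothetical stand-ins mirroring the `DurationResult` attributes documented on this page (`value`, `raw_value`, `is_valid`, `context`); snakesee's real class may carry more behavior, such as logging.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Duration:
    value: float  # clamped, always >= 0
    raw_value: float  # end - start before clamping
    is_valid: bool  # False when clock skew produced a negative value
    context: str | None = None


def duration_between(start: float, end: float, context: str | None = None) -> Duration:
    raw = end - start
    # Negative durations indicate clock skew; clamp to zero instead of raising.
    return Duration(value=max(raw, 0.0), raw_value=raw, is_valid=raw >= 0, context=context)
```

Keeping `raw_value` alongside the clamped `value` lets callers report how large the skew was without ever propagating a negative duration into statistics.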
elapsed_since (staticmethod)
elapsed_since(start_time: float, context: str | None = None, clock: Clock | None = None) -> DurationResult
Calculate elapsed time since a start timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_time` | `float` | Start timestamp (Unix epoch seconds). | required |
| `context` | `str \| None` | Optional description for logging. | `None` |
| `clock` | `Clock \| None` | Optional clock instance (defaults to global clock). | `None` |
Returns:
| Type | Description |
|---|---|
| `DurationResult` | DurationResult with validated elapsed time and status. |
Source code in snakesee/state/clock.py
validate_duration (staticmethod)
validate_duration(duration: float, context: str | None = None) -> DurationResult
Validate a pre-calculated duration value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `duration` | `float` | Duration value in seconds. | required |
| `context` | `str \| None` | Optional description for logging. | `None` |
Returns:
| Type | Description |
|---|---|
| `DurationResult` | DurationResult with validated duration and status. |
Source code in snakesee/state/clock.py
DurationResult
Result of a duration calculation with validation status.
This class provides a way to handle duration calculations that may encounter clock skew or other issues without raising exceptions.
Attributes:
| Name | Description |
|---|---|
| `value` | The calculated duration (>= 0, clamped if negative). |
| `raw_value` | The original calculated value before clamping. |
| `is_valid` | True if the duration was valid (non-negative). |
| `context` | Optional context description. |
Source code in snakesee/state/clock.py
FrozenClock ¶
Clock frozen at a specific time for testing.
Useful for testing time-dependent logic without flakiness.
Attributes:
| Name | Description |
|---|---|
| frozen_time | The frozen Unix timestamp. |
| frozen_monotonic | The frozen monotonic value. |

Example

    clock = FrozenClock(1700000000.0)
    assert clock.now() == 1700000000.0

    clock.advance(60.0)  # Advance by 1 minute
    assert clock.now() == 1700000060.0

Source code in snakesee/state/clock.py
Functions¶
Initialize with specific frozen times.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| frozen_time | float \| None | Unix timestamp to freeze at. Defaults to current time. | None |
| frozen_monotonic | float \| None | Monotonic value to freeze at. Defaults to 0.0. | None |

Source code in snakesee/state/clock.py
Advance the frozen time by the given number of seconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| seconds | float | Number of seconds to advance (can be negative). | required |

Set the frozen monotonic value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| value | float | Monotonic value to set. | required |

Set the frozen time to a specific timestamp.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timestamp | float | Unix timestamp to set. | required |

OffsetClock ¶
Clock with a fixed offset from system time.
Useful for simulating time shifts without fully freezing time.
Attributes:
| Name | Description |
|---|---|
| offset | Seconds to add to system time. |

Source code in snakesee/state/clock.py
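An offset clock can be sketched as a thin wrapper over the standard library's time module. This is an illustration only; the monotonic() method is an assumption about what the Clock protocol requires.

```python
import time


class OffsetClock:
    """Sketch: real system time shifted by a constant number of seconds."""

    def __init__(self, offset: float) -> None:
        self.offset = offset  # seconds added to every reading (may be negative)

    def now(self) -> float:
        return time.time() + self.offset

    def monotonic(self) -> float:  # assumed protocol method
        return time.monotonic() + self.offset


tomorrow = OffsetClock(86_400.0)
```

Unlike a frozen clock, readings keep advancing in real time, so code that polls the clock repeatedly still sees time pass.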
SystemClock ¶
Default clock implementation using system time.
This is the production implementation that delegates to the standard library's time module.
Source code in snakesee/state/clock.py
Functions¶
get_clock ¶
get_clock() -> Clock
Get the current default clock.
Returns:
| Type | Description |
|---|---|
| Clock | The currently configured clock instance. |

reset_clock ¶
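The get_clock() and reset_clock() pair suggests a module-level default clock that tests can swap out. The sketch below shows that pattern under stated assumptions: set_clock() and FixedClock are hypothetical names, and the Protocol lists only now() and monotonic().

```python
import time
from typing import Protocol


class Clock(Protocol):
    def now(self) -> float: ...
    def monotonic(self) -> float: ...


class SystemClock:
    """Production clock delegating to the standard library."""

    def now(self) -> float:
        return time.time()

    def monotonic(self) -> float:
        return time.monotonic()


_default_clock: Clock = SystemClock()


def get_clock() -> Clock:
    return _default_clock


def set_clock(clock: Clock) -> None:  # hypothetical setter name
    global _default_clock
    _default_clock = clock


def reset_clock() -> None:
    """Restore the system clock, e.g. in a test teardown."""
    global _default_clock
    _default_clock = SystemClock()


class FixedClock:
    """Hypothetical test double that structurally satisfies Clock."""

    def now(self) -> float:
        return 42.0

    def monotonic(self) -> float:
        return 0.0


set_clock(FixedClock())
during_test = get_clock().now()
reset_clock()
after_reset = get_clock()
```

Because Clock is a Protocol, any object with matching methods works without inheriting from it.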
config ¶
Centralized configuration for time estimation.
This module consolidates all estimation-related configuration that was previously scattered across multiple files as magic numbers.
The EstimationConfig dataclass contains all thresholds, multipliers, and settings used by the time estimation system.
Classes¶
ConfidenceThresholds
dataclass
¶
Thresholds for confidence-based decisions.
Attributes:
| Name | Type | Description |
|---|---|---|
| size_scaling_min | float | Min confidence to use size-scaled estimate. |
| high_confidence | float | Show simple estimate without range. |
| medium_confidence | float | Show range. |
| low_confidence | float | Show "rough" caveat. |
| max_confidence | float | Cap to avoid overconfidence. |
| bootstrap_confidence | float | Confidence when no progress yet. |

Source code in snakesee/state/config.py
ConfidenceWeights
dataclass
¶
Weights for computing overall confidence score.
Used in _estimate_weighted() to combine multiple factors. All weights must sum to 1.0.
Attributes:
| Name | Type | Description |
|---|---|---|
| sample_size | float | Weight for historical sample count. |
| recency | float | Weight for data recency. |
| consistency | float | Weight for recent run consistency. |
| data_coverage | float | Weight for fraction of rules with data. |

Raises:
| Type | Description |
|---|---|
| ValueError | If weights do not sum to 1.0. |

Source code in snakesee/state/config.py
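A sketch of the sum-to-one check and the weighted combination, assuming each factor score is already normalized to [0, 1]. The default weights, the combine() helper and weights_are_valid() are hypothetical. A tolerance is used because sums of decimal fractions may not equal 1.0 exactly in binary floating point.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfidenceWeights:
    # Illustrative defaults only; snakesee's actual weights may differ.
    sample_size: float = 0.4
    recency: float = 0.2
    consistency: float = 0.2
    data_coverage: float = 0.2

    def __post_init__(self) -> None:
        total = self.sample_size + self.recency + self.consistency + self.data_coverage
        # Compare with a tolerance: float sums of decimals may be off by ~1e-16.
        if not math.isclose(total, 1.0, abs_tol=1e-9):
            raise ValueError(f"confidence weights must sum to 1.0, got {total}")

    def combine(
        self, sample_size: float, recency: float, consistency: float, data_coverage: float
    ) -> float:
        """Hypothetical helper: weighted sum of factor scores, each in [0, 1]."""
        return (
            self.sample_size * sample_size
            + self.recency * recency
            + self.consistency * consistency
            + self.data_coverage * data_coverage
        )


def weights_are_valid(**overrides: float) -> bool:
    """True if ConfidenceWeights accepts the overrides, False if it raises ValueError."""
    try:
        ConfidenceWeights(**overrides)
    except ValueError:
        return False
    return True
```

Because the weights sum to 1.0, the combined score stays in [0, 1] whenever every factor score does.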
EstimationConfig
dataclass
¶
Central configuration for time estimation.
This class consolidates all estimation-related configuration that was previously scattered across multiple files as magic numbers.
Attributes:
| Name | Type | Description |
|---|---|---|
| weighting_strategy | WeightingStrategy | How to weight historical data ("time" or "index"). |
| half_life_days | float | Half-life in days for time-based weighting. |
| half_life_logs | int | Half-life in run count for index-based weighting. |
| variance | VarianceMultipliers | Variance multiplier settings. |
| confidence_weights | ConfidenceWeights | Weights for confidence calculation. |
| confidence_thresholds | ConfidenceThresholds | Decision thresholds based on confidence. |
| time | TimeConstants | Time-related constants. |
| min_samples_for_conditioning | int | Minimum samples for wildcard conditioning. |
| min_pairs_for_size_scaling | int | Minimum pairs for size correlation. |
| high_variance_cv | float | Coefficient of variation threshold for "high variance". |
| default_global_mean | float | Fallback duration when no data available. |

Raises:
| Type | Description |
|---|---|
| ValueError | If any parameter is invalid (e.g., non-positive half_life). |

Source code in snakesee/state/config.py
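Half-life weighting conventionally means exponential decay: an observation one half-life old counts half as much as a fresh one. The sketch below assumes snakesee follows that convention; decay_weight() and weighted_mean() are hypothetical names, and the 7-day half-life is a placeholder.

```python
from collections.abc import Sequence


def decay_weight(age: float, half_life: float) -> float:
    """Exponential decay: weight 1.0 at age 0, 0.5 at one half-life, 0.25 at two."""
    if half_life <= 0:
        raise ValueError("half-life must be positive")
    return 0.5 ** (age / half_life)


def weighted_mean(durations: Sequence[float], weights: Sequence[float]) -> float:
    """Weighted average of historical durations."""
    total = sum(weights)
    if total == 0:
        raise ValueError("weights must not all be zero")
    return sum(d * w for d, w in zip(durations, weights)) / total


# "time" strategy: ages in days with a placeholder half_life_days of 7.
ages_days = [0.0, 7.0, 14.0]
durations = [100.0, 120.0, 200.0]
time_weights = [decay_weight(a, half_life=7.0) for a in ages_days]  # [1.0, 0.5, 0.25]
estimate = weighted_mean(durations, time_weights)  # (100 + 60 + 50) / 1.75
```

Under the "index" strategy the same function would apply with age measured in runs ago and half_life set to half_life_logs.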
Functions¶
Validate configuration parameters.
Source code in snakesee/state/config.py
TimeConstants
dataclass
¶
Time-related constants used throughout the codebase.
Attributes:
| Name | Type | Description |
|---|---|---|
| seconds_per_day | int | Number of seconds in a day. |
| seconds_per_hour | int | Number of seconds in an hour. |
| seconds_per_minute | int | Number of seconds in a minute. |
| stale_workflow_threshold | float | Seconds before workflow is considered stale. |
| timing_mismatch_tolerance | float | Tolerance for validation timing comparisons. |

Source code in snakesee/state/config.py
VarianceMultipliers
dataclass
¶
Variance multipliers for different estimation scenarios.
When only one observation exists, variance is computed as (mean * multiplier)^2, so lower multipliers produce tighter confidence bounds.
Attributes:
| Name | Type | Description |
|---|---|---|
| exact_thread_match | float | High confidence: exact thread count match. |
| exact_wildcard_match | float | High confidence: exact wildcard match. |
| size_scaled | float | Medium confidence: input-size scaling. |
| rule_fallback | float | Standard: rule-level fallback. |
| aggregate_fallback | float | Standard: aggregate across thread counts. |
| fuzzy_match | float | Lower confidence: fuzzy rule matching. |
| global_fallback | float | Lowest confidence: no rule data. |
| bootstrap | float | No progress yet: initial estimate. |

Source code in snakesee/state/config.py
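Worked numerically, the single-observation rule makes the standard deviation equal to mean * multiplier. The sketch also derives plus/minus one standard deviation bounds; that bound construction and the two multiplier values are illustrative assumptions, not snakesee's documented behaviour.

```python
import math


def single_sample_variance(mean: float, multiplier: float) -> float:
    """Variance assumed when only one observation exists: (mean * multiplier) ** 2."""
    return (mean * multiplier) ** 2


def one_sigma_bounds(mean: float, variance: float) -> tuple[float, float]:
    """Mean plus/minus one standard deviation, with the lower bound floored at 0."""
    std = math.sqrt(variance)
    return max(0.0, mean - std), mean + std


# Placeholder multipliers: a tight one (exact match) vs a loose one (fallback).
tight = one_sigma_bounds(100.0, single_sample_variance(100.0, 0.2))  # about (80, 120)
loose = one_sigma_bounds(100.0, single_sample_variance(100.0, 0.5))  # about (50, 150)
```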
job_registry ¶
Job registry for centralized job state management.
This module provides a single source of truth for all job state in a workflow, replacing the scattered job tracking across multiple components.
Classes¶
Job
dataclass
¶
Mutable job state container.
Unlike the frozen JobInfo dataclass, this class allows state updates as jobs progress through their lifecycle.
Attributes:
| Name | Type | Description |
|---|---|---|
| key | str | Unique key for this job (rule + output hash or job_id). |
| rule | str | The Snakemake rule name. |
| status | JobStatus | Current job status. |
| job_id | str \| None | Snakemake job ID (may be None initially). |
| start_time | float \| None | Unix timestamp when job started. |
| end_time | float \| None | Unix timestamp when job completed. |
| wildcards | dict[str, str] | Dictionary of wildcard values. |
| threads | int \| None | Number of threads allocated. |
| input_size | int \| None | Total input file size in bytes. |
| log_file | Path \| None | Path to job's log file. |

Source code in snakesee/state/job_registry.py
Attributes¶
Functions¶
classmethod
Create a Job from a JobInfo instance.
Source code in snakesee/state/job_registry.py
to_job_info() -> JobInfo
Convert to immutable JobInfo for backward compatibility.
Source code in snakesee/state/job_registry.py
JobRegistry ¶
Central registry for all job state.
Provides O(1) lookup by job key or job_id, and efficient iteration over jobs by status.
Example

    import time

    registry = JobRegistry()
    job, created = registry.get_or_create("job_1", "align")  # returns (Job, bool)
    job.status = JobStatus.RUNNING
    job.start_time = time.time()
    registry.update_indexes(job)
    running = registry.running()  # [job]
Source code in snakesee/state/job_registry.py
Functions¶
Initialize empty registry.
Source code in snakesee/state/job_registry.py
apply_event(event: SnakeseeEvent) -> Job | None
Apply a snakesee event to update job state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | SnakeseeEvent | The event to apply. | required |

Returns:
| Type | Description |
|---|---|
| Job \| None | The updated Job, or None if event couldn't be applied. |

Source code in snakesee/state/job_registry.py
Add or update a job from a JobInfo.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_info | JobInfo | JobInfo to apply. | required |
| key | str \| None | Optional key. If None, uses job_id or generates one. | None |

Returns:
| Type | Description |
|---|---|
| Job | The created or updated Job. |

Source code in snakesee/state/job_registry.py
Clear all jobs from the registry.
get_or_create(key: str, rule: str) -> tuple[Job, bool]
Get existing job or create a new one.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique job key. | required |
| rule | str | Rule name for new job. | required |

Returns:
| Type | Description |
|---|---|
| tuple[Job, bool] | Tuple of (job, created) where created is True if job was new. |

Source code in snakesee/state/job_registry.py
Update job status and indexes.
Store thread count for a job by job_id.
This is used when thread info comes from events before the job is fully tracked.
Source code in snakesee/state/job_registry.py
JobStatus ¶
Bases: Enum
Status of a job in the workflow.
Source code in snakesee/state/job_registry.py
paths ¶
Centralized path management for Snakemake workflow directories.
This module provides the WorkflowPaths class which centralizes all path construction for Snakemake workflows, eliminating ad-hoc path construction scattered across multiple functions.
Classes¶
WorkflowPaths
dataclass
¶
Centralized path management for Snakemake workflow directories.
This frozen dataclass provides computed paths for all standard Snakemake directory locations, eliminating ad-hoc path construction.
Attributes:
| Name | Type | Description |
|---|---|---|
| workflow_dir | Path | Root directory of the workflow (contains .snakemake/). |

Example

    paths = WorkflowPaths(Path("/my/workflow"))

    # Access computed paths
    if paths.metadata_dir.exists():
        for f in paths.get_metadata_files():
            process(f)

    # Find logs
    latest = paths.find_latest_log()
    all_logs = paths.find_all_logs()

Source code in snakesee/state/paths.py
Attributes¶
property
Path to default profile file (.snakesee-profile.json).
property
Check if incomplete directory exists and contains markers.
property
Path to validation log file (.snakesee_validation.log).
Functions¶
Count the number of metadata files.
Returns:
| Type | Description |
|---|---|
| int | Number of metadata files. |

Source code in snakesee/state/paths.py
Decode an incomplete marker filename to get the output path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| marker | Path | Path to the marker file. | required |

Returns:
| Type | Description |
|---|---|
| Path \| None | Decoded output file path, or None if decoding fails. |

Source code in snakesee/state/paths.py
Find all snakemake log files, sorted by modification time.
Returns:
| Type | Description |
|---|---|
| list[Path] | List of paths sorted oldest to newest. |

Source code in snakesee/state/paths.py
Find the most recent snakemake log file.
Returns:
| Type | Description |
|---|---|
| Path \| None | Path to the most recent log file, or None if no logs exist. |

Source code in snakesee/state/paths.py
Find all snakemake log files, sorted newest first.
Returns:
| Type | Description |
|---|---|
| list[Path] | List of paths sorted newest to oldest. |

Search for a profile file in workflow and parent directories.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_levels | int | Maximum parent levels to search (including current). | 6 |

Returns:
| Type | Description |
|---|---|
| Path \| None | Path to the found profile, or None if not found. |

Source code in snakesee/state/paths.py
Iterate over incomplete job markers.
Yields:
| Type | Description |
|---|---|
| Path | Path to each incomplete marker file. |

Source code in snakesee/state/paths.py
get_job_log(rule: str, wildcards: dict[str, str] | None = None, job_id: int | str | None = None) -> Path | None
Find the log file for a specific job.
Searches common log locations for a file matching the rule and optional wildcards/job_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule | str | Name of the rule. | required |
| wildcards | dict[str, str] \| None | Optional wildcard values. | None |
| job_id | int \| str \| None | Optional job ID. | None |

Returns:
| Type | Description |
|---|---|
| Path \| None | Path to the log file if found, None otherwise. |

Source code in snakesee/state/paths.py
Iterate over all metadata files.
Yields:
| Type | Description |
|---|---|
| Path | Path to each metadata file. |

Source code in snakesee/state/paths.py
Validate that this is a valid workflow directory.
Raises:
| Type | Description |
|---|---|
| ValueError | If .snakemake directory doesn't exist. |

Source code in snakesee/state/paths.py
Functions¶
clear_exists_cache ¶
Clear the filesystem existence cache.
Useful for testing or after significant filesystem operations.
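One common way to build such a cache is functools.lru_cache, whose cache_clear() method provides the reset hook. This sketch assumes that design; cached_exists() is a hypothetical name, and snakesee's actual cache may differ in size or key type.

```python
import tempfile
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=4096)
def cached_exists(path: Path) -> bool:
    """Memoized Path.exists(); results stay cached until clear_exists_cache()."""
    return path.exists()


def clear_exists_cache() -> None:
    """Drop every cached answer so the next lookup hits the filesystem again."""
    cached_exists.cache_clear()


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "marker"
    before = cached_exists(target)  # False, and now cached
    target.touch()
    stale = cached_exists(target)  # still False: answered from the cache
    clear_exists_cache()
    fresh = cached_exists(target)  # True once the cache is cleared
```

The stale answer in the middle is exactly why a reset is needed after the filesystem changes underneath the cache.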
rule_registry ¶
Rule registry for centralized rule timing statistics.
This module provides a single source of truth for all rule timing data, consolidating aggregate, thread-level, and wildcard-level statistics.
Classes¶
RuleRegistry ¶
Central registry for all rule timing statistics.
Consolidates what was previously scattered across TimeEstimator's rule_stats, thread_stats, and wildcard_stats dictionaries.
Example

    import time

    registry = RuleRegistry()
    stats = registry.get_or_create("align")
    stats.record_completion(duration=100.0, timestamp=time.time(), threads=4)
    mean, var = registry.get_estimate("align", threads=4)
Source code in snakesee/state/rule_registry.py
Functions¶
__init__(config: EstimationConfig | None = None) -> None
Initialize empty registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | EstimationConfig \| None | Optional estimation configuration. | None |

Source code in snakesee/state/rule_registry.py
get(rule: str) -> RuleStatistics | None
get_combination_stats(rule: str, wildcards: dict[str, str] | None, threads: int | None, min_samples: int | None = None) -> RuleTimingStats | None
Get timing stats for a specific wildcard+threads combination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule | str | Rule name. | required |
| wildcards | dict[str, str] \| None | Wildcard values dict. | required |
| threads | int \| None | Thread count. | required |
| min_samples | int \| None | Minimum samples required. If None, uses MIN_SAMPLES_FOR_CONDITIONING. Set to 1 to get stats even with a single historical run. | None |

Returns:
| Type | Description |
|---|---|
| RuleTimingStats \| None | RuleTimingStats for the combination, or None if not found or insufficient samples. |

Source code in snakesee/state/rule_registry.py
get_or_create(rule: str) -> RuleStatistics
Get existing statistics or create new ones.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule | str | Rule name. | required |

Returns:
| Type | Description |
|---|---|
| RuleStatistics | RuleStatistics for the rule. |

Source code in snakesee/state/rule_registry.py
Get global mean duration across all rules.
Returns:
| Type | Description |
|---|---|
| float | Mean duration in seconds, or config default if no data. |

Source code in snakesee/state/rule_registry.py
load_from_rule_stats(rule_stats: dict[str, RuleTimingStats], thread_stats: dict[str, ThreadTimingStats] | None = None, wildcard_stats: dict[str, dict[str, WildcardTimingStats]] | None = None) -> None
Load from existing stats dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule_stats | dict[str, RuleTimingStats] | Aggregate rule timing stats. | required |
| thread_stats | dict[str, ThreadTimingStats] \| None | Thread-specific stats. | None |
| wildcard_stats | dict[str, dict[str, WildcardTimingStats]] \| None | Wildcard-specific stats. | None |

Source code in snakesee/state/rule_registry.py
record_completion(rule: str, duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None
Record a job completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule | str | Rule name. | required |
| duration | float | Job duration in seconds. | required |
| timestamp | float | Completion timestamp. | required |
| threads | int \| None | Number of threads used. | None |
| wildcards | dict[str, str] \| None | Wildcard values. | None |
| input_size | int \| None | Input file size. | None |

Source code in snakesee/state/rule_registry.py
record_job_completion(job: Job) -> None
Record completion from a Job object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job | Job | Completed job with timing data. | required |

Source code in snakesee/state/rule_registry.py
Set expected job counts for rules.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| counts | dict[str, int] | Dictionary of rule name to expected count. | required |

Source code in snakesee/state/rule_registry.py
to_rule_stats_dict() -> dict[str, RuleTimingStats]
Convert to dict for backward compatibility with TimeEstimator.
to_thread_stats_dict() -> dict[str, ThreadTimingStats]
Convert to thread stats dict for backward compatibility.
Source code in snakesee/state/rule_registry.py
to_wildcard_stats_dict() -> dict[str, dict[str, WildcardTimingStats]]
Convert to wildcard stats dict for backward compatibility.
Source code in snakesee/state/rule_registry.py
Get the typical thread count for a rule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rule | str | Rule name. | required |

Returns:
| Type | Description |
|---|---|
| float | Weighted average thread count, or 1.0 if no thread data. |

Source code in snakesee/state/rule_registry.py
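The weighted average described here (the same rule the RuleStatistics property documents) amounts to the following, assuming thread data can be summarized as a count of observations per thread count. That input shape and the typical_threads() name are assumptions of this sketch.

```python
def typical_threads(observations_by_threads: dict[int, int]) -> float:
    """Observation-weighted mean thread count; 1.0 when there is no thread data."""
    total = sum(observations_by_threads.values())
    if total == 0:
        return 1.0
    return sum(threads * n for threads, n in observations_by_threads.items()) / total


# 3 runs at 4 threads and 1 run at 8 threads: (3*4 + 1*8) / 4 = 5.0
example = typical_threads({4: 3, 8: 1})
```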
RuleStatistics
dataclass
¶
Complete timing statistics for a single rule.
Combines aggregate, thread-level, and wildcard-level statistics in a single container.
Attributes:
| Name | Type | Description |
|---|---|---|
| rule | str | The rule name. |
| aggregate | RuleTimingStats | Overall timing statistics for the rule. |
| by_threads | ThreadTimingStats \| None | Thread-specific timing statistics. |
| by_wildcard | dict[str, WildcardTimingStats] | Wildcard-specific timing statistics. |
| by_combination | dict[str, RuleTimingStats] | Full wildcard+threads combination statistics. |
| expected_count | int \| None | Expected number of jobs for this rule (if known). |

Source code in snakesee/state/rule_registry.py
Attributes¶
property
Weighted average of thread counts from historical data.
Uses the number of observations at each thread count as weights. Defaults to 1.0 if no thread data is available.
Functions¶
Ensure aggregate has correct rule name.
Source code in snakesee/state/rule_registry.py
record_completion(duration: float, timestamp: float, threads: int | None = None, wildcards: dict[str, str] | None = None, input_size: int | None = None) -> None
Record a job completion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| duration | float | Job duration in seconds. | required |
| timestamp | float | Completion timestamp. | required |
| threads | int \| None | Number of threads used. | None |
| wildcards | dict[str, str] \| None | Wildcard values for the job. | None |
| input_size | int \| None | Input file size in bytes. | None |

Source code in snakesee/state/rule_registry.py
workflow_state ¶
Unified workflow state container.
This module provides the top-level WorkflowState class that serves as the single source of truth for all workflow state, combining job tracking, rule statistics, paths, and configuration.
Classes¶
WorkflowState
dataclass
¶
Unified container for all workflow state.
This is the single source of truth for workflow state, consolidating what was previously scattered across IncrementalLogReader, WorkflowProgress, TUI, EventAccumulator, and TimeEstimator.
Attributes:
| Name | Type | Description |
|---|---|---|
| paths | WorkflowPaths | Centralized path resolution. |
| jobs | JobRegistry | Registry of all job state. |
| rules | RuleRegistry | Registry of all rule timing statistics. |
| clock | Clock | Injectable time source. |
| config | EstimationConfig | Estimation configuration. |
| status | WorkflowStatus | Overall workflow status. |
| total_jobs | int \| None | Total number of jobs in workflow (if known). |
| start_time | float \| None | Workflow start timestamp. |
| current_log | Path \| None | Path to current log file being monitored. |

Source code in snakesee/state/workflow_state.py
Attributes¶
Functions¶
Clear all state.
Source code in snakesee/state/workflow_state.py
classmethod
create(workflow_dir: Path, clock: Clock | None = None, config: EstimationConfig | None = None) -> WorkflowState
Create a new WorkflowState for a workflow directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workflow_dir | Path | Path to the workflow directory. | required |
| clock | Clock \| None | Optional clock for time operations. | None |
| config | EstimationConfig \| None | Optional estimation configuration. | None |

Returns:
| Type | Description |
|---|---|
| WorkflowState | New WorkflowState instance. |

Source code in snakesee/state/workflow_state.py
Record a job completion and update rule statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| job_key | str | Key of the completed job. | required |

Source code in snakesee/state/workflow_state.py
to_progress() -> WorkflowProgress
Convert to WorkflowProgress for backward compatibility.
Returns:
| Type | Description |
|---|---|
| WorkflowProgress | WorkflowProgress snapshot of current state. |

Source code in snakesee/state/workflow_state.py
Update workflow status based on current state.