Examples
Complete code examples for common use cases.
Basic Job Processing
Simple fire-and-forget job processing.
package main
import (
"context"
"fmt"
"time"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
// Setup
db, _ := gorm.Open(sqlite.Open("jobs.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
// Register handler
queue.Register("send-email", func(ctx context.Context, args EmailArgs) error {
fmt.Printf("Sending email to %s: %s\n", args.To, args.Subject)
return nil
})
// Enqueue jobs
ctx := context.Background()
queue.Enqueue(ctx, "send-email", EmailArgs{To: "user@example.com", Subject: "Hello"})
queue.Enqueue(ctx, "send-email", EmailArgs{To: "admin@example.com", Subject: "Report"},
jobs.Priority(100)) // High priority
// Start worker
worker := queue.NewWorker()
worker.Start(ctx)
}
type EmailArgs struct {
To string `json:"to"`
Subject string `json:"subject"`
}
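Start blocks until its context is cancelled, so the worker's lifetime can be tied to OS signals. A minimal sketch, assuming Start returns once the context is done (the signal handling is all standard library; imports os, os/signal, syscall):
// Cancel the context on Ctrl-C or SIGTERM for a clean exit.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
worker := queue.NewWorker()
// Returns after the signal fires, letting deferred cleanup run.
worker.Start(ctx)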
Durable Workflows
Multi-step workflows with automatic checkpointing and crash recovery.
package main
import (
"context"
"fmt"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("workflow.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
// Register step handlers
queue.Register("validate-order", func(ctx context.Context, order Order) (Order, error) {
fmt.Println("Validating order...")
order.Status = "validated"
return order, nil
})
queue.Register("charge-payment", func(ctx context.Context, order Order) (string, error) {
fmt.Println("Charging payment...")
return "receipt-123", nil
})
queue.Register("ship-order", func(ctx context.Context, receipt string) error {
fmt.Println("Shipping order...")
return nil
})
// Register workflow
queue.Register("process-order", func(ctx context.Context, order Order) error {
// Each Call is checkpointed. On crash/retry, completed steps
// return cached results without re-executing.
validated, err := jobs.Call[Order](ctx, "validate-order", order)
if err != nil {
return err
}
receipt, err := jobs.Call[string](ctx, "charge-payment", validated)
if err != nil {
return err
}
_, err = jobs.Call[any](ctx, "ship-order", receipt)
return err
})
// Enqueue and process
ctx := context.Background()
queue.Enqueue(ctx, "process-order", Order{ID: "ORD-001", Total: 99.99},
jobs.Retries(5))
worker := queue.NewWorker()
worker.Start(ctx)
}
type Order struct {
ID string `json:"id"`
Total float64 `json:"total"`
Status string `json:"status"`
}
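Because only Call results are checkpointed, code between Call sites re-executes on every retry, so side effects belong inside steps. A sketch of the distinction (the handler name process-order-v2 is illustrative):
queue.Register("process-order-v2", func(ctx context.Context, order Order) error {
	// Not checkpointed: runs again after every crash or retry.
	fmt.Println("preparing order", order.ID)
	// Checkpointed: cached after first success, never re-executed.
	validated, err := jobs.Call[Order](ctx, "validate-order", order)
	if err != nil {
		return err
	}
	_, err = jobs.Call[string](ctx, "charge-payment", validated)
	return err
})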
Fan-Out/Fan-In
Process items in parallel using sub-jobs, then aggregate results.
package main
import (
"context"
"fmt"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("fanout.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
// Register sub-job handler
queue.Register("process-image", func(ctx context.Context, img Image) (Result, error) {
fmt.Printf("Processing image: %s\n", img.URL)
return Result{ImageID: img.ID, Thumbnail: img.URL + "/thumb"}, nil
})
// Register parent job that fans out
queue.Register("batch-process-images", func(ctx context.Context, images []Image) error {
// Create sub-jobs for each image
subJobs := make([]jobs.SubJob, len(images))
for i, img := range images {
subJobs[i] = jobs.Sub("process-image", img)
}
// Fan-out: spawn all sub-jobs in parallel, wait for results
results, err := jobs.FanOut[Result](ctx, subJobs,
jobs.FailFast(), // Stop on first failure
jobs.WithFanOutQueue("batch"), // Run on batch queue
jobs.WithFanOutRetries(3), // Retry failed sub-jobs
)
if err != nil {
return err
}
// Aggregate successful results
thumbnails := jobs.Values(results)
fmt.Printf("Generated %d thumbnails\n", len(thumbnails))
return nil
})
ctx := context.Background()
queue.Enqueue(ctx, "batch-process-images", []Image{
{ID: "1", URL: "https://example.com/image1.jpg"},
{ID: "2", URL: "https://example.com/image2.jpg"},
{ID: "3", URL: "https://example.com/image3.jpg"},
})
worker := queue.NewWorker(
jobs.WorkerQueue("default", jobs.Concurrency(5)),
jobs.WorkerQueue("batch", jobs.Concurrency(10)),
)
worker.Start(ctx)
}
type Image struct {
ID string `json:"id"`
URL string `json:"url"`
}
type Result struct {
ImageID string `json:"image_id"`
Thumbnail string `json:"thumbnail"`
}
Fan-Out Strategies
// FailFast: Stop on first sub-job failure (default)
results, err := jobs.FanOut[T](ctx, subJobs, jobs.FailFast())
// CollectAll: Wait for all sub-jobs, return partial results
results, err := jobs.FanOut[T](ctx, subJobs, jobs.CollectAll())
// Threshold: Succeed if at least 80% of sub-jobs complete
results, err := jobs.FanOut[T](ctx, subJobs, jobs.Threshold(0.8))
Working with Results
results, _ := jobs.FanOut[T](ctx, subJobs, jobs.CollectAll())
// Extract all successful values
values := jobs.Values(results)
// Split into successes and failures
successes, failures := jobs.Partition(results)
// Check if all succeeded
if jobs.AllSucceeded(results) {
fmt.Println("All sub-jobs completed successfully")
}
// Access individual results
for _, r := range results {
if r.Err != nil {
fmt.Printf("Sub-job %d failed: %v\n", r.Index, r.Err)
} else {
fmt.Printf("Sub-job %d result: %v\n", r.Index, r.Value)
}
}
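With CollectAll, a natural follow-up is retrying only the failures. A sketch, assuming each failure's Index maps back to the input slice (as the loop above does) and reusing the images slice from the fan-out example:
// Re-enqueue just the failed items as fresh jobs.
_, failures := jobs.Partition(results)
for _, f := range failures {
	queue.Enqueue(ctx, "process-image", images[f.Index])
}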
Scheduled Jobs
Recurring jobs with various schedule types.
package main
import (
"context"
"fmt"
"time"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("scheduled.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
// Register handlers
queue.Register("health-check", func(ctx context.Context, _ struct{}) error {
fmt.Printf("[%s] Health check: OK\n", time.Now().Format("15:04:05"))
return nil
})
queue.Register("daily-report", func(ctx context.Context, _ struct{}) error {
fmt.Println("Generating daily report...")
return nil
})
queue.Register("weekly-backup", func(ctx context.Context, _ struct{}) error {
fmt.Println("Running weekly backup...")
return nil
})
// Schedule jobs
queue.Schedule("health-check", jobs.Every(1 * time.Minute))
queue.Schedule("daily-report", jobs.Daily(9, 0)) // 9:00 AM UTC
queue.Schedule("weekly-backup", jobs.Weekly(time.Sunday, 2, 0)) // Sunday 2:00 AM
// Cron expression: every hour at minute 0
queue.Register("hourly-task", func(ctx context.Context, _ struct{}) error {
fmt.Println("Running hourly task...")
return nil
})
queue.Schedule("hourly-task", jobs.Cron("0 * * * *"))
// Start worker with scheduler enabled
worker := queue.NewWorker(jobs.WithScheduler(true))
worker.Start(context.Background())
}
Priority Queues
Process high-priority jobs first.
package main
import (
"context"
"fmt"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("priority.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
queue.Register("task", func(ctx context.Context, name string) error {
fmt.Printf("Processing: %s\n", name)
return nil
})
ctx := context.Background()
// Enqueue with different priorities
queue.Enqueue(ctx, "task", "low-priority-1", jobs.Priority(1))
queue.Enqueue(ctx, "task", "low-priority-2", jobs.Priority(1))
queue.Enqueue(ctx, "task", "URGENT", jobs.Priority(100)) // Runs first
queue.Enqueue(ctx, "task", "medium", jobs.Priority(50))
queue.Enqueue(ctx, "task", "CRITICAL", jobs.Priority(1000)) // Runs first
// Single worker to demonstrate ordering
worker := queue.NewWorker(jobs.WorkerQueue("default", jobs.Concurrency(1)))
worker.Start(ctx)
}
Error Handling
Control retry behavior with custom error types.
package main
import (
"context"
"errors"
"time"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("errors.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
queue.Register("api-call", func(ctx context.Context, endpoint string) error {
// Simulate different error scenarios
// Permanent failure - don't retry
if endpoint == "/invalid" {
return jobs.NoRetry(errors.New("invalid endpoint"))
}
// Rate limited - retry after delay
if endpoint == "/rate-limited" {
return jobs.RetryAfter(5*time.Minute, errors.New("rate limited"))
}
// Temporary failure - use default retry with backoff
if endpoint == "/flaky" {
return errors.New("temporary network error")
}
return nil
})
ctx := context.Background()
queue.Enqueue(ctx, "api-call", "/invalid", jobs.Retries(3))
queue.Enqueue(ctx, "api-call", "/rate-limited", jobs.Retries(3))
queue.Enqueue(ctx, "api-call", "/flaky", jobs.Retries(3))
worker := queue.NewWorker()
worker.Start(ctx)
}
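The same error types compose with real clients. A sketch of mapping HTTP status codes onto retry decisions (the handler name fetch and the thresholds are illustrative, not part of the library):
queue.Register("fetch", func(ctx context.Context, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return jobs.NoRetry(err) // a malformed URL will never succeed
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err // transient network error: default retry with backoff
	}
	defer resp.Body.Close()
	switch {
	case resp.StatusCode == http.StatusTooManyRequests:
		return jobs.RetryAfter(time.Minute, errors.New("rate limited"))
	case resp.StatusCode >= 500:
		return fmt.Errorf("server error: %d", resp.StatusCode) // retryable
	case resp.StatusCode >= 400:
		return jobs.NoRetry(fmt.Errorf("client error: %d", resp.StatusCode))
	}
	return nil
})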
Observability
Monitor job execution with hooks and events.
package main
import (
"context"
"log"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func main() {
db, _ := gorm.Open(sqlite.Open("observe.db"), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
// Register hooks
queue.OnJobStart(func(ctx context.Context, job *jobs.Job) {
log.Printf("[START] %s (%s)", job.ID[:8], job.Type)
})
queue.OnJobComplete(func(ctx context.Context, job *jobs.Job) {
duration := job.CompletedAt.Sub(*job.StartedAt)
log.Printf("[DONE] %s completed in %v", job.ID[:8], duration)
})
queue.OnJobFail(func(ctx context.Context, job *jobs.Job, err error) {
log.Printf("[FAIL] %s: %v", job.ID[:8], err)
})
queue.OnRetry(func(ctx context.Context, job *jobs.Job, attempt int, err error) {
log.Printf("[RETRY] %s attempt %d: %v", job.ID[:8], attempt, err)
})
// Or use event stream for async processing
events := queue.Events()
go func() {
for event := range events {
switch event.(type) {
case *jobs.JobStarted:
// Send to metrics system
case *jobs.JobCompleted:
// Update dashboard
case *jobs.JobFailed:
// Alert on-call
}
}
}()
// ... register handlers and start worker
}
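A sketch of turning that event stream into process-level counters with the standard library's expvar package (the metric names are illustrative):
completed := expvar.NewInt("jobs_completed")
failed := expvar.NewInt("jobs_failed")
go func() {
	for event := range queue.Events() {
		switch event.(type) {
		case *jobs.JobCompleted:
			completed.Add(1)
		case *jobs.JobFailed:
			failed.Add(1)
		}
	}
}()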
Distributed Workers
Run multiple workers for horizontal scaling.
package main
import (
"context"
"flag"
"fmt"
jobs "github.com/jdziat/simple-durable-jobs"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
func main() {
workerID := flag.String("id", "worker-1", "Worker ID")
flag.Parse()
// Use PostgreSQL for production distributed systems
dsn := "host=localhost user=app dbname=jobs"
db, _ := gorm.Open(postgres.Open(dsn), &gorm.Config{})
storage := jobs.NewGormStorage(db)
storage.Migrate(context.Background())
queue := jobs.New(storage)
queue.Register("task", func(ctx context.Context, id int) error {
fmt.Printf("[%s] Processing task %d\n", *workerID, id)
return nil
})
// Each worker processes jobs independently
// Jobs are locked to prevent duplicate processing
worker := queue.NewWorker(
jobs.WorkerQueue("default", jobs.Concurrency(5)),
)
fmt.Printf("[%s] Starting...\n", *workerID)
worker.Start(context.Background())
}
Run multiple instances:
./app -id worker-1 &
./app -id worker-2 &
./app -id worker-3 &
Pause/Resume
Control job execution at the worker, queue, and individual job level.
// Graceful pause: finish running jobs, stop picking new ones
worker.Pause(jobs.PauseModeGraceful)
// Wait for all running jobs to complete (with timeout)
if err := worker.WaitForPause(30 * time.Second); err != nil {
log.Printf("Timeout waiting for jobs: %v", err)
}
worker.Resume()
// Aggressive pause: cancel running jobs immediately
worker.Pause(jobs.PauseModeAggressive)
// Cancel a specific running job
cancelled := worker.CancelJob(jobID)
// Pause/resume at queue level
queue.PauseQueue(ctx, "emails")
queue.ResumeQueue(ctx, "emails")
// Pause/resume individual jobs
queue.PauseJob(ctx, jobID)
queue.ResumeJob(ctx, jobID)
Embedded Web UI
Mount a full-featured monitoring dashboard into any Go HTTP server.
import "github.com/jdziat/simple-durable-jobs/ui"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := http.NewServeMux()
mux.Handle("/jobs/", http.StripPrefix("/jobs", ui.Handler(storage,
ui.WithQueue(queue), // Enable event streaming and scheduled jobs
ui.WithContext(ctx), // Graceful shutdown for background workers
ui.WithStatsRetention(7 * 24 * time.Hour), // Keep stats for 7 days
ui.WithMiddleware(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Add authentication here
next.ServeHTTP(w, r)
})
}),
)))
log.Fatal(http.ListenAndServe(":8080", mux))
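A minimal sketch of filling the authentication slot above with HTTP basic auth (the credentials are placeholders; load real ones from configuration):
ui.WithMiddleware(func(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, pass, ok := r.BasicAuth()
		if !ok || user != "admin" || pass != "change-me" {
			w.Header().Set("WWW-Authenticate", `Basic realm="jobs"`)
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
})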
Connection Pool Configuration
Tune database connection pooling for your workload.
// Use a preset for high-concurrency workloads
storage, err := jobs.NewGormStorageWithPool(db, jobs.HighConcurrencyPoolConfig())
// Or customize individual pool settings
storage, err := jobs.NewGormStorageWithPool(db,
jobs.MaxOpenConns(50),
jobs.MaxIdleConns(20),
jobs.ConnMaxLifetime(10 * time.Minute),
jobs.ConnMaxIdleTime(2 * time.Minute),
)
More Examples
Find complete runnable examples in the examples directory.