Task queueing

Goffee uses the Asynq package to provide task queueing: tasks are enqueued and then processed asynchronously by workers. Asynq is backed by Redis and is designed to be scalable.

High-level overview of how Asynq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers
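
The flow above can be pictured with a small in-memory analogy. This is only a sketch built on Go channels, not Asynq itself: the task type and the runWorkers function are hypothetical names, and a real setup keeps the queue in Redis rather than in process memory.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a queued job.
// Asynq stores real tasks in Redis, not in a channel.
type task struct {
	Type   string
	UserID int
}

// runWorkers drains the queue, starts one goroutine per task,
// waits for all of them, and returns how many tasks were processed.
func runWorkers(queue <-chan task, handle func(task)) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	for t := range queue {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			handle(t)
			mu.Lock()
			processed++
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	return processed
}

func main() {
	queue := make(chan task, 3)

	// "Client" side: put tasks on the queue.
	for id := 1; id <= 3; id++ {
		queue <- task{Type: "email:welcome", UserID: id}
	}
	close(queue)

	// "Server" side: pull tasks off the queue and process them concurrently.
	n := runWorkers(queue, func(t task) {
		fmt.Printf("processing %s for user %d\n", t.Type, t.UserID)
	})
	fmt.Println("processed:", n)
}
```

The order of the "processing" lines is not deterministic, which mirrors the fact that workers handle tasks concurrently.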

Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, which allows for high availability and horizontal scaling.

Task Queue Diagram

Capabilities

The queue system exposes the full Asynq client API through c.GetQueueClient(), which returns an *asynq.Client. This gives you access to all of Asynq's task scheduling features.

Schedule a task immediately

client := c.GetQueueClient()
task := asynq.NewTask("email:welcome", payload)
info, err := client.Enqueue(task)
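
The snippets in this section assume a payload variable already exists. asynq.NewTask takes the payload as a byte slice, so one common approach is to JSON-encode a small struct. The sketch below assumes an EmailTaskPayload type like the one the worker handler decodes; the JSON field name and the newEmailPayload helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmailTaskPayload mirrors the type decoded by the worker handler.
// The JSON field name is an assumption made for this sketch.
type EmailTaskPayload struct {
	UserID int `json:"user_id"`
}

// newEmailPayload (hypothetical helper) encodes the payload as JSON bytes,
// suitable as the second argument to asynq.NewTask.
func newEmailPayload(userID int) ([]byte, error) {
	return json.Marshal(EmailTaskPayload{UserID: userID})
}

func main() {
	payload, err := newEmailPayload(42)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload)) // prints {"user_id":42}
}
```

Encoding and decoding with the same struct on both sides keeps the client and the worker in agreement about the payload shape.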

Schedule a task with a delay

Use asynq.ProcessIn to run a task after a given delay, measured from the moment it is enqueued:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

// 1 minute from now
info, err := client.Enqueue(task, asynq.ProcessIn(1*time.Minute))

// 24 hours from now (info and err are already declared, so assign with =)
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))

Schedule a task at a specific time

Use asynq.ProcessAt to run a task at an exact time:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

// Run at a specific date/time
futureTime := time.Date(2025, 12, 25, 10, 0, 0, 0, time.UTC)
info, err := client.Enqueue(task, asynq.ProcessAt(futureTime))

Assign a task to a specific queue

If you have configured multiple queues with different priorities, you can assign tasks to a specific queue using asynq.Queue:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

// Send to the "critical" queue (highest priority)
info, err := client.Enqueue(task, asynq.Queue("critical"))

// Send to the "low" queue (lowest priority); info and err are reused
info, err = client.Enqueue(task, asynq.Queue("low"))

Set max retry count

Control how many times a failed task is retried:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

// Retry up to 5 times on failure (default is 25)
info, err := client.Enqueue(task, asynq.MaxRetry(5))

// No retries (info and err are reused)
info, err = client.Enqueue(task, asynq.MaxRetry(0))

Set a timeout for task processing

Control how long a worker may spend on a task. If the timeout elapses before the handler returns, the attempt counts as failed and the task is retried while retries remain:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

// 30 second timeout
info, err := client.Enqueue(task, asynq.Timeout(30*time.Second))

// No explicit timeout (Asynq may still apply its default timeout if no deadline is set)
info, err = client.Enqueue(task, asynq.Timeout(0))

Set a unique task ID

Prevent duplicate tasks by assigning a unique ID. If a task with the same ID already exists, Enqueue returns an error (asynq.ErrTaskIDConflict) and the task is not enqueued again:

client := c.GetQueueClient()
task := asynq.NewTask("email:welcome", payload)

info, err := client.Enqueue(task, asynq.TaskID(fmt.Sprintf("welcome:%d", userID)))

Combine multiple options

Options can be combined in a single Enqueue call:

client := c.GetQueueClient()
task := asynq.NewTask("email:reminder", payload)

info, err := client.Enqueue(task,
	asynq.ProcessIn(1*time.Hour),
	asynq.Queue("critical"),
	asynq.MaxRetry(3),
	asynq.Timeout(10*time.Second),
)

Logging in workers

The queue system integrates with Goffee's logging system. Inside a controller, use c.GetLogger():

c.GetLogger().Info("Task enqueued successfully")
c.GetLogger().Error("Failed to enqueue task: " + err.Error())

Inside worker handlers (which run in a separate goroutine without access to c.GetLogger()), use the logger directly from the package:

import "git.smarteching.com/goffee/core/logger"

logger.ResolveLogger().Info(fmt.Sprintf("Processing task for user %d", p.UserID))

Task retry and error handling

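If a worker function returns an error, Asynq automatically retries the task according to its retry count and backoff strategy; an error that wraps asynq.SkipRetry fails the task without further retries. Returning nil marks the task as processed: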

func HandleEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		// Return error → task will be retried
		return fmt.Errorf("failed to unmarshal payload: %w", err)
	}

	err := sendEmail(p.UserID)
	if err != nil {
		// Return nil to mark as processed without retry, but log the error
		logger.ResolveLogger().Error(fmt.Sprintf("failed to send email to user %d: %v", p.UserID, err))
		return nil
	}

	return nil
}

Queue priorities

The queue server is configured with priority-based queue management. Each queue has a weight, and queues with a higher weight have their tasks picked up more often:

// config/queue.go
Queues: map[string]int{
	"critical": 6, // processed most frequently
	"default":  3, // normal priority
	"low":      1, // processed least frequently
},

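With these weights, and while all three queues have pending tasks, roughly 60% of processing goes to the "critical" queue, 30% to "default", and 10% to "low". Tasks in the "critical" queue are therefore picked up approximately 6 times as often as tasks in the "low" queue.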
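
The arithmetic behind such ratios is simply each weight divided by the sum of all weights. The sketch below works this out for the configuration shown above; queueShares is a hypothetical helper, and the result is an approximation of long-run behavior when every queue has work waiting.

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares (hypothetical) converts queue weights into each queue's
// approximate share of processing: weight / sum of all weights.
func queueShares(weights map[string]int) map[string]float64 {
	total := 0
	for _, w := range weights {
		total += w
	}
	shares := make(map[string]float64, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		shares[name] = float64(w) / float64(total)
	}
	return shares
}

func main() {
	shares := queueShares(map[string]int{"critical": 6, "default": 3, "low": 1})

	// Sort the names so the output order is stable.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, shares[name]*100)
	}
}
```

For the weights 6, 3 and 1 this gives 60%, 30% and 10%.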