Queueing setup
Step-by-step instructions for enabling Asynq and queuing tasks in Goffee.
1. Prerequisites
Make sure Redis is running and configured in your .env file:
REDIS_HOST=localhost
REDIS_PORT=6379
2. Enable the queue system
Open config/queue.go and set EnableQueue to true. You can also configure concurrency and queue priorities:
#file: config/queue.go
package config

import "git.smarteching.com/goffee/core"

func GetQueueConfig() core.QueueConfig {
	return core.QueueConfig{
		// Enable the queue system
		EnableQueue: true,
		// Number of concurrent workers (goroutines) processing tasks
		Concurrency: 10,
		// Queue names with priority weights
		// Higher number = higher priority (tasks from that queue are picked up more often)
		Queues: map[string]int{
			"critical": 6,
			"default":  3,
			"low":      1,
		},
	}
}
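To get a feel for what the weights mean, the sketch below converts them into approximate shares of processing attention (each weight divided by the sum of all weights). It assumes Goffee passes the Queues map to Asynq unchanged and that Asynq's default weighted (non-strict) priority mode is in use; queueShares is a hypothetical helper written for this illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// queueShares converts priority weights into approximate fractions:
// weight / sum of all weights. An empty or all-zero map yields no shares.
func queueShares(queues map[string]int) map[string]float64 {
	total := 0
	for _, w := range queues {
		total += w
	}
	shares := make(map[string]float64, len(queues))
	if total == 0 {
		return shares
	}
	for name, w := range queues {
		shares[name] = float64(w) / float64(total)
	}
	return shares
}

func main() {
	queues := map[string]int{"critical": 6, "default": 3, "low": 1}
	shares := queueShares(queues)

	// Collect the names and sort by descending weight for stable output.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Slice(names, func(i, j int) bool {
		return queues[names[i]] > queues[names[j]]
	})

	for _, name := range names {
		fmt.Printf("%-8s %.0f%%\n", name, shares[name]*100)
	}
}
```

With the weights from the configuration above, this works out to roughly 60% for critical, 30% for default, and 10% for low. These are relative proportions over time, not a fixed number of workers reserved per queue.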
3. Define task types and worker handlers
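Create your worker handlers in workers/workers.go. Each handler is a function that receives a context.Context and an *asynq.Task and returns an error. Returning a non-nil error marks the task as failed so that Asynq can retry it according to its retry settings: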
#file: workers/workers.go
...
// Define task type constants
const (
	TypeWelcomeEmail  = "email:welcome"
	TypeReminderEmail = "email:reminder"
)

// Define a payload struct for your task data
type EmailTaskPayload struct {
	UserID int
}

// Worker handler for welcome email tasks
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	var p EmailTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	logger.ResolveLogger().Info(fmt.Sprintf("Sending welcome email to user %d", p.UserID))
	// ... actual email sending logic here
	return nil
}
...
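Because a handler only sees raw bytes, it can be convenient to move decoding and basic validation into a small function that is testable without Redis or Asynq. The sketch below uses only the standard library; decodeEmailPayload is a hypothetical helper, and treating a non-positive UserID as invalid is an assumption made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// EmailTaskPayload mirrors the struct defined in workers/workers.go.
type EmailTaskPayload struct {
	UserID int
}

// decodeEmailPayload is a hypothetical helper: it unmarshals the task
// payload and rejects values that cannot identify a user.
func decodeEmailPayload(raw []byte) (EmailTaskPayload, error) {
	var p EmailTaskPayload
	if err := json.Unmarshal(raw, &p); err != nil {
		return p, fmt.Errorf("decode email payload: %w", err)
	}
	// Assumption for this sketch: valid user IDs are positive.
	if p.UserID <= 0 {
		return p, errors.New("email payload: missing or invalid UserID")
	}
	return p, nil
}

func main() {
	// Encode the same struct a producer would enqueue, then decode it.
	raw, err := json.Marshal(EmailTaskPayload{UserID: 42})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	p, err := decodeEmailPayload(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println("decoded user ID:", p.UserID)
}
```

Inside a real handler, the same helper could be called with t.Payload() in place of raw.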
4. Register workers
Open register-queues.go and register your workers:
#file: register-queues.go
...
func registerQueues() {
	var queque = new(core.Queuemux)
	queque.QueueInit()

	// Register your workers here
	queque.AddWork(workers.TypeWelcomeEmail, workers.HandleWelcomeEmailTask)
	queque.AddWork(workers.TypeReminderEmail, workers.HandleReminderEmailTask)

	// Start the queue server (DO NOT MODIFY THIS LINE)
	go queque.RunQueueserver(config.GetQueueConfig())
}
...
5. Enqueue tasks from a controller
Use c.GetQueueClient() to get the Asynq client and enqueue tasks, for example:
#file: controllers/queuesample.go
...
func Queuesample(c *core.Context) *core.Response {
	client := c.GetQueueClient()

	// Create a task payload
	payload, err := json.Marshal(workers.EmailTaskPayload{UserID: 42})
	if err != nil {
		c.GetLogger().Error(err.Error())
		return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
	}

	// Define tasks
	t1 := asynq.NewTask(workers.TypeWelcomeEmail, payload)
	t2 := asynq.NewTask(workers.TypeReminderEmail, payload)

	// Enqueue immediately
	info, err := client.Enqueue(t1)
	if err != nil {
		c.GetLogger().Error(err.Error())
		return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
	}
	c.GetLogger().Info(fmt.Sprintf("Enqueued task: %+v", info))

	// Enqueue the reminder task twice, each with a 1-minute delay
	for i := 1; i < 3; i++ {
		info, err = client.Enqueue(t2, asynq.ProcessIn(1*time.Minute))
		if err != nil {
			c.GetLogger().Error(err.Error())
			return c.Response.SetStatusCode(500).Json(`{"message": "internal error"}`)
		}
		c.GetLogger().Info(fmt.Sprintf("Enqueued delayed task: %+v", info))
	}

	return c.Response.Json(`{"message": "Task queued"}`)
}
...
6. Register the route
The sample controller is only an example; you can enqueue tasks from any of your own functions or controllers.
To expose the sample controller, add the route in routes.go:
#file: routes.go
router.Get("/queuesample", controllers.Queuesample)
Complete file structure
cup/
├── config/
│ └── queue.go ← Enable queue, set concurrency and priorities
├── controllers/
│ └── queuesample.go ← Sample controller that enqueues tasks
├── workers/
│ └── workers.go ← Task type definitions and worker handlers
└── register-queues.go ← Register workers and start the queue server
Important notes
- The queue server runs in a separate goroutine (go queque.RunQueueserver(...)) and does not block the main HTTP server.
- Tasks are persisted in Redis, so pending tasks are not lost if the server restarts.
- Worker handlers use logger.ResolveLogger() (not c.GetLogger()) because they run outside the HTTP request context.
- Redis connection details (REDIS_HOST, REDIS_PORT) are read from environment variables automatically.