Go SDK
The official TinyMQ Go client (client/client.go) wraps the HTTP API with idiomatic Go patterns: variadic options, long-polling, exponential backoff, and automatic re-queuing on failure.
Installation
go get github.com/x-name15/tinymq/client
Your go.mod will have zero indirect dependencies — TinyMQ's client uses only the Go standard library.
Creating a Client
import "github.com/x-name15/tinymq/client"
mq := client.NewClient("http://localhost:7800")
The client internally creates an http.Client with a 35-second timeout — longer than the default 30-second long-polling window to avoid premature timeouts.
Publishing Messages
Basic Publish
payload := []byte(`{"event": "user_signup", "id": 99}`)
if err := mq.Publish("users.new", payload); err != nil {
log.Fatalf("Failed to publish: %v", err)
}
Publish with Options (PublishOptions)
PublishOptions is the unified struct for all advanced routing parameters:
type PublishOptions struct {
TTL time.Duration // Time-To-Live
Delay time.Duration // Delivery delay
Broadcast bool // Fan-out to all waiting consumers
}
// With TTL — message expires in 5 minutes
mq.Publish("notifications", payload, client.PublishOptions{
TTL: 5 * time.Minute,
})
// With Delay — deliver in 30 seconds
mq.Publish("tasks.scheduled", payload, client.PublishOptions{
Delay: 30 * time.Second,
})
// Broadcast — fan-out to all waiting consumers, ephemeral
mq.Publish("cache.invalidate", payload, client.PublishOptions{
Broadcast: true,
})
// Combined TTL + Delay
mq.Publish("reminders", payload, client.PublishOptions{
Delay: 1 * time.Minute,
TTL: 10 * time.Minute,
})
Broadcast Shorthand
mq.PublishBroadcast("events.global", payload)
Consuming Messages (Subscribe / Workers)
Subscribe is a blocking, long-running function designed to be run in a goroutine. It implements:
- Long-polling with configurable timeout
- Automatic ACK on successful handler execution
- Automatic re-queue on handler error
- Exponential backoff from 1s to 32s on repeated failures
go mq.Subscribe(
"orders", // Topic to consume from
client.SubscriptionOptions{Timeout: "10s"}, // Long-poll for 10s per attempt
func(msg message.Message) error {
// Process the message here
// Return nil = success (ACK is sent automatically)
// Return error = failure (re-queue is triggered)
return processOrder(msg)
},
)
Worker Lifecycle
Re-queue and DLQ Integration
When the handler returns an error:
- The SDK ACKs the message (removes it from the broker's current in-flight state)
- Calls POST /requeue with the message body (increments retry_count)
- The broker checks: if retry_count >= 3, moves the message to {topic}.dlq
- The SDK sleeps for the current backoff duration before retrying
go mq.Subscribe("orders", client.SubscriptionOptions{Timeout: "10s"}, func(msg message.Message) error {
payload := string(msg.Payload)
if err := database.Save(payload); err != nil {
// This triggers re-queue + backoff
// After 3 total failures: message moves to orders.dlq
return fmt.Errorf("database save failed: %w", err)
}
return nil // ACK sent, message removed from broker
})
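The routing decision in the failure path above can be sketched in isolation. This is a minimal illustration, not the broker's actual code: the `msg` struct is a stand-in for `message.Message`, and `requeue` and `maxRetries` are hypothetical names chosen for this example. It assumes the retry count is incremented before the threshold check, which matches "after 3 total failures" in the example above.

```go
package main

import (
	"errors"
	"fmt"
)

// msg is a stand-in for message.Message with only the fields used here.
// The field names are assumptions for this sketch.
type msg struct {
	ID         string
	Topic      string
	RetryCount int
}

// maxRetries mirrors the documented threshold: retry_count >= 3 goes to the DLQ.
const maxRetries = 3

// requeue mimics the documented decision: increment the retry count, then
// route to "{topic}.dlq" once the count reaches maxRetries.
func requeue(m msg) (msg, string) {
	m.RetryCount++
	if m.RetryCount >= maxRetries {
		return m, m.Topic + ".dlq"
	}
	return m, m.Topic
}

func main() {
	m := msg{ID: "42", Topic: "orders"}
	// A handler that always fails, standing in for a broken database.
	handler := func(msg) error { return errors.New("database save failed") }

	for {
		if err := handler(m); err == nil {
			return
		}
		var dest string
		m, dest = requeue(m)
		fmt.Printf("failure %d: routed to %s\n", m.RetryCount, dest)
		if dest != m.Topic {
			return // parked in the DLQ; no further retries
		}
	}
}
```

With a handler that never succeeds, the first two failures route back to orders and the third routes to orders.dlq.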
Backoff Schedule
| Attempt | Wait Before Retry |
|---|---|
| 1st failure | 1 second |
| 2nd failure | 2 seconds |
| 3rd failure | 4 seconds |
| 4th failure | 8 seconds |
| 5th failure | 16 seconds |
| 6th+ failure | 32 seconds (capped) |
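The schedule in the table is a doubling sequence with a cap. As a rough sketch of how such a schedule can be computed (the helper name `backoffFor` is hypothetical and is not taken from the client source):

```go
package main

import (
	"fmt"
	"time"
)

// backoffFor returns the wait that follows the n-th consecutive failure
// (n starts at 1): 1s, 2s, 4s, 8s, 16s, then 32s for every later failure.
// Hypothetical helper for illustration; not part of the TinyMQ client API.
func backoffFor(failures int) time.Duration {
	const (
		base    = 1 * time.Second
		maxWait = 32 * time.Second
	)
	if failures < 1 {
		return 0
	}
	d := base
	for i := 1; i < failures; i++ {
		d *= 2
		if d >= maxWait {
			return maxWait
		}
	}
	return d
}

func main() {
	for n := 1; n <= 7; n++ {
		fmt.Printf("failure %d: wait %s\n", n, backoffFor(n))
	}
}
```

Doubling in a loop with an early return at the cap avoids the overflow that a bare bit shift would hit for very large failure counts.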
Complete Worker Example
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/x-name15/tinymq/client"
"github.com/x-name15/tinymq/internal/message"
)
type OrderPayload struct {
UserID int `json:"user_id"`
Item string `json:"item"`
Amount float64 `json:"amount"`
}
func processOrder(msg message.Message) error {
var order OrderPayload
if err := json.Unmarshal(msg.Payload, &order); err != nil {
// Malformed JSON — will eventually go to DLQ
return fmt.Errorf("invalid payload: %w", err)
}
log.Printf("Processing order for user %d: %s (%.2f)\n",
order.UserID, order.Item, order.Amount)
// Simulate processing work
time.Sleep(100 * time.Millisecond)
log.Printf("Order %s completed successfully\n", msg.ID)
return nil
}
func main() {
	mq := client.NewClient("http://localhost:7800")
	log.Println("Starting order worker...")
	// Publish a test message and stop early if the broker rejects it
	if err := mq.Publish("orders", []byte(`{"user_id": 42, "item": "laptop", "amount": 999.99}`)); err != nil {
		log.Fatalf("Failed to publish: %v", err)
	}
	// Start the worker in its own goroutine; Subscribe itself never returns
	go mq.Subscribe("orders", client.SubscriptionOptions{Timeout: "10s"}, processOrder)
	// Block main forever so the worker keeps running
	select {}
}
Publisher Example with PublishOptions
package main
import (
"log"
"time"
"github.com/x-name15/tinymq/client"
)
func main() {
mq := client.NewClient("http://localhost:7800")
// Standard publish
// Delayed job — run in 1 minute
mq.Publish("jobs.reminder", []byte(`{"type": "standup"}`), client.PublishOptions{
Delay: 1 * time.Minute,
})
// Expiring notification — relevant only for 30 seconds
mq.Publish("notifications", []byte(`{"msg": "Flash sale live!"}`), client.PublishOptions{
TTL: 30 * time.Second,
})
// Broadcast cache invalidation to all active workers
mq.PublishBroadcast("cache.products", []byte(`{"action": "flush_all"}`))
log.Println("All messages published")
}
SubscriptionOptions Reference
type SubscriptionOptions struct {
Timeout string // Long-polling duration string, e.g. "5s", "10s", "30s"
}
The timeout determines how long GET /consume/{topic}?timeout={timeout} holds the connection open when the queue is empty. An empty string ("") defaults to "5s".
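To make the defaulting rule concrete, here is a small sketch of how the long-poll URL could be assembled from a topic and a timeout string. The function name `consumeURL` is hypothetical, and whether the real client escapes the topic or validates the duration string is not stated in this document, so treat those details as assumptions.

```go
package main

import (
	"fmt"
	"net/url"
)

// consumeURL builds the long-poll URL for a topic. An empty timeout falls
// back to "5s", the documented default. Hypothetical helper for illustration.
func consumeURL(baseURL, topic, timeout string) string {
	if timeout == "" {
		timeout = "5s"
	}
	q := url.Values{}
	q.Set("timeout", timeout)
	// Escaping the topic is an assumption of this sketch.
	return fmt.Sprintf("%s/consume/%s?%s", baseURL, url.PathEscape(topic), q.Encode())
}

func main() {
	fmt.Println(consumeURL("http://localhost:7800", "orders", ""))
	fmt.Println(consumeURL("http://localhost:7800", "orders", "10s"))
}
```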
API Reference
NewClient(baseURL string) *Client
Creates a new TinyMQ client pointing at the given broker URL.
Publish(topic string, payload []byte, opts ...PublishOptions) error
Publishes a payload to the given topic. Returns an error if the broker returns a non-2xx status (e.g., HTTP 413 for oversized payload, HTTP 429 for queue capacity exceeded).
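Because Publish reports broker rejections only through its error return, callers that expect transient failures (for example a full queue) may want a small retry wrapper. The sketch below is caller-side code under stated assumptions: `publishWithRetry` and `errBusy` are hypothetical names, and the publish call is passed in as a closure so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBusy stands in for the error Publish might return on HTTP 429.
// The real error value and type are not documented here.
var errBusy = errors.New("broker returned HTTP 429")

// publishWithRetry calls publish up to attempts times, sleeping wait between
// tries. It returns nil on the first success, or the last error wrapped.
// Hypothetical caller-side helper; it is not part of the TinyMQ client.
func publishWithRetry(attempts int, wait time.Duration, publish func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = publish(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(wait)
		}
	}
	return fmt.Errorf("publish failed after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Stand-in for a closure around mq.Publish: fails twice, then succeeds.
	fake := func() error {
		calls++
		if calls < 3 {
			return errBusy
		}
		return nil
	}
	err := publishWithRetry(5, 10*time.Millisecond, fake)
	fmt.Println(err, calls) // prints "<nil> 3"
}
```

In real use the closure would wrap a call such as mq.Publish("orders", payload). Since this document does not say how to tell a 413 from a 429 in the returned error, the sketch retries every failure; an oversized payload would fail again on each attempt, so keep the attempt count small.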
PublishBroadcast(topic string, payload []byte) error
Shorthand for Publish with PublishOptions{Broadcast: true}.
Subscribe(topic string, options SubscriptionOptions, handler MessageHandler)
Starts a blocking consumer loop. Never returns unless the process exits. Must be run in a goroutine.
MessageHandler signature:
type MessageHandler func(msg message.Message) error
Related Sections
- REST API: use TinyMQ from any language via raw HTTP
- Dead Letter Queues: understand what happens after 3 failures
- Enterprise Features: TTL, Delay, Broadcast, and Batching