Go SDK

The official TinyMQ Go client (client/client.go) wraps the HTTP API with idiomatic Go patterns: variadic options, long-polling, exponential backoff, and automatic re-queuing on failure.


Installation

go get github.com/x-name15/tinymq/client

Your go.mod will have zero indirect dependencies — TinyMQ's client uses only the Go standard library.


Creating a Client

import "github.com/x-name15/tinymq/client"

mq := client.NewClient("http://localhost:7800")

The client internally creates an http.Client with a 35-second timeout — longer than the default 30-second long-polling window to avoid premature timeouts.
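
As an illustration, a constructor that sets up such a timeout could look like the minimal sketch below. The names sketchClient and newSketchClient are hypothetical and are not the SDK's actual internals; only the 35-second timeout is taken from the description above.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// sketchClient is a hypothetical stand-in for the SDK's Client type.
type sketchClient struct {
	baseURL    string
	httpClient *http.Client
}

// newSketchClient builds an http.Client whose 35-second timeout is longer
// than a 30-second long-polling window, so a full-length poll can complete
// before the HTTP client gives up.
func newSketchClient(baseURL string) *sketchClient {
	return &sketchClient{
		baseURL:    baseURL,
		httpClient: &http.Client{Timeout: 35 * time.Second},
	}
}

func main() {
	c := newSketchClient("http://localhost:7800")
	fmt.Println(c.baseURL, c.httpClient.Timeout) // prints "http://localhost:7800 35s"
}
```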


Publishing Messages

Basic Publish

payload := []byte(`{"event": "user_signup", "id": 99}`)

if err := mq.Publish("users.new", payload); err != nil {
	log.Fatalf("Failed to publish: %v", err)
}

Publish with Options (PublishOptions)

PublishOptions is the unified struct for all advanced routing parameters:

type PublishOptions struct {
	TTL       time.Duration // Time-To-Live
	Delay     time.Duration // Delivery delay
	Broadcast bool          // Fan-out to all waiting consumers
}

// With TTL: message expires in 5 minutes
mq.Publish("notifications", payload, client.PublishOptions{
	TTL: 5 * time.Minute,
})

// With Delay: deliver in 30 seconds
mq.Publish("tasks.scheduled", payload, client.PublishOptions{
	Delay: 30 * time.Second,
})

// Broadcast: fan-out to all waiting consumers, ephemeral
mq.Publish("cache.invalidate", payload, client.PublishOptions{
	Broadcast: true,
})

// Combined TTL + Delay
mq.Publish("reminders", payload, client.PublishOptions{
	Delay: 1 * time.Minute,
	TTL:   10 * time.Minute,
})
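
The variadic opts parameter is what lets the same Publish call work with or without options. A minimal sketch of that pattern follows. The helper resolveOptions is a hypothetical name, and treating only the first options value as significant is an assumption, not confirmed SDK behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// PublishOptions mirrors the struct shown above.
type PublishOptions struct {
	TTL       time.Duration
	Delay     time.Duration
	Broadcast bool
}

// resolveOptions is a hypothetical helper. It returns the first options
// value if the caller passed one, and the zero value otherwise (no TTL,
// no delay, no broadcast).
func resolveOptions(opts ...PublishOptions) PublishOptions {
	if len(opts) > 0 {
		return opts[0]
	}
	return PublishOptions{}
}

func main() {
	plain := resolveOptions()
	delayed := resolveOptions(PublishOptions{Delay: 30 * time.Second})

	fmt.Println("plain delay:", plain.Delay)     // prints "plain delay: 0s"
	fmt.Println("delayed delay:", delayed.Delay) // prints "delayed delay: 30s"
}
```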

Broadcast Shorthand

mq.PublishBroadcast("events.global", payload)

Consuming Messages (Subscribe / Workers)

Subscribe is a blocking, long-running function designed to be run in a goroutine. It implements:

  • Long-polling with configurable timeout
  • Automatic ACK on successful handler execution
  • Automatic re-queue on handler error
  • Exponential backoff from 1s to 32s on repeated failures

go mq.Subscribe(
	"orders", // Topic to consume from
	client.SubscriptionOptions{Timeout: "10s"}, // Long-poll for 10s per attempt
	func(msg message.Message) error {
		// Process the message here
		// Return nil = success (ACK is sent automatically)
		// Return error = failure (re-queue is triggered)
		return processOrder(msg)
	},
)

Worker Lifecycle

Re-queue and DLQ Integration

When the handler returns an error:

  1. The SDK ACKs the message (removes it from the broker's current in-flight state)
  2. Calls POST /requeue with the message body (increments retry_count)
  3. The broker checks: if retry_count >= 3, moves the message to {topic}.dlq
  4. The SDK sleeps for the current backoff duration before retrying

go mq.Subscribe("orders", client.SubscriptionOptions{Timeout: "10s"}, func(msg message.Message) error {
	payload := string(msg.Payload)

	if err := database.Save(payload); err != nil {
		// This triggers re-queue + backoff
		// After 3 total failures: message moves to orders.dlq
		return fmt.Errorf("database save failed: %w", err)
	}

	return nil // ACK sent, message removed from broker
})
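
The broker-side routing rule described in the steps above can be modelled in a few lines. This is a sketch of the rule as documented (increment retry_count, then move to {topic}.dlq once it reaches 3). The function routeAfterFailure is a hypothetical name and is not broker source code.

```go
package main

import "fmt"

// maxRetries matches the documented threshold (retry_count >= 3).
const maxRetries = 3

// routeAfterFailure is a hypothetical model of the re-queue decision.
// It increments the retry count and returns the topic the message lands on
// together with the new count.
func routeAfterFailure(topic string, retryCount int) (string, int) {
	retryCount++
	if retryCount >= maxRetries {
		return topic + ".dlq", retryCount
	}
	return topic, retryCount
}

func main() {
	retries := 0
	for failure := 1; failure <= 3; failure++ {
		var dest string
		dest, retries = routeAfterFailure("orders", retries)
		fmt.Printf("failure %d -> %s (retry_count=%d)\n", failure, dest, retries)
	}
}
```

Under this model the first two failures put the message back on orders, and the third sends it to orders.dlq.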

Backoff Schedule

Attempt         Wait Before Retry
1st failure     1 second
2nd failure     2 seconds
3rd failure     4 seconds
4th failure     8 seconds
5th failure     16 seconds
6th+ failure    32 seconds (capped)
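
The schedule above is a doubling sequence starting at 1 second and capped at 32 seconds. A minimal sketch that reproduces it follows. The function backoffFor is a hypothetical name, and the SDK may compute the delay differently internally.

```go
package main

import (
	"fmt"
	"time"
)

// backoffFor returns the wait before retrying after the nth consecutive
// failure (n starts at 1): 1s, 2s, 4s, 8s, 16s, then 32s for every later one.
func backoffFor(failure int) time.Duration {
	const maxBackoff = 32 * time.Second
	d := time.Second
	for i := 1; i < failure; i++ {
		d *= 2
		if d >= maxBackoff {
			return maxBackoff
		}
	}
	return d
}

func main() {
	for n := 1; n <= 7; n++ {
		fmt.Printf("failure %d: wait %v\n", n, backoffFor(n))
	}
}
```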

Complete Worker Example

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/x-name15/tinymq/client"
	"github.com/x-name15/tinymq/internal/message"
)

type OrderPayload struct {
	UserID int     `json:"user_id"`
	Item   string  `json:"item"`
	Amount float64 `json:"amount"`
}

func processOrder(msg message.Message) error {
	var order OrderPayload
	if err := json.Unmarshal(msg.Payload, &order); err != nil {
		// Malformed JSON: will eventually go to the DLQ
		return fmt.Errorf("invalid payload: %w", err)
	}

	log.Printf("Processing order for user %d: %s (%.2f)",
		order.UserID, order.Item, order.Amount)

	// Simulate processing work
	time.Sleep(100 * time.Millisecond)

	log.Printf("Order %s completed successfully", msg.ID)
	return nil
}

func main() {
	mq := client.NewClient("http://localhost:7800")

	log.Println("Starting order worker...")

	// Publish a test message
	payload := []byte(`{"user_id": 42, "item": "laptop", "amount": 999.99}`)
	if err := mq.Publish("orders", payload); err != nil {
		log.Fatalf("Failed to publish: %v", err)
	}

	// Start the worker in a goroutine; Subscribe itself never returns
	go mq.Subscribe("orders", client.SubscriptionOptions{Timeout: "10s"}, processOrder)

	// Keep the process alive
	select {}
}

Publisher Example with PublishOptions

package main

import (
	"log"
	"time"

	"github.com/x-name15/tinymq/client"
)

func main() {
	mq := client.NewClient("http://localhost:7800")

	// Standard publish
	mq.Publish("jobs.email", []byte(`{"to": "user@example.com"}`))

	// Delayed job: run in 1 minute
	mq.Publish("jobs.reminder", []byte(`{"type": "standup"}`), client.PublishOptions{
		Delay: 1 * time.Minute,
	})

	// Expiring notification: relevant only for 30 seconds
	mq.Publish("notifications", []byte(`{"msg": "Flash sale live!"}`), client.PublishOptions{
		TTL: 30 * time.Second,
	})

	// Broadcast cache invalidation to all active workers
	mq.PublishBroadcast("cache.products", []byte(`{"action": "flush_all"}`))

	log.Println("All messages published")
}

SubscriptionOptions Reference

type SubscriptionOptions struct {
	Timeout string // Long-polling duration string, e.g. "5s", "10s", "30s"
}

The timeout determines how long GET /consume/{topic}?timeout={timeout} holds the connection open when the queue is empty. Setting "" defaults to "5s".
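
A sketch of how the consume URL and the "5s" fallback fit together follows. The helper consumeURL is a hypothetical name; the path and query parameter come from the endpoint quoted above, while the escaping of the topic and timeout is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// consumeURL is a hypothetical helper that builds the long-poll URL for a
// topic, falling back to "5s" when no timeout is set.
func consumeURL(baseURL, topic, timeout string) string {
	if timeout == "" {
		timeout = "5s"
	}
	return fmt.Sprintf("%s/consume/%s?timeout=%s",
		baseURL, url.PathEscape(topic), url.QueryEscape(timeout))
}

func main() {
	fmt.Println(consumeURL("http://localhost:7800", "orders", ""))
	// prints "http://localhost:7800/consume/orders?timeout=5s"
	fmt.Println(consumeURL("http://localhost:7800", "orders", "10s"))
	// prints "http://localhost:7800/consume/orders?timeout=10s"
}
```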


API Reference

NewClient(baseURL string) *Client

Creates a new TinyMQ client pointing at the given broker URL.

Publish(topic string, payload []byte, opts ...PublishOptions) error

Publishes a payload to the given topic. Returns an error if the broker returns a non-2xx status (e.g., HTTP 413 for oversized payload, HTTP 429 for queue capacity exceeded).
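
The non-2xx rule can be pictured with the small sketch below. The helper checkStatus and its error text are hypothetical; the SDK's real error values and messages may differ, so callers should not match on this exact string.

```go
package main

import (
	"fmt"
	"net/http"
)

// checkStatus is a hypothetical helper: any status outside 200-299 is
// turned into an error that includes the code and its standard text.
func checkStatus(code int) error {
	if code >= 200 && code < 300 {
		return nil
	}
	return fmt.Errorf("tinymq: unexpected status %d %s", code, http.StatusText(code))
}

func main() {
	codes := []int{
		http.StatusOK,                    // 200
		http.StatusRequestEntityTooLarge, // 413, oversized payload
		http.StatusTooManyRequests,       // 429, queue capacity exceeded
	}
	for _, code := range codes {
		fmt.Printf("%d -> %v\n", code, checkStatus(code))
	}
}
```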

PublishBroadcast(topic string, payload []byte) error

Shorthand for Publish with PublishOptions{Broadcast: true}.

Subscribe(topic string, options SubscriptionOptions, handler MessageHandler)

Starts a blocking consumer loop. Never returns unless the process exits. Must be run in a goroutine.

MessageHandler signature:

type MessageHandler func(msg message.Message) error