ClaudeWave
Skill · 374 repo stars · updated 6mo ago

using-message-queues

This skill provides a comprehensive guide for implementing asynchronous communication patterns using message brokers and task queues. It covers broker selection (Kafka for event streaming, RabbitMQ for complex routing, NATS for cloud-native systems, Redis Streams, and workflow orchestrators like Temporal), performance comparisons, and use cases ranging from background job processing to event-driven architecture and service decoupling.

Install in Claude Code
git clone --depth 1 https://github.com/ancoleman/ai-design-components /tmp/using-message-queues && cp -r /tmp/using-message-queues/skills/using-message-queues ~/.claude/skills/using-message-queues
Then start a new Claude Code session; the skill loads automatically.

SKILL.md

# Message Queues

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.

## When to Use This Skill

Use message queues when:
- **Long-running operations** block HTTP requests (report generation, video processing)
- **Service decoupling** required (microservices, event-driven architecture)
- **Guaranteed delivery** needed (payment processing, order fulfillment)
- **Event streaming** for analytics (log aggregation, metrics pipelines)
- **Workflow orchestration** for complex processes (multi-step sagas, human-in-the-loop)
- **Background job processing** (email sending, image resizing)

## Broker Selection Decision Tree

Choose a message broker based on the primary need:

### Event Streaming / Log Aggregation
**→ Apache Kafka**
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics (via idempotent producers and transactions)
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing

### Simple Background Jobs
**→ Task Queues**
- **Python** → Celery + Redis
- **TypeScript** → BullMQ + Redis
- **Go** → Asynq + Redis
- Use: Email sending, report generation, webhooks

### Complex Workflows / Sagas
**→ Temporal**
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration

### Request-Reply / RPC Patterns
**→ NATS**
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control

### Complex Message Routing
**→ RabbitMQ**
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub

### Already Using Redis
**→ Redis Streams**
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
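
The decision tree above can be read as a plain lookup from primary need to recommendation. The sketch below encodes it that way; the `Need` enum and the `choose_broker` function are hypothetical names introduced for illustration, and the mapping only restates the headings above.

```python
from enum import Enum


class Need(Enum):
    """Primary needs, one per branch of the decision tree (illustrative names)."""
    EVENT_STREAMING = "event streaming / log aggregation"
    BACKGROUND_JOBS = "simple background jobs"
    WORKFLOWS = "complex workflows / sagas"
    REQUEST_REPLY = "request-reply / RPC"
    COMPLEX_ROUTING = "complex message routing"
    ALREADY_ON_REDIS = "already using Redis"


# Recommendations copied from the decision tree.
RECOMMENDATION = {
    Need.EVENT_STREAMING: "Apache Kafka",
    Need.WORKFLOWS: "Temporal",
    Need.REQUEST_REPLY: "NATS",
    Need.COMPLEX_ROUTING: "RabbitMQ",
    Need.ALREADY_ON_REDIS: "Redis Streams",
}

# Task-queue library per language; each one is backed by Redis.
TASK_QUEUES = {"python": "Celery", "typescript": "BullMQ", "go": "Asynq"}


def choose_broker(need: Need, language: str = "python") -> str:
    """Return the recommendation for a primary need."""
    if need is Need.BACKGROUND_JOBS:
        library = TASK_QUEUES.get(language.lower())
        if library is None:
            raise ValueError(f"no task queue listed for language {language!r}")
        return f"{library} + Redis"
    return RECOMMENDATION[need]
```

For example, `choose_broker(Need.BACKGROUND_JOBS, "go")` selects the Asynq branch, while any other need ignores the `language` argument.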

## Performance Comparison

| Broker | Throughput | Latency (p99) | Best For |
|--------|-----------|---------------|----------|
| **Kafka** | 500K-1M msg/s | 10-50ms | Event streaming |
| **NATS JetStream** | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| **RabbitMQ** | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| **Redis Streams** | 100K+ msg/s | Sub-ms | Simple queues, caching |

## Quick Start Examples

### Kafka Producer/Consumer (Python)
See `examples/kafka-python/` for working code.

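```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()  # Block until queued messages are delivered

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(1.0)  # Wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            # poll() reports broker and partition errors as messages
            print(f"Consumer error: {msg.error()}")
            continue
        process_order(msg.value())  # process_order is application code
finally:
    consumer.close()  # Commit final offsets and leave the group
```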

### Celery Background Jobs (Python)
See `examples/celery-image-processing/` for full implementation.

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
```

### BullMQ Job Processing (TypeScript)
See `examples/bullmq-webhook-processor/` for full implementation.

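```typescript
import { Queue, Worker } from 'bullmq'

// Shared Redis connection settings for the queue and the worker
const connection = { host: 'localhost', port: 6379 }
const queue = new Queue('webhooks', { connection })

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  const res = await fetch(job.data.url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(job.data.payload)
  })
  // fetch resolves on HTTP error statuses, so throw to mark the job as failed
  if (!res.ok) throw new Error(`Webhook returned ${res.status}`)
}, { connection })
```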

### Temporal Workflow Orchestration (Python)
See `examples/temporal-order-saga/` for saga pattern implementation.

```python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"
```
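
The workflow above shows only the forward path. A saga also needs a compensation for each completed step, run in reverse order when a later step fails. The following is a minimal sketch of that control flow in plain `asyncio`, not Temporal code: `run_saga`, `release_inventory`, and `refund_payment` are hypothetical names, and the activities are in-memory stand-ins that only record what ran. In a real Temporal workflow each call would presumably go through `workflow.execute_activity` instead.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Activity = Callable[[str], Awaitable[None]]


async def run_saga(order_id: str, steps: List[Tuple[Activity, Activity]]) -> str:
    """Run (action, compensation) pairs in order; undo completed steps on failure."""
    compensations: List[Activity] = []
    try:
        for action, compensation in steps:
            await action(order_id)
            compensations.append(compensation)  # registered only after success
    except Exception:
        # Undo in reverse order, then surface the original error
        for compensation in reversed(compensations):
            await compensation(order_id)
        raise
    return f"Order {order_id} completed"


# Hypothetical in-memory activities that record what ran.
log: List[str] = []


async def reserve_inventory(order_id: str) -> None:
    log.append("reserve_inventory")


async def release_inventory(order_id: str) -> None:
    log.append("release_inventory")


async def charge_payment(order_id: str) -> None:
    log.append("charge_payment")
    raise RuntimeError("card declined")  # simulated failure


async def refund_payment(order_id: str) -> None:
    log.append("refund_payment")


def demo() -> List[str]:
    """Run the saga with a failing payment step and return the activity log."""
    log.clear()
    try:
        asyncio.run(run_saga("ord_123", [
            (reserve_inventory, release_inventory),
            (charge_payment, refund_payment),
        ]))
    except RuntimeError:
        pass
    return list(log)
```

Because the payment step fails before it completes, only the inventory reservation is compensated; `refund_payment` never runs.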

## Core Patterns

### Event Naming Convention
Use: `domain.entity.action.version`, lowercase and dot-separated. The `entity` segment is optional.

Examples:
- `order.created.v1`
- `user.profile.updated.v2`
- `payment.failed.v1`
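
The examples above have either three or four dot-separated segments and end in a `v<number>` suffix. A small parser can enforce that shape at publish time. This is a sketch under assumptions: `EventName` and `parse_event_name` are hypothetical names, and the rule that segments are lowercase identifiers is inferred from the examples rather than stated by the convention.

```python
import re
from typing import NamedTuple, Optional


class EventName(NamedTuple):
    domain: str
    entity: Optional[str]  # None for three-segment names such as order.created.v1
    action: str
    version: int


_SEGMENT = re.compile(r"[a-z][a-z0-9_]*")
_VERSION = re.compile(r"v([1-9][0-9]*)")


def parse_event_name(name: str) -> EventName:
    """Split an event name into its parts, raising ValueError if it is malformed."""
    parts = name.split(".")
    if len(parts) not in (3, 4):
        raise ValueError(f"expected 3 or 4 segments, got {len(parts)}: {name!r}")
    *segments, version = parts
    match = _VERSION.fullmatch(version)
    if match is None:
        raise ValueError(f"last segment must look like 'v1': {name!r}")
    for segment in segments:
        if _SEGMENT.fullmatch(segment) is None:
            raise ValueError(f"invalid segment {segment!r} in {name!r}")
    entity = segments[1] if len(segments) == 3 else None
    return EventName(segments[0], entity, segments[-1], int(match.group(1)))
```

Calling `parse_event_name("user.profile.updated.v2")` yields the domain `user`, entity `profile`, action `updated`, and version `2`.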

### Event Schema Structure
```json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
```
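
An envelope like the one above is easier to keep consistent when a single helper builds it. The sketch below uses only the standard library. `build_event` is a hypothetical helper, and deriving `version` as `"<major>.0"` from the `vN` suffix of `event_type` is an assumption based on the sample, where `order.created.v2` is paired with `"2.0"`.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_event(
    event_type: str,
    data: Dict[str, Any],
    producer: str,
    trace_id: Optional[str] = None,
    correlation_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Wrap a payload in the envelope fields shown in the schema above."""
    # Assumption: the envelope version mirrors the vN suffix of the event type.
    _, separator, major = event_type.rpartition(".v")
    if not separator or not major.isdigit():
        raise ValueError(f"event_type must end in '.v<number>': {event_type!r}")
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": f"{major}.0",
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    }
```

The result is a plain dictionary, so `json.dumps(build_event(...))` produces the wire format directly.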

### Dead Letter Queue Pattern
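Route failed messages to a dead letter queue (DLQ) once retries are exhausted, or immediately when the error cannot be retried:

```python
from celery.exceptions import Reject

# Reject only takes effect when acks_late is enabled
@app.task(bind=True, max_retries=3, acks_late=True)
def process_order(self, order_id: str):
    try:
        return perform_processing(order_id)
    except RecoverableError as e:
        if self.request.retries >= self.max_retries:
            send_to_dlq(order_id, str(e))  # retries exhausted
            raise Reject(e, requeue=False)
        raise self.retry(exc=e, countdown=60)
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))  # retrying cannot help
        raise Reject(e, requeue=False)
```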
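
The retry-then-dead-letter flow can also be seen without a broker. The following is a minimal in-memory sketch, assuming a `deque` as the queue and a list as the DLQ. `drain` and both exception classes are hypothetical stand-ins defined here for illustration, not Celery APIs.

```python
from collections import deque
from typing import Callable, Deque, Dict, List, Tuple


class RecoverableError(Exception):
    """Transient failure; retrying may succeed."""


class UnrecoverableError(Exception):
    """Permanent failure; retrying cannot help."""


def drain(
    queue: Deque[Tuple[str, int]],
    handler: Callable[[str], None],
    dlq: List[Dict[str, object]],
    max_retries: int = 3,
) -> None:
    """Process (message, retries_so_far) pairs until the queue is empty."""
    while queue:
        message, retries = queue.popleft()
        try:
            handler(message)
        except UnrecoverableError as exc:
            # No retry: dead-letter immediately
            dlq.append({"message": message, "error": str(exc), "retries": retries})
        except RecoverableError as exc:
            if retries >= max_retries:
                # Retries exhausted: dead-letter instead of requeueing
                dlq.append({"message": message, "error": str(exc), "retries": retries})
            else:
                queue.append((message, retries + 1))
```

With `max_retries=3`, a message that always fails with `RecoverableError` is handled four times (one initial attempt plus three retries) before it lands in the DLQ.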

### Idempotency for Exactly-Once Processing
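
Most brokers deliver at least once, so a handler can see the same message twice. Keying each operation on an idempotency key and returning the stored result for repeats makes reprocessing harmless. The following is a minimal in-memory sketch of that idea: `IdempotencyStore` and `run_once` are hypothetical names, and the dictionary stands in for a shared store such as Redis. The check and the write happen under one lock so that two concurrent callers cannot both run the operation.

```python
import threading
from typing import Any, Callable, Dict


class IdempotencyStore:
    """In-memory stand-in for a shared key-value store (assumption)."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def run_once(self, key: str, operation: Callable[[], Any]) -> Any:
        """Run operation at most once per key; repeats get the stored result."""
        with self._lock:
            if key in self._results:
                return self._results[key]
            result = operation()
            self._results[key] = result
            return result
```

Holding the lock while the operation runs serializes all callers, which is acceptable for a sketch but too coarse for production. The handler that follows applies the same check against Redis.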
```python
@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    # ...
```

administering-linux (Skill)

Manage Linux systems covering systemd services, process management, filesystems, networking, performance tuning, and troubleshooting. Use when deploying applications, optimizing server performance, diagnosing production issues, or managing users and security on Linux servers.

ai-data-engineering (Skill)

Data pipelines, feature stores, and embedding generation for AI/ML systems. Use when building RAG pipelines, ML feature serving, or data transformations. Covers feature stores (Feast, Tecton), embedding pipelines, chunking strategies, orchestration (Dagster, Prefect, Airflow), dbt transformations, data versioning (LakeFS), and experiment tracking (MLflow, W&B).

architecting-data (Skill)

Strategic guidance for designing modern data platforms, covering storage paradigms (data lake, warehouse, lakehouse), modeling approaches (dimensional, normalized, data vault, wide tables), data mesh principles, and medallion architecture patterns. Use when architecting data platforms, choosing between centralized vs decentralized patterns, selecting table formats (Iceberg, Delta Lake), or designing data governance frameworks.

architecting-networks (Skill)

Design cloud network architectures with VPC patterns, subnet strategies, zero trust principles, and hybrid connectivity. Use when planning VPC topology, implementing multi-cloud networking, or establishing secure network segmentation for cloud workloads.

architecting-security (Skill)

Design comprehensive security architectures using defense-in-depth, zero trust principles, threat modeling (STRIDE, PASTA), and control frameworks (NIST CSF, CIS Controls, ISO 27001). Use when designing security for new systems, auditing existing architectures, or establishing security governance programs.

assembling-components (Skill)

Assembles component outputs from AI Design Components skills into unified, production-ready component systems with validated token integration, proper import chains, and framework-specific scaffolding. Use as the capstone skill after running theming, layout, dashboard, data-viz, or feedback skills to wire components into working React/Next.js, Python, or Rust projects.

building-ai-chat (Skill)

Builds AI chat interfaces and conversational UI with streaming responses, context management, and multi-modal support. Use when creating ChatGPT-style interfaces, AI assistants, code copilots, or conversational agents. Handles streaming text, token limits, regeneration, feedback loops, tool usage visualization, and AI-specific error patterns. Provides battle-tested components from leading AI products with accessibility and performance built in.

building-ci-pipelines (Skill)

Constructs secure, efficient CI/CD pipelines with supply chain security (SLSA), monorepo optimization, caching strategies, and parallelization patterns for GitHub Actions, GitLab CI, and Argo Workflows. Use when setting up automated testing, building, or deployment workflows.