using-message-queues
This skill provides a comprehensive guide for implementing asynchronous communication patterns using message brokers and task queues. It covers broker selection (Kafka for event streaming, RabbitMQ for complex routing, NATS for cloud-native systems, Redis Streams, and workflow orchestrators like Temporal), performance comparisons, and use cases ranging from background job processing to event-driven architecture and service decoupling.
git clone --depth 1 https://github.com/ancoleman/ai-design-components /tmp/using-message-queues && cp -r /tmp/using-message-queues/skills/using-message-queues ~/.claude/skills/using-message-queuesSKILL.md
# Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
## When to Use This Skill
Use message queues when:
- **Long-running operations** block HTTP requests (report generation, video processing)
- **Service decoupling** required (microservices, event-driven architecture)
- **Guaranteed delivery** needed (payment processing, order fulfillment)
- **Event streaming** for analytics (log aggregation, metrics pipelines)
- **Workflow orchestration** for complex processes (multi-step sagas, human-in-the-loop)
- **Background job processing** (email sending, image resizing)
## Broker Selection Decision Tree
Choose message broker based on primary need:
### Event Streaming / Log Aggregation
**→ Apache Kafka**
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
### Simple Background Jobs
**→ Task Queues**
- **Python** → Celery + Redis
- **TypeScript** → BullMQ + Redis
- **Go** → Asynq + Redis
- Use: Email sending, report generation, webhooks
### Complex Workflows / Sagas
**→ Temporal**
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
### Request-Reply / RPC Patterns
**→ NATS**
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
### Complex Message Routing
**→ RabbitMQ**
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
### Already Using Redis
**→ Redis Streams**
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
## Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|--------|-----------|---------------|----------|
| **Kafka** | 500K-1M msg/s | 10-50ms | Event streaming |
| **NATS JetStream** | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| **RabbitMQ** | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| **Redis Streams** | 100K+ msg/s | Sub-ms | Simple queues, caching |
## Quick Start Examples
### Kafka Producer/Consumer (Python)
See `examples/kafka-python/` for working code.
```python
from confluent_kafka import Producer, Consumer
# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()
# Consumer
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processors',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
msg = consumer.poll(1.0)
if msg is not None:
process_order(msg.value())
```
### Celery Background Jobs (Python)
See `examples/celery-image-processing/` for full implementation.
```python
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
try:
result = expensive_image_processing(image_url)
return result
except RecoverableError as e:
raise self.retry(exc=e, countdown=60)
```
### BullMQ Job Processing (TypeScript)
See `examples/bullmq-webhook-processor/` for full implementation.
```typescript
import { Queue, Worker } from 'bullmq'
const queue = new Queue('webhooks', {
connection: { host: 'localhost', port: 6379 }
})
// Enqueue job
await queue.add('send-webhook', {
url: 'https://example.com/webhook',
payload: { event: 'order.created' }
})
// Process jobs
const worker = new Worker('webhooks', async job => {
await fetch(job.data.url, {
method: 'POST',
body: JSON.stringify(job.data.payload)
})
}, { connection: { host: 'localhost', port: 6379 } })
```
### Temporal Workflow Orchestration
See `examples/temporal-order-saga/` for saga pattern implementation.
```python
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order_id: str) -> str:
# Step 1: Reserve inventory
inventory_id = await workflow.execute_activity(
reserve_inventory,
order_id,
start_to_close_timeout=timedelta(seconds=10),
)
# Step 2: Charge payment
payment_id = await workflow.execute_activity(
charge_payment,
order_id,
start_to_close_timeout=timedelta(seconds=30),
)
return f"Order {order_id} completed"
```
## Core Patterns
### Event Naming Convention
Use: `Domain.Entity.Action.Version`
Examples:
- `order.created.v1`
- `user.profile.updated.v2`
- `payment.failed.v1`
### Event Schema Structure
```json
{
"event_type": "order.created.v2",
"event_id": "uuid-here",
"timestamp": "2025-12-02T10:00:00Z",
"version": "2.0",
"data": {
"order_id": "ord_123",
"customer_id": "cus_456"
},
"metadata": {
"producer": "order-service",
"trace_id": "abc123",
"correlation_id": "xyz789"
}
}
```
### Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:
```python
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
try:
result = perform_processing(order_id)
return result
except UnrecoverableError as e:
send_to_dlq(order_id, str(e))
raise Reject(e, requeue=False)
```
### Idempotency for Exactly-Once Processing
```python
@app.post("/process")
async def process_payment(
payment_data: dict,
idempotency_key: str = Header(None)
):
# Check if already processed
cached_result = redis_client.get(f"idempotency:{idempotency_key}")
if cached_result:
return {"status": "already_processed"}
result = process_payment_logic(payment_datManage Linux systems covering systemd services, process management, filesystems, networking, performance tuning, and troubleshooting. Use when deploying applications, optimizing server performance, diagnosing production issues, or managing users and security on Linux servers.
Data pipelines, feature stores, and embedding generation for AI/ML systems. Use when building RAG pipelines, ML feature serving, or data transformations. Covers feature stores (Feast, Tecton), embedding pipelines, chunking strategies, orchestration (Dagster, Prefect, Airflow), dbt transformations, data versioning (LakeFS), and experiment tracking (MLflow, W&B).
Strategic guidance for designing modern data platforms, covering storage paradigms (data lake, warehouse, lakehouse), modeling approaches (dimensional, normalized, data vault, wide tables), data mesh principles, and medallion architecture patterns. Use when architecting data platforms, choosing between centralized vs decentralized patterns, selecting table formats (Iceberg, Delta Lake), or designing data governance frameworks.
Design cloud network architectures with VPC patterns, subnet strategies, zero trust principles, and hybrid connectivity. Use when planning VPC topology, implementing multi-cloud networking, or establishing secure network segmentation for cloud workloads.
Design comprehensive security architectures using defense-in-depth, zero trust principles, threat modeling (STRIDE, PASTA), and control frameworks (NIST CSF, CIS Controls, ISO 27001). Use when designing security for new systems, auditing existing architectures, or establishing security governance programs.
Assembles component outputs from AI Design Components skills into unified, production-ready component systems with validated token integration, proper import chains, and framework-specific scaffolding. Use as the capstone skill after running theming, layout, dashboard, data-viz, or feedback skills to wire components into working React/Next.js, Python, or Rust projects.
Builds AI chat interfaces and conversational UI with streaming responses, context management, and multi-modal support. Use when creating ChatGPT-style interfaces, AI assistants, code copilots, or conversational agents. Handles streaming text, token limits, regeneration, feedback loops, tool usage visualization, and AI-specific error patterns. Provides battle-tested components from leading AI products with accessibility and performance built in.
Constructs secure, efficient CI/CD pipelines with supply chain security (SLSA), monorepo optimization, caching strategies, and parallelization patterns for GitHub Actions, GitLab CI, and Argo Workflows. Use when setting up automated testing, building, or deployment workflows.