upstash-workflow
This skill provides a three-layer architecture guide for building Upstash Workflow async tasks in LobeHub. It combines a dry-run mode for previewing operations, a fan-out pattern that splits large batches into parallel chunks to avoid step limits, and single-item execution for idempotent processing. Use it when implementing scalable asynchronous workflows that handle pagination, rate limiting, and batch processing across multiple items.
git clone --depth 1 https://github.com/lobehub/lobehub /tmp/upstash-workflow && cp -r /tmp/upstash-workflow/.agents/skills/upstash-workflow ~/.claude/skills/upstash-workflow
# Upstash Workflow Implementation Guide
Standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
## 🎯 The Three Core Patterns
Every workflow in LobeHub combines these three patterns. They exist because the platform constrains you in three ways: rate limits make blind fan-out dangerous, step limits cap a single workflow's size, and idempotency demands that retries don't double-process.
1. **🔍 Dry-Run Mode** — get statistics without triggering actual execution
2. **🌟 Fan-Out Pattern** — split large batches into smaller chunks for parallel processing
3. **🎯 Single Task Execution** — each workflow execution processes **exactly ONE item**
---
## Architecture Overview
All workflows follow the same 3-layer architecture:
```text
Layer 1: Entry Point (process-*)
  ├─ Validates prerequisites
  ├─ Calculates total items to process
  ├─ Filters existing items
  ├─ Supports dry-run mode (statistics only)
  └─ Triggers Layer 2 if work is needed

Layer 2: Pagination (paginate-*)
  ├─ Handles cursor-based pagination
  ├─ Implements fan-out for large batches
  ├─ Recursively processes all pages
  └─ Triggers Layer 3 for each item

Layer 3: Single Task Execution (execute-* / generate-*)
  └─ Performs actual business logic for ONE item
```
**Real examples in this codebase:** `welcome-placeholder`, `agent-welcome` — see [`references/examples.md`](./references/examples.md).
---
## The Three Patterns in 60 Seconds
### 1. Dry-Run Mode
Short-circuit Layer 1 before any side effects so callers can preview what would happen:
```typescript
// Return statistics only; nothing after this point (triggers, writes) runs.
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
  };
}
```
Use case: check how many items will be processed before committing.
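
To show where the short-circuit sits relative to the filtering step, here is a minimal, dependency-free sketch of the Layer 1 decision. The names `EntryPlan`, `planEntry`, and `alreadyProcessed` are hypothetical and not taken from the LobeHub codebase; a real route would run this logic inside `serve(...)` and trigger Layer 2 itself rather than return a flag.

```typescript
// Hypothetical shape for illustration; not a LobeHub type.
interface EntryPlan {
  totalEligible: number;
  toProcess: string[];
  skipped: number;
  dryRun: boolean;
  shouldTriggerPagination: boolean;
  message: string;
}

// Filter first, then decide: dry-run reports the same numbers a real run
// would act on, but never asks for Layer 2 to be triggered.
function planEntry(
  allIds: string[],
  alreadyProcessed: ReadonlySet<string>,
  dryRun: boolean,
): EntryPlan {
  const toProcess = allIds.filter((id) => !alreadyProcessed.has(id));
  const base = {
    totalEligible: allIds.length,
    toProcess,
    skipped: allIds.length - toProcess.length,
  };

  if (dryRun) {
    return {
      ...base,
      dryRun: true,
      shouldTriggerPagination: false,
      message: `[DryRun] Would process ${toProcess.length} items`,
    };
  }

  return {
    ...base,
    dryRun: false,
    shouldTriggerPagination: toProcess.length > 0,
    message: `Processing ${toProcess.length} items`,
  };
}
```

Because the statistics are computed before the `dryRun` check, the preview and the real run cannot drift apart: both read the same filtered list.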
### 2. Fan-Out Pattern
Layer 2 splits oversized batches into chunks and recursively re-triggers itself with each chunk. This avoids hitting workflow step limits when one page contains too many items:
```typescript
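const CHUNK_SIZE = 20;

// A page larger than CHUNK_SIZE is split, and each chunk re-enters Layer 2 as its own run.
if (itemIds.length > CHUNK_SIZE) {
  const chunks = chunk(itemIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      // Step names must be unique, so each one carries its chunk index.
      context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
        WorkflowClass.triggerPaginateItems({ itemIds: ids }),
      ),
    ),
  );
}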
```
Defaults: `PAGE_SIZE = 50` (items per page), `CHUNK_SIZE = 20` (items per fan-out chunk).
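
With these defaults, a full page of 50 items fans out into three chunks of 20, 20, and 10. The sketch below works through that arithmetic. The fan-out snippet does not show where `chunk` comes from, so the helper here is a stand-in written for illustration (lodash's `chunk` behaves the same way), and the `item-N` ids are made up.

```typescript
const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

// Stand-in helper: split an array into consecutive slices of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// One full page of hypothetical item ids.
const page = Array.from({ length: PAGE_SIZE }, (_, i) => `item-${i + 1}`);
const chunks = chunk(page, CHUNK_SIZE);

// Same naming scheme as the fan-out snippet: index / total keeps step names unique.
const stepNames = chunks.map((_, idx) => `workflow:fanout:${idx + 1}/${chunks.length}`);

console.log(chunks.map((c) => c.length)); // chunk sizes: 20, 20, 10
console.log(stepNames); // workflow:fanout:1/3, workflow:fanout:2/3, workflow:fanout:3/3
```

Each re-triggered run receives at most `CHUNK_SIZE` ids, so it no longer satisfies `itemIds.length > CHUNK_SIZE` and the recursion stops after one level.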
### 3. Single Task Execution
Layer 3 always processes exactly one item per invocation. Parallelism comes from Layer 2 fanning out to many Layer 3 invocations, controlled by `flowControl`:
```typescript
import { serve } from '@upstash/workflow/nextjs';

export const { POST } = serve<ExecutePayload>(
  async (context) => {
    // Reject malformed payloads before running any step.
    const { itemId } = context.requestPayload ?? {};
    if (!itemId) return { success: false, error: 'Missing itemId' };

    // One item per run: fetch, process, persist.
    const item = await context.run('workflow:get-item', () => getItem(itemId));
    const result = await context.run('workflow:execute', () => processItem(item));
    await context.run('workflow:save', () => saveResult(itemId, result));

    return { success: true, itemId, result };
  },
  {
    // Limits how many Layer 3 runs execute at once and how fast they start.
    flowControl: { key: 'workflow.execute', parallelism: 10, ratePerSecond: 5 },
  },
);
```
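
Processing one item per run also makes the idempotency requirement easier to satisfy, because a retry only has to answer one question: has this item already been handled? The sketch below illustrates one way to do that with a check-before-work guard. It is an assumption, not the codebase's method: `ResultStore` and `executeOnce` are hypothetical names, the in-memory `Map` stands in for whatever Redis or database state a real workflow would consult, and real handlers would be asynchronous.

```typescript
// Hypothetical store of finished results, keyed by item id.
type ResultStore = Map<string, string>;

interface ExecuteOutcome {
  itemId: string;
  skipped: boolean;
  result: string;
}

// Run `work` for an item at most once; a repeat call returns the stored result.
function executeOnce(
  itemId: string,
  store: ResultStore,
  work: (id: string) => string,
): ExecuteOutcome {
  const existing = store.get(itemId);
  if (existing !== undefined) {
    // A retry or duplicate trigger: do not process the item a second time.
    return { itemId, skipped: true, result: existing };
  }
  const result = work(itemId);
  store.set(itemId, result);
  return { itemId, skipped: false, result };
}
```

Calling `executeOnce` twice with the same `itemId` invokes `work` once; the second call reports `skipped: true` and returns the stored result.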
---
## File Structure
```text
src/
├── app/(backend)/api/workflows/
│   └── {workflow-name}/
│       ├── process-{entities}/route.ts   # Layer 1
│       ├── paginate-{entities}/route.ts  # Layer 2
│       └── execute-{entity}/route.ts     # Layer 3
│
└── server/workflows/
    └── {workflowName}/
        └── index.ts                      # Workflow class
```
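
The route files map to endpoint URLs that the workflow class triggers. As a rough sketch, and assuming the Next.js App Router convention that a parenthesized folder such as `(backend)` is a route group that does not appear in the URL, the endpoint for a layer could be built as follows. `workflowUrl` and the `users` entity are hypothetical; `welcome-placeholder` is one of the example workflow names mentioned earlier, but its actual route names are not shown here.

```typescript
type Layer = 'process' | 'paginate' | 'execute';

// Build the endpoint URL for one layer of a workflow.
// Assumes `(backend)` is a route group and is therefore absent from the path.
function workflowUrl(
  appUrl: string,
  workflowName: string,
  layer: Layer,
  entity: string,
): string {
  const base = appUrl.replace(/\/+$/, ''); // tolerate trailing slashes in APP_URL
  return `${base}/api/workflows/${workflowName}/${layer}-${entity}`;
}

console.log(workflowUrl('https://your-app.com/', 'welcome-placeholder', 'process', 'users'));
// https://your-app.com/api/workflows/welcome-placeholder/process-users
```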
---
## Where to Go Next
Pick the reference that matches what you're doing:
| You want to... | Read |
| ---------------------------------------------------- | ---------------------------------------------------------------- |
| Write the Workflow class + 3 routes from scratch | [`references/implementation.md`](./references/implementation.md) |
| Tune flowControl, error handling, logging, testing | [`references/best-practices.md`](./references/best-practices.md) |
| See two real workflows end-to-end | [`references/examples.md`](./references/examples.md) |
| Deploy on lobehub-cloud (re-exports, cloud-only ops) | [`references/cloud.md`](./references/cloud.md) |
---
## Environment Variables
```bash
# Required for all workflows
APP_URL=https://your-app.com # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx # QStash authentication token
# Optional (for custom QStash URL)
QSTASH_URL=https://custom-qstash.com
```
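
A missing `APP_URL` or `QSTASH_TOKEN` is easier to diagnose at startup than from a failed trigger. The following is a minimal sketch of such a check; `WorkflowEnv` and `readWorkflowEnv` are hypothetical helpers, not part of the LobeHub codebase or the Upstash SDK.

```typescript
interface WorkflowEnv {
  appUrl: string;
  qstashToken: string;
  qstashUrl?: string; // optional custom QStash URL
}

// Validate the variables listed above and fail with one clear message.
function readWorkflowEnv(env: Record<string, string | undefined>): WorkflowEnv {
  const required = ['APP_URL', 'QSTASH_TOKEN'];
  const missing = required.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  return {
    appUrl: env['APP_URL'] as string,
    qstashToken: env['QSTASH_TOKEN'] as string,
    qstashUrl: env['QSTASH_URL'] || undefined,
  };
}
```

In a Node.js process this would typically be called once as `readWorkflowEnv(process.env)`.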
---
## Checklist for New Workflows
### Planning
- [ ] Identify the entity to process (users, agents, items, …)
- [ ] Define the per-item business logic
- [ ] Determine filtering logic (Redis cache, database state, …)
### Implementation
- [ ] Define payload types with TypeScript interfaces
- [ ] Create workflow class with static trigger methods
- [ ] **Layer 1:** entry point with **dry-run** support
- [ ] **Layer 1:** filtering logic to avoid duplicate work
- [ ] **Layer 2:** pagination with **fan-out**
- [ ] **Layer 3:** **single-task execution** (ONE item per run)
- [ ] Configure appropriate `flowControl` for each layer
- [ ] Consistent logging with workflow prefixes
- [ ] Validate all required payload parameters
- [ ] Unique `context.run()` step names
### Quality & Deployment
- [ ] Return consistent response shapes
- [ ] Configure cloud deployment ([`references/cloud.md`](./references/cloud.md) if on lobehub-cloud)
- [ ] Write integration tests (`dryRun` path + full path)
- [ ] Smoke-test with dry-run first
- [ ] Test with a small batch before full rollout
---
## Additional Resources
- [Upstash Workflow Documentation](https://upstash.com/docs/workflow)
- [QStash Documentation](https://upstash.com/docs/qstash)
- Example Workflows in Codebase: `../../src/app/(backend)/api/work…`