Skip to main content
ClaudeWave
Skill2.1k repo starsupdated 3d ago

golang-samber-ro

The golang-samber-ro skill enables Go engineers to build declarative, type-safe reactive pipelines using samber/ro, a ReactiveX implementation with 150+ operators, five subject types, and 40+ plugins for managing asynchronous event streams. Apply this skill when importing github.com/samber/ro or designing real-time data processing, pub/sub systems, and event-driven architectures that require automatic backpressure, error propagation, and Go context integration instead of manual goroutine and channel wiring.

Install in Claude Code
Copy
git clone --depth 1 https://github.com/samber/cc-skills-golang /tmp/golang-samber-ro && cp -r /tmp/golang-samber-ro/skills/golang-samber-ro ~/.claude/skills/golang-samber-ro
Then start a new Claude Code session; the skill loads automatically.

SKILL.md

**Persona:** You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.

**Thinking mode:** Use `ultrathink` when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.

# samber/ro — Reactive Streams for Go

Go implementation of [ReactiveX](https://reactivex.io/). Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.

**Official Resources:**

- [github.com/samber/ro](https://github.com/samber/ro)
- [ro.samber.dev](https://ro.samber.dev)
- [pkg.go.dev/github.com/samber/ro](https://pkg.go.dev/github.com/samber/ro)

This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.

## Why samber/ro (Streams vs Slices)

Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. `samber/ro` solves this with declarative, chainable stream operators.

**When to use which tool:**

| Scenario | Tool | Why |
| --- | --- | --- |
| Transform a slice (map, filter, reduce) | `samber/lo` | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | `errgroup` | Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | `samber/ro` | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | `samber/ro` | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | `samber/ro` | Hot observables (Share/Subjects) handle multicast natively |

**Key differences: lo vs ro**

| Aspect | `samber/lo` | `samber/ro` |
| --- | --- | --- |
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return `(T, error)` per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |

## Installation

```bash
go get github.com/samber/ro
```

## Core Concepts

Four building blocks:

1. **Observable** — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
2. **Observer** — a consumer with three callbacks: `onNext(T)`, `onError(error)`, `onComplete()`
3. **Operator** — a function that transforms an observable into another observable, chained via `Pipe`
4. **Subscription** — the connection between observable and observer. Call `.Wait()` to block or `.Unsubscribe()` to cancel

```go
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },      // onNext
    func(err error) { log.Println(err) },    // onError
    func() { fmt.Println("Done!") },         // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)
```

## Cold vs Hot Observables

**Cold** (default): each `.Subscribe()` starts a new independent execution. Safe and predictable — use by default.

**Hot**: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.

| Convert with | Behavior |
| --- | --- |
| `Share()` | Cold → hot with reference counting. Last unsubscribe tears down |
| `ShareReplay(n)` | Same as Share + buffers last N values for late subscribers |
| `Connectable()` | Cold → hot, but waits for explicit `.Connect()` call |
| Subjects | Natively hot — call `.Send()`, `.Error()`, `.Complete()` directly |

| Subject | Constructor | Replay behavior |
| --- | --- | --- |
| `PublishSubject` | `NewPublishSubject[T]()` | None — late subscribers miss past events |
| `BehaviorSubject` | `NewBehaviorSubject[T](initial)` | Replays last value to new subscribers |
| `ReplaySubject` | `NewReplaySubject[T](bufferSize)` | Replays last N values |
| `AsyncSubject` | `NewAsyncSubject[T]()` | Emits only last value, only on complete |
| `UnicastSubject` | `NewUnicastSubject[T](bufferSize)` | Single subscriber only |

For subject details and hot observable patterns, see [Subjects Guide](./references/subjects-guide.md).

## Operator Quick Reference

| Category | Key operators | Purpose |
| --- | --- | --- |
| Creation | `Just`, `FromSlice`, `FromChannel`, `Range`, `Interval`, `Defer`, `Future` | Create observables from various sources |
| Transform | `Map`, `MapErr`, `FlatMap`, `Scan`, `Reduce`, `GroupBy` | Transform or accumulate stream values |
| Filter | `Filter`, `Take`, `TakeLast`, `Skip`, `Distinct`, `Find`, `First`, `Last` | Selectively emit values |
| Combine | `Merge`, `Concat`, `Zip2`–`Zip6`, `CombineLatest2`–`CombineLatest5`, `Race` | Merge multiple observables |
| Error | `Catch`, `OnErrorReturn`, `OnErrorResumeNextWith`, `Retry`, `RetryWithConfig` | Recover from errors |
| Timing | `Delay`, `DelayEach`, `Timeout`, `ThrottleTime`, `SampleTime`, `BufferWithTime` | Control emission timing |
| Side effect | `Tap`/`Do`, `TapOnNext`, `TapOnError`, `TapOnComplete` | Observe without altering stream |
| Terminal | `
golang-benchmarkSkill

Golang benchmarking, profiling, and performance measurement. Use when writing, running, or comparing Go benchmarks, profiling hot paths with pprof, interpreting CPU/memory/trace profiles, analyzing results with benchstat, setting up CI benchmark regression detection, or investigating production performance with Prometheus runtime metrics. Also use when the developer needs deep analysis on a specific performance indicator - this skill provides the measurement methodology, while `samber/cc-skills-golang@golang-performance` provides the optimization patterns.

golang-cliSkill

Golang CLI application development. Use when building, modifying, or reviewing a Go CLI tool — especially for command structure, flag handling, configuration layering, version embedding, exit codes, I/O patterns, signal handling, shell completion, argument validation, and CLI unit testing. Also triggers when code uses cobra, viper, or urfave/cli. For cobra-specific APIs → See `samber/cc-skills-golang@golang-spf13-cobra` skill; for viper configuration layering → See `samber/cc-skills-golang@golang-spf13-viper` skill.

golang-code-styleSkill

Golang code style conventions — line length and breaking, variable declarations, control flow clarity, when comments help vs hurt. Use when writing or reviewing Go code, asking about style or clarity, or establishing project coding standards. Not for naming conventions (→ See `samber/cc-skills-golang@golang-naming` skill), linter configuration (→ See `samber/cc-skills-golang@golang-lint` skill), or doc comments (→ See `samber/cc-skills-golang@golang-documentation` skill).

golang-concurrencySkill

Golang concurrency patterns. Use when writing or reviewing concurrent Go code involving goroutines, channels, select, locks, sync primitives, errgroup, singleflight, worker pools, or fan-out/fan-in pipelines. Also triggers when you detect goroutine leaks, race conditions, channel ownership issues, or need to choose between channels and mutexes.

golang-contextSkill

Idiomatic context.Context usage in Golang — propagation through API boundaries, cancellation, timeouts and deadlines, request-scoped values, context.WithoutCancel for background work outliving requests. Apply when designing context propagation across layers, debugging leaked or unexpired contexts, choosing between context.Background/TODO/WithoutCancel, or storing values in context. Not for code that merely accepts ctx as first parameter.

golang-continuous-integrationSkill

CI/CD pipeline configuration using GitHub Actions for Golang projects — testing, linting, SAST, security scanning, code coverage, Dependabot, Renovate, GoReleaser, code review automation, and release pipelines. Use when setting up or improving Go project CI, configuring GitHub Actions workflows, adding linters or security scanners, automating dependency updates, or adding quality gates.

golang-data-structuresSkill

Golang data structures — slices (internals, capacity growth, preallocation, slices package), maps (internals, hash buckets, maps package), arrays, container/list/heap/ring, strings.Builder vs bytes.Buffer, generic collections, pointers (unsafe.Pointer, weak.Pointer), and copy semantics. Use when choosing or optimizing Go data structures, implementing generic containers, using container/ packages, unsafe or weak pointers, or questioning slice/map internals.

golang-databaseSkill

Comprehensive guide for Go database access — parameterized queries, struct scanning, NULLable columns, transactions, isolation levels, SELECT FOR UPDATE, connection pool, batch processing, context propagation, and migration tooling. Use when writing, reviewing, or debugging Golang code that interacts with PostgreSQL, MariaDB, MySQL, or SQLite; for database testing; or for questions about database/sql, sqlx, or pgx. Does NOT generate database schemas or migration SQL.