Skip to main content
ClaudeWave
Skill82 repo starsupdated 3d ago

neo4j-kafka-skill

Configure and operate the Neo4j Connector for Kafka (sink + source) and the

Install in Claude Code
Copy
git clone --depth 1 https://github.com/neo4j-contrib/neo4j-skills /tmp/neo4j-kafka-skill && cp -r /tmp/neo4j-kafka-skill/neo4j-kafka-skill ~/.claude/skills/neo4j-kafka-skill
Then start a new Claude Code session; the skill loads automatically.

SKILL.md

# Neo4j Kafka Skill

## When to Use

- Writing Kafka events into Neo4j (sink connector — Cypher, Pattern, CDC, CUD strategies)
- Streaming Neo4j changes to Kafka topics (source connector — CDC or query-based)
- Querying Neo4j change events natively without Kafka (`db.cdc.query`)
- Configuring Confluent Cloud managed Neo4j sink connector
- Setting up schema registry (Avro/JSON Schema) for typed Kafka messages
- Enabling exactly-once semantics or dead-letter queue on sink

## When NOT to Use

- **Cypher query authoring** → `neo4j-cypher-skill`
- **Bulk CSV/JSON file import** → `neo4j-import-skill`
- **GDS algorithms** → `neo4j-gds-skill`
- **Live app write patterns** → `neo4j-cypher-skill`

---

## Decision Table — Which connector strategy?

| Use case | Strategy |
|---|---|
| Custom transformation of Kafka payload → graph | Sink: **Cypher** |
| Mirror another Neo4j CDC source | Sink: **CDC** (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: **Pattern** |
| Consume pre-formatted CUD JSON messages | Sink: **CUD** |
| Stream all Neo4j changes to Kafka (real-time) | Source: **CDC** (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: **Query** |
| Consume CDC events in-process, no Kafka | **Native CDC API** (`db.cdc.query`) |

---

## Prerequisites

- Neo4j Connector for Kafka ≥ 5.0 (download from [neo4j.com/labs/kafka](https://neo4j.com/labs/kafka/) or Confluent Hub)
- Kafka Connect ≥ 3.x or Confluent Platform ≥ 7.x
- For CDC source/sink: Neo4j 5.13+ Enterprise Edition, AuraDB Business Critical, or AuraDB VDC
- For query source: any Neo4j edition
- Java 11+

---

## Core Connection Config (all connectors)

```json
{
  "neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.database": "neo4j"
}
```

Authentication types: `BASIC` | `BEARER` | `KERBEROS` | `CUSTOM` | `NONE`

Never hardcode passwords — use Kafka Connect secrets provider (`${file:...}` or `${env:...}`).

---

## Sink Connector

### Strategy 1 — Cypher

Connector auto-prepends `UNWIND $events AS __value` — write query using `__value`:

```json
{
  "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
  "topics": "person-creates,person-updates",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.cypher.topic.person-creates":
    "MERGE (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.topic.person-updates":
    "MATCH (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.bind-value-as": "__value",
  "neo4j.cypher.bind-key-as": "__key",
  "neo4j.cypher.bind-header-as": "__header"
}
```

MERGE pattern — idempotent upsert:
```cypher
MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH  SET p.updatedAt = datetime(), p += __value.properties
```

### Strategy 2 — Pattern

No Cypher needed — map message fields to graph via pattern syntax:

```json
{
  "neo4j.pattern.topic.users": "(:User{!userId, name, email})",
  "neo4j.pattern.topic.friendships":
    "(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}
```

Pattern rules:
- `!prop` = key property (used for MERGE)
- `prop: field.path` = map from nested message field
- `*` = map all message fields
- `-prop` = exclude property (cannot mix with inclusions)

### Strategy 3 — CDC (mirror another Neo4j)

```json
{
  "neo4j.cdc.schema.topics": "neo4j-cdc-events"
}
```

Or with source-id tracking (stores elementId as property):
```json
{
  "neo4j.cdc.source-id.topics": "neo4j-cdc-events",
  "neo4j.cdc.source-id.label-name": "SourceEvent",
  "neo4j.cdc.source-id.property-name": "sourceId"
}
```

### Exactly-Once Semantics (EOS)

Requires: connector ≥ 5.3.0, Kafka broker EOS support, and a NODE KEY constraint.

Step 1 — Create constraint:
```cypher
CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;
```

Step 2 — Add to connector config:
```json
{
  "neo4j.eos-offset-label": "__KafkaOffset"
}
```

Without EOS: connector provides at-least-once — write idempotent Cypher (MERGE, not CREATE).

### Error Handling / DLQ

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "neo4j-dlq",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.replication.factor": "3"
}
```

`errors.tolerance=none` (default) — stops on first error. Use `all` + DLQ for production.

---

## Source Connector

### CDC-Based Source (recommended, Neo4j 5.13+)

```json
{
  "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.source-strategy": "CDC",
  "neo4j.start-from": "NOW",
  "neo4j.cdc.poll-interval": "1s",
  "neo4j.cdc.poll-duration": "5s",
  "neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
  "neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
  "neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}
```

`neo4j.start-from` options: `NOW` | `EARLIEST` | a specific cursor string

Multiple patterns per topic — indexed 0, 1, 2...:
```json
{
  "neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.all-changes.patterns.1
neo4j-agent-memory-skillSkill

Authoritative reference for the neo4j-agent-memory Python package — a graph-native memory system for AI agents built on Neo4j — and for the hosted service (NAMS) at memory.neo4jlabs.com. Use this skill whenever the user mentions neo4j-agent-memory, agent memory with Neo4j, context graphs, the POLE+O model, MemoryClient/MemorySettings, the memory MCP server, or any of the framework integrations (LangChain, PydanticAI, CrewAI, AWS Strands, Google ADK, Microsoft Agent Framework, OpenAI Agents, LlamaIndex). Also use when the user mentions the hosted service at memory.neo4jlabs.com, NAMS, the Neo4j Agent Memory Service, the `nams_` API key prefix, or the hosted MCP endpoint. Also use when writing documentation, blog posts, tutorials, PRDs, or code samples for the project, when comparing agent memory approaches, or when positioning graph-native memory against vector-only approaches — even if the user doesn't explicitly name the package.

neo4j-aura-agent-skillSkill

Manages Neo4j Aura Agents via the v2beta1 REST API — create, list, get, update, delete,

neo4j-aura-graph-analytics-skillSkill

Serverless Aura Graph Analytics (AGA) GDS Sessions — covers GdsSessions,

neo4j-aura-provisioning-skillSkill

Provisions and manages Neo4j Aura instances via CLI (aura-cli v1.7+) or REST API.

neo4j-cli-tools-skillSkill

Use when working with Neo4j command-line tools — neo4j-cli (modern unified

neo4j-cypher-skillSkill

Generates, optimizes, and validates Cypher 25 queries for Neo4j 2025.x and 2026.x.

neo4j-document-import-skillSkill

Ingests unstructured and semi-structured documents into Neo4j as a knowledge graph.

neo4j-driver-dotnet-skillSkill

Neo4j .NET Driver v6 — IDriver lifecycle, DI registration (singleton), ExecutableQuery