neo4j-spark-skill
Use when reading from or writing to Neo4j with Apache Spark or Databricks using the
git clone --depth 1 https://github.com/neo4j-contrib/neo4j-skills /tmp/neo4j-spark-skill && cp -r /tmp/neo4j-spark-skill/neo4j-spark-skill ~/.claude/skills/neo4j-spark-skill
# Neo4j Connector for Apache Spark
## When to Use
- Reading Neo4j nodes/relationships into Spark DataFrames
- Writing Spark DataFrames to Neo4j as nodes or relationships
- Databricks notebooks connecting to Neo4j
- Delta Lake → Neo4j ingestion pipelines
- Partitioned parallel reads from large Neo4j graphs
## When NOT to Use
- **Python bolt driver / execute_query** → `neo4j-driver-python-skill`
- **Cypher query writing** → `neo4j-cypher-skill`
- **GDS graph algorithms** → `neo4j-gds-skill`
- **Spring Boot + Neo4j** → `neo4j-spring-data-skill`
---
## Version Matrix
| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|-----------|-------|-------|--------------------|-------|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3):
```
org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3
```
Scala 2.13 variant:
```
org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3
```
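
The two coordinates above differ only in the Scala suffix, so they can be generated instead of copied by hand. The helper below is a minimal sketch: `connector_coordinate` is a hypothetical name, and the default version string is simply the one shown in this section.

```python
def connector_coordinate(scala_version: str = "2.12",
                         connector_version: str = "5.4.2") -> str:
    """Build the Maven coordinate for the Spark 3 connector artifact.

    Hypothetical helper: it only reproduces the naming pattern shown above.
    """
    if scala_version not in ("2.12", "2.13"):
        raise ValueError(f"unsupported Scala version: {scala_version}")
    return (
        "org.neo4j:neo4j-connector-apache-spark_"
        f"{scala_version}:{connector_version}_for_spark_3"
    )


if __name__ == "__main__":
    print(connector_coordinate())        # Scala 2.12 artifact
    print(connector_coordinate("2.13"))  # Scala 2.13 artifact
```

The returned string can be passed as the value of `spark.jars.packages`.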
---
## Setup
### Standalone Spark (PySpark)
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("neo4j-app")
    .config("spark.jars.packages",
            "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
    .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "password")
    .getOrCreate())
```
### Standalone Spark (Scala)
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("neo4j-app")
  .config("spark.jars.packages",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
  .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
  .config("neo4j.authentication.type", "basic")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()
```
### Databricks — Cluster Installation
1. Cluster → **Libraries** → **Install New** → **Maven**
2. Search: `org.neo4j:neo4j-connector-apache-spark_2.12` — match Scala version to runtime
3. Cluster → **Advanced Options** → **Spark** tab — add config:
```
neo4j.url neo4j+s://xxxx.databases.neo4j.io
neo4j.authentication.type basic
neo4j.authentication.basic.username {{secrets/neo4j/username}}
neo4j.authentication.basic.password {{secrets/neo4j/password}}
```
4. Use **Single user** access mode (Unity Catalog shared mode not supported)
### Databricks — Secrets (preferred over plaintext)
```python
# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")
spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
```
---
## Key Configuration Options
| Option | Description | Default |
|--------|-------------|---------|
| `neo4j.url` | Bolt/Neo4j URI | — (required) |
| `neo4j.authentication.type` | `none`, `basic`, `kerberos`, `bearer` | `basic` |
| `neo4j.authentication.basic.username` | Username | driver default |
| `neo4j.authentication.basic.password` | Password | driver default |
| `neo4j.authentication.bearer.token` | Bearer token | — |
| `neo4j.database` | Target database | driver default |
| `neo4j.access.mode` | `read` or `write` | `read` |
| `neo4j.encryption.enabled` | TLS (ignored with `+s`/`+ssc` URI) | `false` |
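
The options in this table can be assembled in one place and validated before they reach Spark. The sketch below is an assumption about how one might do that: `neo4j_conf` is a hypothetical helper, it covers only the `none`, `basic`, and `bearer` types, and it uses only option names listed in the table.

```python
def neo4j_conf(url, auth_type="basic", username=None, password=None,
               token=None, database=None):
    """Return a dict of connector options (hypothetical helper).

    Kerberos is deliberately left out of this sketch.
    """
    conf = {"neo4j.url": url, "neo4j.authentication.type": auth_type}
    if auth_type == "basic":
        if not username or not password:
            raise ValueError("basic auth needs username and password")
        conf["neo4j.authentication.basic.username"] = username
        conf["neo4j.authentication.basic.password"] = password
    elif auth_type == "bearer":
        if not token:
            raise ValueError("bearer auth needs a token")
        conf["neo4j.authentication.bearer.token"] = token
    elif auth_type != "none":
        raise ValueError(f"auth type not covered by this sketch: {auth_type}")
    if database:
        conf["neo4j.database"] = database
    return conf


if __name__ == "__main__":
    for key, value in neo4j_conf("neo4j+s://xxxx.databases.neo4j.io",
                                 username="neo4j", password="password").items():
        print(key, "=", value)
```

In a notebook, each pair can then be applied with `spark.conf.set(key, value)`.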
---
## Reading from Neo4j
Three mutually exclusive read modes: set exactly one of `labels`, `query`, or `relationship` per `spark.read` chain.
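
The "exactly one mode" rule is easy to enforce before a read is built. The following is a minimal sketch; `pick_read_mode` is a hypothetical helper, not part of the connector.

```python
def pick_read_mode(labels=None, query=None, relationship=None):
    """Return the single read-mode option, or raise if zero or several are set."""
    candidates = {"labels": labels, "query": query, "relationship": relationship}
    chosen = {name: value for name, value in candidates.items() if value}
    if len(chosen) != 1:
        raise ValueError(
            "set exactly one of labels, query, relationship; got "
            + (", ".join(sorted(chosen)) or "none")
        )
    return chosen


if __name__ == "__main__":
    print(pick_read_mode(labels=":Person"))
```

The returned dict can be passed to PySpark's reader with `.options(**pick_read_mode(labels=":Person"))`.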
### Label scan (nodes)
```python
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load())

df.printSchema()
df.show()
```
```scala
// Scala
val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", ":Person")
  .load()
```
Multi-label filter (AND): `.option("labels", ":Person:Employee")`
Result includes `<id>` (internal Neo4j id) and `<labels>` columns.
### Cypher query read
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query",
            "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) "
            "RETURN p.name AS actor, m.title AS movie, m.year AS year")
    .load())
```
Use explicit `RETURN` aliases; they become the DataFrame column names. Do not put `SKIP`/`LIMIT` in the query (the connector handles pagination).
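
A quick pre-flight check can catch a stray `SKIP` or `LIMIT` before the query is handed to the connector. This is a heuristic sketch, not a Cypher parser: `has_pagination_clause` is a hypothetical helper that strips quoted string literals and then looks for `SKIP`/`LIMIT` followed by a number or a `$parameter`.

```python
import re

# Quoted string literals, removed first so their contents are not scanned.
_SINGLE_QUOTED = re.compile(r"'(?:\\.|[^'\\])*'")
_DOUBLE_QUOTED = re.compile(r'"(?:\\.|[^"\\])*"')
# SKIP/LIMIT followed by an integer or a $parameter.
_PAGINATION = re.compile(r"\b(?:SKIP|LIMIT)\s+(?:\d+|\$\w+)", re.IGNORECASE)


def has_pagination_clause(query: str) -> bool:
    """Heuristically detect SKIP/LIMIT clauses in a Cypher query string."""
    stripped = _SINGLE_QUOTED.sub("", query)
    stripped = _DOUBLE_QUOTED.sub("", stripped)
    return _PAGINATION.search(stripped) is not None


if __name__ == "__main__":
    print(has_pagination_clause("MATCH (n) RETURN n LIMIT 10"))
    print(has_pagination_clause("MATCH (p:Person) RETURN p.name AS name"))
```

Because it is regex-based, the check can miss unusual forms such as a computed `LIMIT` expression.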
### Relationship scan
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load())
```
Result columns: `<rel.id>`, `<rel.type>`, `<source.id>`, `<source.labels>`, `<target.id>`, `<target.labels>`, plus `source.*`, `target.*`, and `rel.*` property columns.
### Read partition tuning
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Transaction")
    .option("partitions", "10")             # parallel partitions (default: 1)
    .option("batch.size", "5000")           # rows per partition batch (default: 5000)
    .option("schema.flatten.limit", "100")  # rows sampled for schema inference
    .load())
```
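
One way to picture what `partitions` does is to assume the matching rows are counted once and each partition then reads its own SKIP/LIMIT window. That assumption is for illustration only and is not the connector's actual code; `partition_windows` is a hypothetical helper.

```python
import math


def partition_windows(total_rows: int, partitions: int):
    """Split total_rows into at most `partitions` (skip, limit) windows.

    Illustrative model only: assumes equal-sized windows over a counted result.
    """
    if partitions < 1:
        raise ValueError("partitions must be >= 1")
    if total_rows <= 0:
        return []
    size = math.ceil(total_rows / partitions)
    windows = []
    for skip in range(0, total_rows, size):
        windows.append((skip, min(size, total_rows - skip)))
    return windows


if __name__ == "__main__":
    for skip, limit in partition_windows(10, 3):
        print(f"SKIP {skip} LIMIT {limit}")
```

Under this model, raising `partitions` shrinks each window, which is why a larger value helps on big label scans.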
Full read options reference: [references/read-patterns.md](references/read-patterns.md)
---
## Writing to Neo4j
### SaveMode
| SaveMode | Cypher | Requires |
|----------|--------|----------|
| `Append` | `CREATE` | nothing extra |
| `Overwrite` | `MERGE` | `node.keys` (nodes) or `relationship.source.node.keys` and `relationship.target.node.keys` (rels) |
| `ErrorIfExists` | `CREATE` + error if exists | — |
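
The table's rules for node writes can be checked before anything is sent to Neo4j. The sketch below makes some assumptions: `node_write_options` and `write_nodes` are hypothetical helpers, `df` is any PySpark DataFrame, and only the `labels` and `node.keys` options are covered.

```python
NEO4J_FORMAT = "org.neo4j.spark.DataSource"
SAVE_MODES = ("Append", "Overwrite", "ErrorIfExists")


def node_write_options(labels, mode="Append", node_keys=None):
    """Return connector options for a node write, enforcing the table above."""
    if mode not in SAVE_MODES:
        raise ValueError(f"unsupported SaveMode: {mode}")
    if mode == "Overwrite" and not node_keys:
        raise ValueError("Overwrite issues MERGE and requires node.keys")
    options = {"labels": labels}
    if node_keys:
        # node.keys takes a comma-separated list of key properties
        options["node.keys"] = ",".join(node_keys)
    return options


def write_nodes(df, labels, mode="Append", node_keys=None):
    """Write a DataFrame as nodes using the validated options."""
    options = node_write_options(labels, mode, node_keys)
    writer = df.write.format(NEO4J_FORMAT).mode(mode)
    for key, value in options.items():
        writer = writer.option(key, value)
    writer.save()
```

For example, `write_nodes(people_df, ":Person", mode="Overwrite", node_keys=["id"])` would merge on `id`, where `people_df` is a placeholder DataFrame name.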
Always create uniqueness constraints on `node.keys` properties before writing in `Overwrite` mode.
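
The constraint statement can be derived from the same label and key list used for `node.keys`. This is a sketch that assumes Neo4j 5.x constraint syntax (`FOR ... REQUIRE`); Neo4j 4.4 uses an older `ON ... ASSERT` form. `uniqueness_constraint` and the generated constraint name are hypothetical.

```python
def uniqueness_constraint(label, keys):
    """Build a Cypher uniqueness constraint for the given label and key properties.

    Assumes Neo4j 5.x syntax; the constraint name is made up for illustration.
    """
    if not keys:
        raise ValueError("at least one key property is required")
    name = f"{label.lower()}_{'_'.join(keys)}_unique"
    props = ", ".join(f"n.`{key}`" for key in keys)
    return (
        f"CREATE CONSTRAINT {name} IF NOT EXISTS "
        f"FOR (n:`{label}`) REQUIRE ({props}) IS UNIQUE"
    )


if __name__ == "__main__":
    print(uniqueness_constraint("Person", ["id"]))
```

Run the generated statement once against the target database (for example through a Neo4j driver session) before the first `Overwrite` write.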
### Write nodes — Append (CREATE)
```python
from pyspark.sql import Row
```