agent-crdt-synchronizer
The agent-crdt-synchronizer is a Claude Code skill that implements Conflict-free Replicated Data Types for distributed systems requiring eventually consistent state synchronization across multiple nodes. Use this skill when building applications that need automatic conflict resolution, delta synchronization, causal consistency tracking, and deterministic merging of concurrent updates from replicated data structures like counters, sets, and registers without manual reconciliation logic.
git clone --depth 1 https://github.com/ruvnet/ruflo /tmp/agent-crdt-synchronizer && cp -r /tmp/agent-crdt-synchronizer/.agents/skills/agent-crdt-synchronizer ~/.claude/skills/agent-crdt-synchronizer
SKILL.md
---
name: crdt-synchronizer
type: synchronizer
color: "#4CAF50"
description: Implements Conflict-free Replicated Data Types for eventually consistent state synchronization
capabilities:
- state_based_crdts
- operation_based_crdts
- delta_synchronization
- conflict_resolution
- causal_consistency
priority: high
hooks:
  pre: |
    echo "🔄 CRDT Synchronizer syncing: $TASK"
    # Initialize CRDT state tracking
    if [[ "$TASK" == *"synchronization"* ]]; then
      echo "📊 Preparing delta state computation"
    fi
  post: |
    echo "🎯 CRDT synchronization complete"
    # Verify eventual consistency
    echo "✅ Validating conflict-free state convergence"
---
# CRDT Synchronizer
Implements Conflict-free Replicated Data Types for eventually consistent distributed state synchronization.
## Core Responsibilities
1. **CRDT Implementation**: Deploy state-based and operation-based conflict-free data types
2. **Data Structure Management**: Handle counters, sets, registers, and composite structures
3. **Delta Synchronization**: Implement efficient incremental state updates
4. **Conflict Resolution**: Ensure deterministic conflict-free merge operations
5. **Causal Consistency**: Maintain proper ordering of causally related operations
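The deterministic merge named in responsibility 4 depends on the merge function being commutative, associative, and idempotent, so replicas converge regardless of delivery order or duplication. A minimal sketch of checking those laws, assuming replica state is a plain object of per-node counts; `mergeCounts` and `sameState` are hypothetical helpers, not part of the skill:

```javascript
// Hypothetical helper: element-wise maximum of two per-node count objects.
function mergeCounts(a, b) {
  const merged = { ...a };
  for (const [node, count] of Object.entries(b)) {
    merged[node] = Math.max(merged[node] ?? 0, count);
  }
  return merged;
}

// Hypothetical helper: compare two states, treating missing nodes as 0.
function sameState(a, b) {
  const keys = new Set([...Object.keys(a), ...Object.keys(b)]);
  return [...keys].every((k) => (a[k] ?? 0) === (b[k] ?? 0));
}

const x = { n1: 3, n2: 1 };
const y = { n1: 2, n2: 4 };
const z = { n3: 7 };

// Commutative: the order in which two replicas merge does not matter.
console.log(sameState(mergeCounts(x, y), mergeCounts(y, x)));
// Associative: grouping of merges does not matter.
console.log(
  sameState(mergeCounts(mergeCounts(x, y), z), mergeCounts(x, mergeCounts(y, z)))
);
// Idempotent: re-applying the same state changes nothing.
console.log(sameState(mergeCounts(x, x), x));
```

All three checks print `true` for these inputs; a merge that fails any of them can diverge when messages are reordered or redelivered.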
## Technical Implementation
### Base CRDT Framework
```javascript
class CRDTSynchronizer {
  constructor(nodeId, replicationGroup) {
    this.nodeId = nodeId;
    this.replicationGroup = replicationGroup;
    this.crdtInstances = new Map();
    this.vectorClock = new VectorClock(nodeId);
    this.deltaBuffer = new Map();
    this.syncScheduler = new SyncScheduler();
    this.causalTracker = new CausalTracker();
  }

  // Register CRDT instance
  registerCRDT(name, crdtType, initialState = null) {
    const crdt = this.createCRDTInstance(crdtType, initialState);
    this.crdtInstances.set(name, crdt);

    // Subscribe to CRDT changes for delta tracking
    crdt.onUpdate((delta) => {
      this.trackDelta(name, delta);
    });

    return crdt;
  }

  // Create specific CRDT instance
  createCRDTInstance(type, initialState) {
    switch (type) {
      case 'G_COUNTER':
        return new GCounter(this.nodeId, this.replicationGroup, initialState);
      case 'PN_COUNTER':
        return new PNCounter(this.nodeId, this.replicationGroup, initialState);
      case 'OR_SET':
        return new ORSet(this.nodeId, initialState);
      case 'LWW_REGISTER':
        return new LWWRegister(this.nodeId, initialState);
      case 'OR_MAP':
        return new ORMap(this.nodeId, this.replicationGroup, initialState);
      case 'RGA':
        return new RGA(this.nodeId, initialState);
      default:
        throw new Error(`Unknown CRDT type: ${type}`);
    }
  }

  // Synchronize with peer nodes
  async synchronize(peerNodes = null) {
    const targets = peerNodes || Array.from(this.replicationGroup);

    for (const peer of targets) {
      if (peer !== this.nodeId) {
        await this.synchronizeWithPeer(peer);
      }
    }
  }

  async synchronizeWithPeer(peerNode) {
    // Get current state and deltas
    const localState = this.getCurrentState();
    const deltas = this.getDeltasSince(peerNode);

    // Send sync request
    const syncRequest = {
      type: 'CRDT_SYNC_REQUEST',
      sender: this.nodeId,
      vectorClock: this.vectorClock.clone(),
      state: localState,
      deltas: deltas
    };

    try {
      const response = await this.sendSyncRequest(peerNode, syncRequest);
      await this.processSyncResponse(response);
    } catch (error) {
      console.error(`Sync failed with ${peerNode}:`, error);
    }
  }
}
```
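The framework above constructs a `VectorClock` and calls `clone()` on it, but the class itself is not defined in this section. A minimal sketch of one possible shape, assuming only the constructor and `clone()` usage seen above; the `tick`, `merge`, and `compare` method names and the returned labels are hypothetical:

```javascript
class VectorClock {
  constructor(nodeId, entries = []) {
    this.nodeId = nodeId;
    this.clock = new Map(entries);
    if (!this.clock.has(nodeId)) this.clock.set(nodeId, 0);
  }

  // Advance the local component before a local update or a send.
  tick() {
    this.clock.set(this.nodeId, this.clock.get(this.nodeId) + 1);
    return this;
  }

  // Take the element-wise maximum with a received clock.
  merge(other) {
    for (const [node, count] of other.clock) {
      this.clock.set(node, Math.max(this.clock.get(node) ?? 0, count));
    }
    return this;
  }

  // Returns 'BEFORE', 'AFTER', 'EQUAL', or 'CONCURRENT' relative to other.
  compare(other) {
    let less = false;
    let greater = false;
    const nodes = new Set([...this.clock.keys(), ...other.clock.keys()]);
    for (const node of nodes) {
      const mine = this.clock.get(node) ?? 0;
      const theirs = other.clock.get(node) ?? 0;
      if (mine < theirs) less = true;
      if (mine > theirs) greater = true;
    }
    if (less && greater) return 'CONCURRENT';
    if (less) return 'BEFORE';
    if (greater) return 'AFTER';
    return 'EQUAL';
  }

  // Copy the entries so the snapshot is unaffected by later ticks.
  clone() {
    return new VectorClock(this.nodeId, this.clock);
  }
}

const a = new VectorClock('a').tick(); // {a: 1}
const b = new VectorClock('b').tick(); // {b: 1}
console.log(a.compare(b)); // CONCURRENT
b.merge(a).tick();         // {b: 2, a: 1}
console.log(a.compare(b)); // BEFORE
```

Two updates are causally ordered only when one clock is less than or equal to the other in every component; any mixed result means the updates were concurrent and must be resolved by the CRDT merge.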
### G-Counter Implementation
```javascript
class GCounter {
  constructor(nodeId, replicationGroup, initialState = null) {
    this.nodeId = nodeId;
    this.replicationGroup = replicationGroup;
    this.payload = new Map();
    // Must exist before merge() runs, because merge() notifies observers
    this.updateCallbacks = [];

    // Initialize counters for all nodes
    for (const node of replicationGroup) {
      this.payload.set(node, 0);
    }

    if (initialState) {
      this.merge(initialState);
    }
  }

  // Increment operation (can only be performed by owner node)
  increment(amount = 1) {
    if (amount < 0) {
      throw new Error('G-Counter only supports non-negative increments');
    }

    const oldValue = this.payload.get(this.nodeId) || 0;
    const newValue = oldValue + amount;
    this.payload.set(this.nodeId, newValue);

    // Notify observers
    this.notifyUpdate({
      type: 'INCREMENT',
      node: this.nodeId,
      oldValue: oldValue,
      newValue: newValue,
      delta: amount
    });

    return newValue;
  }

  // Get current value (sum of all node counters)
  value() {
    return Array.from(this.payload.values()).reduce((sum, val) => sum + val, 0);
  }

  // Merge with another G-Counter state (element-wise maximum)
  merge(otherState) {
    let changed = false;

    for (const [node, otherValue] of otherState.payload) {
      const currentValue = this.payload.get(node) || 0;
      if (otherValue > currentValue) {
        this.payload.set(node, otherValue);
        changed = true;
      }
    }

    if (changed) {
      this.notifyUpdate({
        type: 'MERGE',
        mergedFrom: otherState
      });
    }
  }

  // Compare with another state under the element-wise partial order.
  // Checks every node on both sides; returns 'CONCURRENT' when neither
  // state dominates the other.
  compare(otherState) {
    let less = false;
    let greater = false;
    const nodes = new Set([...this.payload.keys(), ...otherState.payload.keys()]);

    for (const node of nodes) {
      const currentValue = this.payload.get(node) || 0;
      const otherValue = otherState.payload.get(node) || 0;
      if (currentValue < otherValue) {
        less = true;
      } else if (currentValue > otherValue) {
        greater = true;
      }
    }

    if (less && greater) return 'CONCURRENT';
    if (less) return 'LESS_THAN';
    if (greater) return 'GREATER_THAN';
    return 'EQUAL';
  }

  // Clone current state
  clone() {
    const newCounter = new GCounter(this.nodeId, this.replicationGroup);
    newCounter.payload = new Map(this.payload);
    return newCounter;
  }

  onUpdate(callback) {
    this.updateCallbacks.push(callback);
  }

  notifyUpdate(delta) {
    this.updateCallbacks.forEach(callback => callback(delta));
  }
}
```
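`createCRDTInstance` also references a `PNCounter`, which is not shown in this section. A common construction pairs two grow-only maps, one for increments and one for decrements, each merged like the G-Counter payload. A minimal sketch under that assumption; the class name, the simplified constructor, and the `bump` helper are hypothetical:

```javascript
class PNCounterSketch {
  constructor(nodeId) {
    this.nodeId = nodeId;
    this.increments = new Map(); // grow-only per-node increment totals
    this.decrements = new Map(); // grow-only per-node decrement totals
  }

  increment(amount = 1) {
    this.bump(this.increments, amount);
  }

  decrement(amount = 1) {
    this.bump(this.decrements, amount);
  }

  // Only this node's own slot is ever written locally.
  bump(map, amount) {
    if (amount < 0) throw new Error('amount must be non-negative');
    map.set(this.nodeId, (map.get(this.nodeId) || 0) + amount);
  }

  // Value is total increments minus total decrements.
  value() {
    const sum = (map) => [...map.values()].reduce((s, v) => s + v, 0);
    return sum(this.increments) - sum(this.decrements);
  }

  // Element-wise maximum on both maps.
  merge(other) {
    for (const key of ['increments', 'decrements']) {
      for (const [node, theirs] of other[key]) {
        this[key].set(node, Math.max(this[key].get(node) || 0, theirs));
      }
    }
  }
}

const left = new PNCounterSketch('node-a');
const right = new PNCounterSketch('node-b');
left.increment(5);
right.decrement(2);
left.merge(right);
right.merge(left);
console.log(left.value(), right.value()); // 3 3
```

Keeping decrements in a separate grow-only map preserves the monotonic merge: neither map ever shrinks, so the element-wise maximum stays a valid join.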
### OR-Set Implementation
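`createCRDTInstance` constructs an `ORSet` from a node ID and optional initial state. A minimal sketch of an observed-remove set, assuming a state-based design in which every add gets a unique tag and a remove tombstones only the tags the removing replica has seen. The class name, tag format, and method names are hypothetical rather than the skill's own code:

```javascript
class ORSetSketch {
  constructor(nodeId) {
    this.nodeId = nodeId;
    this.counter = 0;
    this.added = new Map();   // element -> Set of unique add tags
    this.removed = new Set(); // tombstoned tags
  }

  // Each add creates a fresh tag, unique per node and sequence number.
  add(element) {
    const tag = `${this.nodeId}:${++this.counter}`;
    if (!this.added.has(element)) this.added.set(element, new Set());
    this.added.get(element).add(tag);
  }

  // Tombstone only the tags this replica has observed for the element.
  remove(element) {
    for (const tag of this.added.get(element) || []) {
      this.removed.add(tag);
    }
  }

  // Present if at least one add tag has not been tombstoned.
  has(element) {
    for (const tag of this.added.get(element) || []) {
      if (!this.removed.has(tag)) return true;
    }
    return false;
  }

  // Union of add tags and union of tombstones.
  merge(other) {
    for (const [element, tags] of other.added) {
      if (!this.added.has(element)) this.added.set(element, new Set());
      for (const tag of tags) this.added.get(element).add(tag);
    }
    for (const tag of other.removed) this.removed.add(tag);
  }
}

const r1 = new ORSetSketch('r1');
const r2 = new ORSetSketch('r2');
r1.add('x');
r2.merge(r1);   // r2 observes r1's add
r2.remove('x'); // r2 tombstones the observed tag
r1.add('x');    // concurrent re-add on r1 with a fresh tag
r1.merge(r2);
r2.merge(r1);
console.log(r1.has('x'), r2.has('x')); // true true
```

The concurrent re-add survives because its tag was never observed by the remove. This sketch never discards tombstones, so a production design would need some form of garbage collection.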