ClaudeWave
Skill · 200 repo stars · updated 4d ago

vda5050_integration

Bridge a VDA 5050 v3.0.0 fleet-control interface (MQTT/JSON) onto Nav 2 under Clean Architecture — domain entities, MQTT/Nav 2 adapters behind ports, order→NavigateThroughPoses mapping, state aggregation, action handlers. Trigger when the user asks to build or extend a VDA 5050 connector / fleet bridge.

Install in Claude Code
git clone --depth 1 https://github.com/harunkurtdev/ros2-claude-code-template /tmp/vda5050_integration && cp -r /tmp/vda5050_integration/.claude/skills/vda5050_integration ~/.claude/skills/vda5050_integration
Then open a new Claude Code session; the skill loads automatically.

SKILL.md

# VDA 5050 ↔ ROS 2 / Nav 2 Integration

How to bridge a **VDA 5050 v3.0.0** fleet-control interface (MQTT/JSON)
onto a Nav 2 stack **without breaking Clean Architecture**.

- Protocol overview: `rules/vda5050_protocol.md`.
- Complete message/field spec + processes: `rules/vda5050_messages.md`.
- **Code-generation format analysis** (3 proven idioms — pydantic /
  ROS `.msg`+bridge / C++ structs): `rules/vda5050_implementation_formats.md`.
  Real reference repos in `~/nav2_ws/src/`: `isaac_mission_dispatch`
  (fleet/pydantic), `isaac_ros_cloud_control` (robot/ROS 2),
  `vda5050_core` (C++ core).
- Authoritative spec + JSON schemas: `~/nav2_ws/src/VDA5050/`
  (`VDA5050_EN.md`, `json_schemas/*.schema`).
- **Reference implementation** (working, Clean-Architecture):
  `~/nav2_ws/src/vda5050_connector/`. Study it before writing a new one.

## The one rule that drives the design

VDA 5050 is an **outer-world protocol** — MQTT topics, JSON payloads,
schema versions. By the layer rules in `rules/clean_architecture.md`:

> The protocol does **not** belong in the domain. Domain entities model
> *orders, nodes, edges, actions, state* as plain Python/C++ — no MQTT,
> no `json`, no `*_msgs`. MQTT and Nav 2 are **infrastructure adapters**
> behind domain ports.

```
PRESENTATION   launch entrypoint, CLI, RViz
     │
APPLICATION    OrderHandler · StateAssembler · InstantActionHandler
     │            (graph traversal, action queue, state aggregation)
DOMAIN         entities: Order/Node/Edge/Action/State/Connection/Factsheet
     ▲            ports:  MqttClientInterface · NavigationAdapterInterface
     │ implements
INFRASTRUCTURE MqttBridge (paho) · Nav2Adapter (rclpy + NavigateToPose)
```

The connector mirrors this exactly:

```
vda5050_connector/
├── domain/
│   ├── entities/        order.py state.py action.py connection.py
│   │                    factsheet.py visualization.py header.py graph.py
│   └── interfaces/      mqtt_client.py  navigation_adapter.py   ← PORTS
├── application/         order_handler.py  state_assembler.py
│                        action_handler.py
└── infrastructure/      mqtt_bridge.py  nav2_adapter.py
                         fleet_adapter.py  vda5050_node.py        ← composition root
```

## Layer responsibilities

### Domain — entities + ports (no ROS, no MQTT, no JSON)

Model each message as a dataclass tree. Keep field names as snake_case
mirrors of the JSON schema's camelCase keys (`nodeId` → `node_id`) so
(de)serialization is mechanical, but **do not** put `json.loads` here;
that is an adapter concern.

```python
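# domain/entities/order.py  (excerpt of the real connector)
from __future__ import annotations  # Action and OrderEdge are defined outside this excerpt

from dataclasses import dataclass, field
from typing import List, Optional
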
@dataclass
class NodePosition:
    x: float = 0.0
    y: float = 0.0
    theta: float = 0.0
    map_id: str = ''

@dataclass
class OrderNode:
    node_id: str = ''
    sequence_id: int = 0
    released: bool = False
    node_position: Optional[NodePosition] = None
    actions: List[Action] = field(default_factory=list)

@dataclass
class Order:
    order_id: str = ''
    order_update_id: int = 0
    nodes: List[OrderNode] = field(default_factory=list)
    edges: List[OrderEdge] = field(default_factory=list)
```

Ports (abstract) live in `domain/interfaces/`:

```python
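# domain/interfaces/  (navigation_adapter.py, mqtt_client.py)
from __future__ import annotations  # OrderNode and AGVPosition come from the domain entities

from abc import ABC, abstractmethod


class NavigationAdapterInterface(ABC):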
    @abstractmethod
    def navigate_to(self, node: OrderNode, on_done) -> None: ...
    @abstractmethod
    def cancel(self) -> None: ...
    @abstractmethod
    def get_position(self) -> AGVPosition: ...

class MqttClientInterface(ABC):
    @abstractmethod
    def publish(self, topic, payload, qos=0, retain=False) -> None: ...
    @abstractmethod
    def subscribe(self, topic, callback, qos=0) -> None: ...
    @abstractmethod
    def set_last_will(self, topic, payload, qos=1, retain=True) -> None: ...
```
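Because the application layer only sees the ports, a test can swap the broker for an in-memory fake. The sketch below is illustrative rather than taken from the reference connector: `InMemoryMqttClient`, `announce_online`, the `(topic, payload)` callback signature, and the `acme`/`0001` identifiers are all assumptions, and the port is re-declared in trimmed form so the block runs on its own.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Tuple


class MqttClientInterface(ABC):
    """Trimmed copy of the port above so this sketch is self-contained."""

    @abstractmethod
    def publish(self, topic, payload, qos=0, retain=False) -> None: ...

    @abstractmethod
    def subscribe(self, topic, callback, qos=0) -> None: ...


class InMemoryMqttClient(MqttClientInterface):
    """Hypothetical test double: records publishes and forwards them to subscribers."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, str, int, bool]] = []
        self._subscribers: Dict[str, List[Callable[[str, str], None]]] = {}

    def publish(self, topic, payload, qos=0, retain=False) -> None:
        self.published.append((topic, payload, qos, retain))
        # Exact-match delivery only; MQTT wildcards are out of scope for this fake.
        for callback in self._subscribers.get(topic, []):
            callback(topic, payload)

    def subscribe(self, topic, callback, qos=0) -> None:
        self._subscribers.setdefault(topic, []).append(callback)


def announce_online(mqtt: MqttClientInterface, base_topic: str) -> None:
    """Application-style code: depends on the port, never on paho."""
    mqtt.publish(f"{base_topic}/connection",
                 '{"connectionState": "ONLINE"}', qos=1, retain=True)


fake = InMemoryMqttClient()
received = []
fake.subscribe("vda5050/v3/acme/0001/connection",
               lambda topic, payload: received.append(payload))
announce_online(fake, "vda5050/v3/acme/0001")
```

The same `announce_online` call would work unchanged against a paho-backed `MqttBridge`, which is the point of keeping the port in the domain.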

### Application — protocol logic, depends only on domain

- **`OrderHandler`** — validates an incoming `Order`, enforces
  base/horizon, runs the node/edge graph in `sequenceId` order,
  resolves missing `nodePosition` from a `NavigationGraph` (VDA 5050
  allows omitting position when both master control (MC) and the robot know the node),
  tracks `nodeStates`/`edgeStates`, and calls `NavigationAdapterInterface`.
  Handles order **updates / stitching** (same `orderId`, next
  `orderUpdateId`, first node = `lastNodeId`).
- **`StateAssembler`** — aggregates position, velocity, battery, errors,
  `actionStates`, `nodeStates`/`edgeStates` into a `State` entity for
  periodic publish.
- **`InstantActionHandler`** — `cancelOrder`, `startPause`/`stopPause`,
  `stateRequest`, `factsheetRequest`, `pick`/`drop`, etc.; drives the
  `actionStatus` state machine (`WAITING → … → FINISHED/FAILED/RETRIABLE`).
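The base/horizon split and the stitching check described for `OrderHandler` can be sketched in plain domain code. This is a minimal sketch under stated assumptions: the `Decision` enum and the `classify` and `split_base_horizon` helpers are hypothetical names, the entities are cut down to the fields the check needs, and a real handler would also consider whether the robot is idle, distinguish a duplicate `orderUpdateId` from a stale one, and report rejections as errors in `state`.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Tuple


@dataclass
class OrderNode:
    node_id: str
    sequence_id: int
    released: bool = False


@dataclass
class Order:
    order_id: str
    order_update_id: int
    nodes: List[OrderNode] = field(default_factory=list)


class Decision(Enum):  # hypothetical outcome names for this sketch
    ACCEPT_NEW = "accept_new"
    ACCEPT_UPDATE = "accept_update"
    REJECT = "reject"


def split_base_horizon(order: Order) -> Tuple[List[OrderNode], List[OrderNode]]:
    """Released nodes form the base; unreleased nodes form the horizon."""
    ordered = sorted(order.nodes, key=lambda n: n.sequence_id)
    base = [n for n in ordered if n.released]
    horizon = [n for n in ordered if not n.released]
    return base, horizon


def classify(incoming: Order, current: Optional[Order], last_node_id: str) -> Decision:
    """Decide whether an incoming order is new, a valid stitch, or rejected."""
    if current is None or incoming.order_id != current.order_id:
        return Decision.ACCEPT_NEW
    # Same orderId: the update counter must move forward.
    if incoming.order_update_id <= current.order_update_id:
        return Decision.REJECT
    # Stitching: the update must start at the node the robot last reached.
    base, _ = split_base_horizon(incoming)
    if base and base[0].node_id == last_node_id:
        return Decision.ACCEPT_UPDATE
    return Decision.REJECT


current = Order("o1", 0, [OrderNode("n1", 0, True), OrderNode("n2", 2, True)])
update = Order("o1", 1, [OrderNode("n2", 2, True),
                         OrderNode("n3", 4, True),
                         OrderNode("n4", 6, False)])
decision = classify(update, current, last_node_id="n2")
base, horizon = split_base_horizon(update)
```

Only the base would be handed to `NavigationAdapterInterface`; horizon nodes wait until a later update releases them.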

### Infrastructure — adapters implementing the ports

- **`MqttBridge`** (paho-mqtt) — topic `vda5050/v3/<mfr>/<serial>/<topic>`,
  QoS 0 for everything except `connection` (QoS 1), MQTT **last will** on
  `connection` = `CONNECTION_BROKEN`. (De)serializes JSON ↔ domain
  entities here, and **validates against `json_schemas/*.schema`**.
- **`Nav2Adapter`** (`rclpy`) — maps `navigate_to(node)` →
  `NavigateToPose` action; reads telemetry from running topics
  (`/amcl_pose` → position, `/odom` → velocity, `/battery_state`,
  `/scan` → safety, `/diagnostics` → errors/info).
- **`vda5050_node.py`** — composition root: instantiates adapters,
  injects ports into the application services, runs the publish timers.
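As a rough illustration of what "(de)serializes JSON ↔ domain entities here" can look like inside `MqttBridge`, the sketch below converts between camelCase payload keys and a snake_case entity and builds the topic string. The helper names (`camel_to_snake`, `snake_to_camel`, `topic_for`, `node_position_from_json`, `node_position_to_json`) are hypothetical, schema validation is omitted, and the sketch assumes the payload has already passed validation, so unknown keys are not handled.

```python
import json
import re
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class NodePosition:
    x: float = 0.0
    y: float = 0.0
    theta: float = 0.0
    map_id: str = ''


def camel_to_snake(name: str) -> str:
    """'mapId' -> 'map_id'."""
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()


def snake_to_camel(name: str) -> str:
    """'map_id' -> 'mapId'."""
    head, *rest = name.split('_')
    return head + ''.join(part.capitalize() for part in rest)


def topic_for(manufacturer: str, serial: str, subtopic: str) -> str:
    """Build a topic following the vda5050/v3/<mfr>/<serial>/<topic> layout."""
    return f"vda5050/v3/{manufacturer}/{serial}/{subtopic}"


def node_position_from_json(payload: str) -> NodePosition:
    """Adapter-side parsing: JSON text in, plain domain entity out."""
    raw: Dict[str, Any] = json.loads(payload)
    return NodePosition(**{camel_to_snake(k): v for k, v in raw.items()})


def node_position_to_json(position: NodePosition) -> str:
    """Adapter-side serialization back to camelCase keys."""
    return json.dumps({snake_to_camel(k): v for k, v in asdict(position).items()})


pos = node_position_from_json('{"x": 1.5, "y": -2.0, "theta": 0.0, "mapId": "hall_a"}')
round_trip = json.loads(node_position_to_json(pos))
state_topic = topic_for("acme", "0001", "state")
```

Keeping both directions in the adapter means the domain dataclasses never import `json` and the application layer never sees camelCase keys.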

## VDA 5050 → Nav 2 mapping cheat-sheet

| VDA 5050 | Nav 2 / ROS 2 |
|----------|----------------|
| `order` node (released base) | `NavigateToPose` goal (per node) or `NavigateThroughPoses` (whole base) |
| `nodePosition {x,y,theta,mapId}` | `geometry_msgs/PoseStamped` in `map` frame |
| `edge.maximumSpeed` | controller `setSpeedLimit` / velocity-smoother cap |
| `edge.trajectory` (NURBS) | sample → `nav_msgs/Path` (or let the planner replan) |
| `cancelOrder` instant action | cancel the Nav 2 action goal |
| `startPause`/`stopPause` | cancel/resume goal or velocity smoother gate |
| `pick`/`drop`/`finePositioning` | custom behavior / action server (your hardware) |
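The `nodePosition` → `PoseStamped` row comes down to converting the planar heading `theta` (radians) into a quaternion about the Z axis. The sketch below avoids a ROS dependency by returning a nested dict shaped like `geometry_msgs/PoseStamped`; `yaw_to_quaternion` and `to_pose_fields` are hypothetical helper names, and using a fixed `map` frame regardless of `mapId` is a simplifying assumption.

```python
import math
from dataclasses import dataclass
from typing import Tuple


@dataclass
class NodePosition:
    x: float = 0.0
    y: float = 0.0
    theta: float = 0.0
    map_id: str = ''


def yaw_to_quaternion(theta: float) -> Tuple[float, float, float, float]:
    """Planar yaw in radians -> (x, y, z, w) quaternion for a rotation about +Z."""
    half = theta / 2.0
    return (0.0, 0.0, math.sin(half), math.cos(half))


def to_pose_fields(position: NodePosition, frame_id: str = 'map') -> dict:
    """Return a dict mirroring the fields of geometry_msgs/PoseStamped."""
    qx, qy, qz, qw = yaw_to_quaternion(position.theta)
    return {
        'header': {'frame_id': frame_id},
        'pose': {
            'position': {'x': position.x, 'y': position.y, 'z': 0.0},
            'orientation': {'x': qx, 'y': qy, 'z': qz, 'w': qw},
        },
    }


pose = to_pose_fields(NodePosition(x=2.0, y=3.0, theta=math.pi / 2, map_id='hall_a'))
```

In a `Nav2Adapter` the same values would be copied into a real `PoseStamped` message, with the header stamp taken from the node's clock.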
behaviortree-reviewer (Subagent)

Use proactively before opening a PR that adds or changes BehaviorTree.CPP nodes or BehaviorTree.ROS2 wrappers (RosActionNode/RosServiceNode/RosTopicPub/SubNode, TreeExecutionServer). Reviews a diff against BT.CPP v4 conventions — node base-class choice, non-blocking ticks, ports/blackboard typing, factory/plugin registration, XML v4, and the ROS 2 wrapper contract. Returns a punch list with file:line anchors, not a rewrite.

clean-arch-architect (Subagent)

Use when a design decision touches Clean Architecture boundaries in a ROS 2 project — which layer a new behaviour belongs to, whether a port belongs in domain or application, whether a new node should be lifecycle-managed, whether to compose nodes or split packages. Returns an architectural recommendation with trade-offs, not implementation.

ecs-architect (Subagent)

Use when a design decision touches the gz-sim ECS — where new state should live, which system phase should write it, how to avoid coupling, whether to add a component vs. a member variable, whether a new system should be split or merged with an existing one. Returns an architectural recommendation with trade-offs, not implementation.

gz-style-reviewer (Subagent)

Use proactively before opening any gz-sim PR. Reviews a diff against the project's C++17 style, ECS conventions, plugin registration patterns, CMake structure, test placement, Migration.md / Changelog.md expectations, and pre-commit configuration. Returns a punch list, not a rewrite.

ros2-controllers-reviewer (Subagent)

Use proactively before opening a PR that adds or changes a ros2_control controller, broadcaster, or hardware component (incl. URDF <ros2_control> bringup). Reviews a diff against ros2_controllers / ros2_control_demos conventions — controller & hardware lifecycle, command/state interface configuration, real-time safety of update()/read()/write(), generate_parameter_library usage, pluginlib registration, chainable-controller correctness, URDF wiring, and tests. Returns a punch list with file:line anchors, not a rewrite.

ros2-style-reviewer (Subagent)

Use proactively before opening any ROS 2 / Nav 2 PR. Reviews a diff against this template's Clean Architecture, ROS 2 communication, lifecycle, testing, and Nav 2 plugin conventions. Returns a punch list with file:line anchors, not a rewrite.

vda5050-reviewer (Subagent)

Use proactively before opening a PR that touches a VDA 5050 connector / fleet bridge. Reviews a diff against VDA 5050 v3.0.0 protocol compliance (topics, QoS, header rules, base/horizon, action state machine, schema validation) and the template's Clean Architecture for the MQTT↔Nav 2 bridge. Returns a punch list with file:line anchors, not a rewrite.

build (Slash Command)

Build the colcon workspace (optionally a single package) and report the outcome.