diff --git a/README.md b/README.md index 96e1168..6094f34 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,9 @@ cd packages/device-connect-server && python3 -m pytest tests/ -v # Agent-tools unit tests (no Docker) cd packages/device-connect-agent-tools && python3 -m pytest tests/test_connection_unit.py tests/test_tools_unit.py -v +# Portal local-route + small-fleet shortcut integration tests +cd tests && pytest tests/test_tools_portal.py -v -m portal --timeout=120 + # Integration tests (requires Docker) cd tests && docker compose -f docker-compose-itest.yml up -d DEVICE_CONNECT_ALLOW_INSECURE=true python3 -m pytest tests/ -v -m "not llm" diff --git a/packages/device-connect-agent-tools/README.md b/packages/device-connect-agent-tools/README.md index c3ad08d..2de4c9e 100644 --- a/packages/device-connect-agent-tools/README.md +++ b/packages/device-connect-agent-tools/README.md @@ -20,6 +20,7 @@ Framework-agnostic tools for Device Connect — discover and invoke devices from - [Connection](#connection) - [Auto-Discovery](#auto-discovery) - [JWT Credentials](#jwt-credentials) + - [Portal-assisted local Zenoh routes](#portal-assisted-local-zenoh-routes) - [Explicit Configuration](#explicit-configuration) - [Environment Variables](#environment-variables) - [Device-to-Device Mode (No Infrastructure)](#device-to-device-mode-no-infrastructure) @@ -115,7 +116,7 @@ describe_fleet() ~200 tokens (counts by type and location └─▸ invoke_device(...) call a function ``` -**Small-fleet shortcut:** When the fleet has 5 or fewer devices, `describe_fleet()` and `list_devices()` automatically include full function schemas in the response — the agent can skip straight to `invoke_device()` in one or two calls. The threshold is configurable via the `DEVICE_CONNECT_SMALL_FLEET_THRESHOLD` environment variable (set to `0` to always require drill-down). +**Small-fleet shortcut:** When the fleet has 5 or fewer devices, `describe_fleet()` and `list_devices()` automatically include full function schemas in the response — the agent can skip straight to `invoke_device()` in one or two calls. The threshold is configurable via the `DEVICE_CONNECT_SMALL_FLEET_THRESHOLD` environment variable (set to `0` to always require drill-down). The same shortcut applies regardless of how the agent connected (portal local Zenoh, portal NATS/registry, or explicit `NATS_URL`). **Example — an agent resolving "check the lobby cameras":** @@ -325,6 +326,93 @@ connect( ) ``` +### Portal-assisted local Zenoh routes + +A **credential bundle** can carry two independent pieces of configuration: + +| Block | Role | +|-------|------| +| `nats` (portal route) | Identity and policy — JWT/NKey (or TLS) to reach the tenant through the portal or cloud NATS router, plus registry-backed discovery when that path is used. | +| `local` (optional) | Data-plane shortcut — explicit Zenoh locator(s) and optional scoped TLS files for same-LAN unicast to a router or device, without replacing portal identity. | + +When `DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE` (or `DEVICE_CONNECT_CREDENTIALS_FILE`) points at such a bundle **and** you do not also set broker URLs via `connect(...)` or `NATS_URL` / `ZENOH_CONNECT` / `MESSAGING_URLS`, agent tools: + +1. Prefer the **local** Zenoh route when `DEVICE_CONNECT_PREFER_LOCAL` is true (default) and `local.routes` is present. +2. Use **D2D presence discovery** on that Zenoh session (peers on the same router/LAN), not the registry. +3. **Fall back** to the portal `nats` route if the local Zenoh `connect()` fails (then discovery uses the registry again). +4. Set **`zone` / `tenant`** from the bundle’s `tenant` (or `zone`) field when `connect(zone=...)` is omitted. + +Set `DEVICE_CONNECT_PREFER_LOCAL=false` to skip the local block and use the portal NATS route even when `local` is present. + +#### Example bundle + +```json +{ + "tenant": "lab-a", + "device_id": "robot-001", + "nats": { + "urls": ["nats://portal.example:4222"], + "jwt": "...", + "nkey_seed": "..." + }, + "local": { + "routes": ["tls/192.168.1.42:7447"], + "tls": { + "ca_file": "/path/to/ca.pem", + "cert_file": "/path/to/client.pem", + "key_file": "/path/to/client-key.pem" + }, + "expires_at": "2026-05-20T18:00:00Z" + } +} +``` + +Equivalent keys are accepted for tooling compatibility: top-level `local_routes`, block `local_zenoh`, or `zenoh.local_routes` / `zenoh.tls` (see `load_portal_credentials_file()` in code). + +#### Running an agent with a portal bundle + +```bash +export DEVICE_CONNECT_ALLOW_INSECURE=true # dev / lab only +export DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE=~/lab-agent.creds.json +# Do not set NATS_URL or ZENOH_CONNECT — the bundle selects the route. + +python -c " +from device_connect_agent_tools import connect, disconnect, describe_fleet, invoke_device +connect() +print(describe_fleet()) +disconnect() +" +``` + +Integration tests for this path live in [`tests/tests/test_tools_portal.py`](../tests/tests/test_tools_portal.py) (`pytest -m portal`). + +#### Explicit local routes vs “auto-discover on my subnet” + +**What this release does:** the agent does **not** scan the subnet or infer a device IP. It only uses **explicit** `local.routes` from the bundle (or falls back to portal NATS). That is intentional for the first cut: the locator, TLS trust, and expiry are security-sensitive and are meant to be **issued** (portal, ops, or device provisioning), not guessed. + +**What you might expect (and how it relates):** + +- **Portal credentials only, no `local` block** — Works today. The agent uses NATS + registry like any remote agent; no LAN shortcut. +- **Same LAN, no portal bundle** — [Device-to-Device mode](#device-to-device-mode-no-infrastructure) uses Zenoh multicast scouting; peers appear without knowing their IP. That path does not mix in portal JWT policy; it is a separate dev/LAN workflow. +- **Portal identity + automatic LAN shortcut** — Reasonable future direction, not implemented here. An agent could: + - read device endpoints from the registry (if devices publish a reachable Zenoh/NATS locator and scoped certs), then + - probe reachability (subnet, or any routable IP), then + - open a direct session while still using portal-issued trust material. + +That would require devices (or the portal) to **advertise** a connectable local endpoint in registration data, plus new agent logic to try direct routes before the portal path. “Same subnet” alone is not enough: the agent still needs a **locator** (`tcp/192.168.1.5:7447`, `tls/...`) and often **mTLS** trust roots tied to the tenant. + +**Registry-advertised local routes (implemented):** when the bundle has portal `nats` but no `local` block, agent-tools can query the registry (over the portal route) for devices whose `status.local_zenoh` advertises LAN Zenoh locators, then try those routes before staying on NATS. Devices running on Zenoh with explicit `messaging_urls` publish `local_zenoh` automatically (disable with `DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH=false` on the device). For **containerized devices**, set `DEVICE_CONNECT_LOCAL_ZENOH_ROUTES` to the host/LAN-reachable Zenoh router (see [device-connect-edge — containers](../device-connect-edge/README.md#local-zenoh-shortcuts-from-containers)). Disable agent-side discovery with `DEVICE_CONNECT_DISCOVER_LOCAL_FROM_REGISTRY=false`. + +**What the portal emits today:** the [Device Connect Portal](../device-connect-server/device_connect_server/portal/README.md) typically downloads agent `.creds.json` files with `tenant` + `nats` (JWT) only. The portal UI may later copy `local_zenoh` from registered devices into agent bundles; until then, devices advertise routes in the registry and agents discover them as above. + +#### Precedence (summary) + +| Source | Effect | +|--------|--------| +| `connect(messaging_urls=[...])` or `NATS_URL` / `ZENOH_CONNECT` / `MESSAGING_URLS` | Ignores the portal bundle for broker selection (bundle may still supply credentials if not overridden). | +| Portal bundle + no URL env | Local Zenoh if `local.routes` and prefer-local; else portal NATS. | +| `security_infra/` auto-discovery | Used only when the bundle and env did not supply credentials/TLS. | + ### Explicit Configuration ```python @@ -347,6 +435,12 @@ connect( | `NATS_CREDENTIALS_FILE` | Path to `.creds.json` file | | `NATS_JWT` + `NATS_NKEY_SEED` | Direct JWT auth | | `NATS_TLS_CA_FILE` | CA certificate for TLS | +| `DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE` | Portal-issued bundle with portal credentials and optional local Zenoh route | +| `DEVICE_CONNECT_CREDENTIALS_FILE` | Generic alias for a portal-issued credential bundle | +| `DEVICE_CONNECT_PREFER_LOCAL` | Set to `false` to ignore local Zenoh routes in portal bundles | +| `DEVICE_CONNECT_DISCOVER_LOCAL_FROM_REGISTRY` | Set to `false` to skip registry `status.local_zenoh` discovery when the bundle has no `local` block (default: `true`) | +| `DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH` | On devices: set to `false` to omit `status.local_zenoh` from registration/heartbeats (default: `true` when using Zenoh with explicit URLs) | +| `DEVICE_CONNECT_LOCAL_ZENOH_ROUTES` | On devices: comma-separated Zenoh locators to **advertise** in `status.local_zenoh` (connect URLs unchanged; use for Docker/K8s) | | `TENANT` | Device Connect zone/namespace (default: `"default"`) | | `DEVICE_CONNECT_DISCOVERY_MODE` | Set to `d2d` to skip registry and discover via presence | @@ -459,6 +553,7 @@ invoke_device = wrap_tool(_invoke_device) | `connect(messaging_urls, zone, credentials, tls_config)` | Initialize messaging connection | | `disconnect()` | Close connection and release resources | | `get_connection()` | Get current connection (auto-connects if needed) | +| `load_portal_credentials_file(path)` | Parse a portal bundle into `{tenant, portal, local}` (see [portal routes](#portal-assisted-local-zenoh-routes)) | ### Hierarchical Discovery Tools diff --git a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py index 482f5a1..b1d43f2 100644 --- a/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py +++ b/packages/device-connect-agent-tools/device_connect_agent_tools/connection.py @@ -110,6 +110,144 @@ def _auto_discover_tls() -> Optional[Dict[str, Any]]: return None +def _env_flag(name: str, default: bool = False) -> bool: + value = os.getenv(name) + if value is None: + return default + return value.strip().lower() in ("1", "true", "yes", "on") + + +def _portal_credentials_path() -> Optional[str]: + """Return a portal-issued credential bundle path, if configured.""" + for env_name in ( + "DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE", + "DEVICE_CONNECT_CREDENTIALS_FILE", + ): + value = os.getenv(env_name) + if value: + return value + return None + + +def _as_list(value: Any) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + return [value] + if isinstance(value, list): + return [v for v in value if isinstance(v, str) and v] + return [] + + +def load_portal_credentials_file(path: str | Path) -> Dict[str, Any]: + """Load a portal-issued credential bundle. + + The portal remains the authority for identity and policy, but may include + scoped local Zenoh route material for same-LAN unicast access. The returned + shape separates the portal route from the optional local fast path so the + connection layer can prefer local access and still fall back to the portal. + """ + with open(path, "r") as f: + data = json.load(f) + if not isinstance(data, dict): + raise ValueError(f"expected JSON object in credentials file: {path}") + + tenant = data.get("tenant") or data.get("zone") + + nats_config = data.get("nats") or {} + portal: Optional[Dict[str, Any]] = None + if isinstance(nats_config, dict): + nats_auth = {} + if nats_config.get("jwt"): + nats_auth["jwt"] = nats_config["jwt"] + if nats_config.get("nkey_seed"): + nats_auth["nkey_seed"] = nats_config["nkey_seed"] + + nats_urls = _as_list(nats_config.get("urls")) or _as_list(nats_config.get("url")) + if nats_urls or nats_auth: + portal = { + "backend": "nats", + "servers": nats_urls, + "credentials": nats_auth or None, + "tls": nats_config.get("tls") or None, + } + + zenoh_config = data.get("zenoh") or {} + if not isinstance(zenoh_config, dict): + zenoh_config = {} + local_config = data.get("local") or data.get("local_zenoh") or {} + if not isinstance(local_config, dict): + local_config = {} + + local_routes = ( + _as_list(data.get("local_routes")) + or _as_list(local_config.get("routes")) + or _as_list(local_config.get("urls")) + or _as_list(zenoh_config.get("local_routes")) + ) + local_tls = local_config.get("tls") or zenoh_config.get("tls") or None + local: Optional[Dict[str, Any]] = None + if local_routes: + local = { + "backend": "zenoh", + "servers": local_routes, + "credentials": local_config.get("credentials") or None, + "tls": local_tls, + "device_id": local_config.get("device_id") or data.get("device_id"), + "expires_at": local_config.get("expires_at") or data.get("expires_at"), + } + + return {"tenant": tenant, "portal": portal, "local": local} + + +def normalize_local_zenoh_dict(data: Any) -> Optional[Dict[str, Any]]: + """Normalize a ``local_zenoh`` / ``local`` block into a Zenoh route config.""" + if not isinstance(data, dict): + return None + routes = ( + _as_list(data.get("routes")) + or _as_list(data.get("urls")) + or _as_list(data.get("local_routes")) + ) + if not routes: + return None + return { + "backend": "zenoh", + "servers": routes, + "credentials": data.get("credentials"), + "tls": data.get("tls"), + } + + +def collect_local_route_candidates_from_devices( + devices: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Collect unique LAN Zenoh route configs advertised in registry device records.""" + seen: set[tuple[str, ...]] = set() + candidates: List[Dict[str, Any]] = [] + for raw in devices: + if not isinstance(raw, dict): + continue + status = raw.get("status") or {} + for source in (status.get("local_zenoh"), raw.get("local_zenoh")): + cfg = normalize_local_zenoh_dict(source) + if not cfg: + continue + key = tuple(cfg["servers"]) + if key in seen: + continue + seen.add(key) + candidates.append(cfg) + return candidates + + +def _resolve_portal_credentials() -> Optional[Dict[str, Any]]: + path = _portal_credentials_path() + if not path: + return None + return load_portal_credentials_file(path) + + # ── Device payload helper ────────────────────────────────────────── @@ -152,6 +290,7 @@ def flatten_device(raw: Dict[str, Any]) -> Dict[str, Any]: # when neither source provided any label -- discover() treats that # as "no label-based match," not "matches everything." "labels": merged_labels, + "local_zenoh": status.get("local_zenoh"), } @@ -206,16 +345,71 @@ def __init__( tls_config: Optional[Dict[str, Any]] = None, request_timeout: float = 30.0, ): + portal = _resolve_portal_credentials() + if zone == "default" and portal and portal.get("tenant"): + zone = portal["tenant"] self.zone = zone self._request_timeout = request_timeout - # Resolve config: explicit params -> env vars (via MessagingConfig) -> auto-discovery - config = MessagingConfig( - servers=[nats_url] if nats_url else None, - credentials=credentials, - tls_config=tls_config, + env_has_urls = any( + os.getenv(name) + for name in ("ZENOH_CONNECT", "MESSAGING_URLS", "NATS_URL", "NATS_URLS") + ) + portal_cfg = (portal or {}).get("portal") or {} + local_cfg = (portal or {}).get("local") or {} + prefer_local = _env_flag("DEVICE_CONNECT_PREFER_LOCAL", True) + can_use_portal_bundle = portal is not None and not nats_url and not env_has_urls + + config_backend: Optional[str] = None + config_servers: Optional[List[str]] = [nats_url] if nats_url else None + config_credentials = credentials + config_tls = tls_config + self._using_local_route = False + self._fallback_config: Optional[Dict[str, Any]] = None + self._registry_local_discovery = False + self._stored_portal_cfg: Optional[Dict[str, Any]] = None + + discover_local_from_registry = _env_flag( + "DEVICE_CONNECT_DISCOVER_LOCAL_FROM_REGISTRY", True, ) + if can_use_portal_bundle and prefer_local and local_cfg.get("servers"): + config_backend = "zenoh" + config_servers = local_cfg.get("servers") + config_credentials = local_cfg.get("credentials") + config_tls = local_cfg.get("tls") + self._using_local_route = True + if portal_cfg.get("servers"): + self._fallback_config = portal_cfg + elif ( + can_use_portal_bundle + and prefer_local + and discover_local_from_registry + and portal_cfg.get("servers") + ): + self._registry_local_discovery = True + self._stored_portal_cfg = dict(portal_cfg) + self._fallback_config = dict(portal_cfg) + config_backend = portal_cfg.get("backend") + config_servers = portal_cfg.get("servers") + config_credentials = credentials or portal_cfg.get("credentials") + config_tls = tls_config or portal_cfg.get("tls") + elif can_use_portal_bundle and portal_cfg.get("servers"): + config_backend = portal_cfg.get("backend") + config_servers = portal_cfg.get("servers") + config_credentials = credentials or portal_cfg.get("credentials") + config_tls = tls_config or portal_cfg.get("tls") + + # Resolve config: explicit params -> env vars (via MessagingConfig) -> auto-discovery + config_kwargs = { + "servers": config_servers, + "credentials": config_credentials, + "tls_config": config_tls, + } + if config_backend: + config_kwargs["backend"] = config_backend + config = MessagingConfig(**config_kwargs) + self._backend = config.backend # "nats", "zenoh", or "mqtt" (auto-detected) self._servers = config.servers self._credentials = config.credentials @@ -229,7 +423,14 @@ def __init__( # If no explicit server URL was given but TLS was discovered, # default to tls:// instead of nats:// - if not nats_url and not os.getenv("NATS_URL") and not os.getenv("NATS_URLS") and not os.getenv("MESSAGING_URLS") and not os.getenv("ZENOH_CONNECT"): + if ( + not self._using_local_route + and not nats_url + and not os.getenv("NATS_URL") + and not os.getenv("NATS_URLS") + and not os.getenv("MESSAGING_URLS") + and not os.getenv("ZENOH_CONNECT") + ): if self._tls_config: self._servers = ["tls://localhost:4222"] @@ -248,14 +449,20 @@ def __init__( ) self._d2d_mode = ( os.getenv("DEVICE_CONNECT_DISCOVERY_MODE", "").lower() in ("d2d", "p2p") - or (self._backend == "zenoh" and no_explicit_urls) + or self._using_local_route + or (self._backend == "zenoh" and no_explicit_urls and not portal_cfg) ) self._d2d_collector = None # lazy-initialized PresenceCollector # In D2D mode with Zenoh and no explicit URLs, use empty servers (multicast scouting). # When DEVICE_CONNECT_DISCOVERY_MODE=d2d is forced alongside a router URL (ZENOH_CONNECT), # keep the router URL so we can still communicate with devices connected to it. - if self._d2d_mode and self._backend == "zenoh" and no_explicit_urls: + if ( + self._d2d_mode + and self._backend == "zenoh" + and no_explicit_urls + and not self._using_local_route + ): self._servers = [] # Dedicated event loop for sync-to-async bridging @@ -276,7 +483,126 @@ def connect(self) -> None: """Establish the messaging connection.""" self._run(self._async_connect()) + def _apply_local_route( + self, + local_cfg: Dict[str, Any], + *, + portal_fallback: Optional[Dict[str, Any]] = None, + ) -> None: + """Configure this connection for a local Zenoh route with optional portal fallback.""" + self._backend = local_cfg.get("backend") or "zenoh" + self._servers = local_cfg.get("servers") or [] + self._credentials = local_cfg.get("credentials") + self._tls_config = local_cfg.get("tls") + self._using_local_route = True + self._d2d_mode = True + if portal_fallback and portal_fallback.get("servers"): + self._fallback_config = portal_fallback + + def _apply_portal_route(self, portal_cfg: Dict[str, Any]) -> None: + """Configure this connection for the portal (remote) route only.""" + self._backend = portal_cfg.get("backend") or "nats" + self._servers = portal_cfg.get("servers") or [] + self._credentials = portal_cfg.get("credentials") + self._tls_config = portal_cfg.get("tls") + self._using_local_route = False + self._d2d_mode = False + self._fallback_config = None + + async def _fetch_registry_local_route_candidates( + self, + portal_cfg: Dict[str, Any], + ) -> List[Dict[str, Any]]: + """Query the registry over the portal route for devices advertising ``local_zenoh``.""" + client = create_client(backend=portal_cfg.get("backend") or "nats") + try: + await client.connect( + servers=portal_cfg.get("servers") or [], + credentials=portal_cfg.get("credentials"), + tls_config=portal_cfg.get("tls"), + ) + registry = _SDKRegistryClient( + client, + tenant=self.zone, + timeout=self._request_timeout, + cache_ttl=0, + ) + devices = await registry.list_devices() + candidates = collect_local_route_candidates_from_devices(devices) + logger.info( + "Discovered %d registry-advertised local Zenoh route(s)", + len(candidates), + ) + return candidates + finally: + try: + await client.close() + except Exception: + logger.debug("cleanup error closing registry probe client", exc_info=True) + + async def _async_connect_registry_local_discovery(self) -> None: + """Probe registry for ``local_zenoh``, try each route, else use portal NATS.""" + portal = self._stored_portal_cfg or self._fallback_config or {} + candidates: List[Dict[str, Any]] = [] + try: + candidates = await self._fetch_registry_local_route_candidates(portal) + except Exception as e: + logger.warning("Registry local-route discovery failed: %s", e) + + for local_cfg in candidates: + self._apply_local_route(local_cfg, portal_fallback=portal) + try: + await self._async_connect_current() + self._registry_local_discovery = False + return + except Exception as e: + logger.info( + "Registry-advertised local route %s failed: %s", + local_cfg.get("servers"), + e, + ) + if self._client: + try: + await self._client.close() + except Exception: + logger.debug( + "cleanup error closing failed local client", + exc_info=True, + ) + self._client = None + + logger.info("No usable registry local route; connecting via portal") + self._apply_portal_route(portal) + self._registry_local_discovery = False + await self._async_connect_current() + async def _async_connect(self) -> None: + if self._registry_local_discovery: + await self._async_connect_registry_local_discovery() + return + try: + await self._async_connect_current() + except Exception: + if not self._fallback_config: + raise + logger.info("Local Device Connect route failed; falling back to portal route") + if self._client: + try: + await self._client.close() + except Exception: + logger.debug("cleanup error closing failed local client", exc_info=True) + + fallback = self._fallback_config + self._fallback_config = None + self._using_local_route = False + self._d2d_mode = False + self._backend = fallback.get("backend") or "nats" + self._servers = fallback.get("servers") or [] + self._credentials = fallback.get("credentials") + self._tls_config = fallback.get("tls") + await self._async_connect_current() + + async def _async_connect_current(self) -> None: self._client = create_client(backend=self._backend) await self._client.connect( servers=self._servers, @@ -288,6 +614,7 @@ async def _async_connect(self) -> None: # Initialize discovery provider if self._d2d_mode: from device_connect_edge.discovery import PresenceCollector, D2DRegistry + self._d2d_collector = PresenceCollector(self._client, self.zone) await self._d2d_collector.start() await self._d2d_collector.wait_for_peers(timeout=3.0) diff --git a/packages/device-connect-agent-tools/tests/test_connection_unit.py b/packages/device-connect-agent-tools/tests/test_connection_unit.py index 306933e..7ee6510 100644 --- a/packages/device-connect-agent-tools/tests/test_connection_unit.py +++ b/packages/device-connect-agent-tools/tests/test_connection_unit.py @@ -10,7 +10,7 @@ import json import os -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch from device_connect_agent_tools import connection as conn_mod @@ -90,6 +90,129 @@ def test_config_skips_autodiscovery_when_already_set(self, MockConfig, ad_creds, assert conn._tls_config == {"ca_file": "/from-env/ca.pem"} conn.close() + @patch.object(conn_mod, "_auto_discover_tls", return_value=None) + @patch.object(conn_mod, "_auto_discover_credentials", return_value=None) + def test_portal_bundle_prefers_local_zenoh_route(self, _ad_creds, _ad_tls, tmp_path): + bundle = tmp_path / "agent.creds.json" + bundle.write_text(json.dumps({ + "tenant": "lab-a", + "device_id": "robot-001", + "nats": { + "urls": ["nats://portal.example:4222"], + "jwt": "portal-jwt", + "nkey_seed": "portal-seed", + }, + "local": { + "routes": ["tls/192.168.1.42:7447"], + "tls": { + "ca_file": "/tmp/ca.pem", + "cert_file": "/tmp/client.pem", + "key_file": "/tmp/client-key.pem", + }, + }, + })) + + with patch.dict(os.environ, {"DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE": str(bundle)}, clear=True): + conn = conn_mod.DeviceConnection() + + assert conn.zone == "lab-a" + assert conn._backend == "zenoh" + assert conn._servers == ["tls/192.168.1.42:7447"] + assert conn._tls_config == { + "ca_file": "/tmp/ca.pem", + "cert_file": "/tmp/client.pem", + "key_file": "/tmp/client-key.pem", + } + assert conn._d2d_mode is True + assert conn._fallback_config["servers"] == ["nats://portal.example:4222"] + conn.close() + + @patch.object(conn_mod, "_auto_discover_tls", return_value=None) + @patch.object(conn_mod, "_auto_discover_credentials", return_value=None) + def test_portal_bundle_can_disable_local_preference(self, _ad_creds, _ad_tls, tmp_path): + bundle = tmp_path / "agent.creds.json" + bundle.write_text(json.dumps({ + "tenant": "lab-a", + "nats": {"urls": ["nats://portal.example:4222"], "jwt": "j", "nkey_seed": "s"}, + "local_routes": ["tls/192.168.1.42:7447"], + "zenoh": {"tls": {"ca_file": "/tmp/ca.pem"}}, + })) + + env = { + "DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE": str(bundle), + "DEVICE_CONNECT_PREFER_LOCAL": "false", + } + with patch.dict(os.environ, env, clear=True): + conn = conn_mod.DeviceConnection() + + assert conn._backend == "nats" + assert conn._servers == ["nats://portal.example:4222"] + assert conn._credentials == {"jwt": "j", "nkey_seed": "s"} + assert conn._d2d_mode is False + conn.close() + + @patch.object(conn_mod, "_auto_discover_tls", return_value=None) + @patch.object(conn_mod, "_auto_discover_credentials", return_value=None) + def test_local_route_connect_failure_falls_back_to_portal(self, _ad_creds, _ad_tls, tmp_path): + bundle = tmp_path / "agent.creds.json" + bundle.write_text(json.dumps({ + "tenant": "lab-a", + "nats": { + "urls": ["nats://portal.example:4222"], + "jwt": "portal-jwt", + "nkey_seed": "portal-seed", + }, + "local": { + "routes": ["tls/192.168.1.42:7447"], + "tls": {"ca_file": "/tmp/local-ca.pem"}, + }, + })) + + local_client = MagicMock() + local_client.connect = AsyncMock(side_effect=RuntimeError("local route unavailable")) + local_client.close = AsyncMock() + portal_client = MagicMock() + portal_client.connect = AsyncMock() + portal_client.close = AsyncMock() + registry = MagicMock() + + env = {"DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE": str(bundle)} + with patch.dict(os.environ, env, clear=True), \ + patch("device_connect_agent_tools.connection.create_client", + side_effect=[local_client, portal_client]) as create_client, \ + patch("device_connect_agent_tools.connection._SDKRegistryClient", + return_value=registry) as registry_client: + conn = conn_mod.DeviceConnection() + conn.connect() + + assert [call.kwargs["backend"] for call in create_client.call_args_list] == ["zenoh", "nats"] + local_client.connect.assert_awaited_once_with( + servers=["tls/192.168.1.42:7447"], + credentials=None, + tls_config={"ca_file": "/tmp/local-ca.pem"}, + ) + local_client.close.assert_awaited_once() + portal_client.connect.assert_awaited_once_with( + servers=["nats://portal.example:4222"], + credentials={"jwt": "portal-jwt", "nkey_seed": "portal-seed"}, + tls_config=None, + ) + registry_client.assert_called_once_with( + portal_client, + tenant="lab-a", + timeout=30.0, + cache_ttl=30.0, + ) + assert conn._backend == "nats" + assert conn._servers == ["nats://portal.example:4222"] + assert conn._credentials == {"jwt": "portal-jwt", "nkey_seed": "portal-seed"} + assert conn._tls_config is None + assert conn._using_local_route is False + assert conn._d2d_mode is False + assert conn._fallback_config is None + assert conn._provider is registry + conn.close() + # ── Auto-discovery helpers ─────────────────────────────────────── @@ -133,6 +256,85 @@ def test_auto_discover_tls_found(self, tmp_path): result = conn_mod._auto_discover_tls() assert result == {"ca_file": str(certs_dir / "ca-cert.pem")} + def test_normalize_local_zenoh_dict(self): + cfg = conn_mod.normalize_local_zenoh_dict( + {"routes": ["tcp/10.0.0.5:7447"], "tls": {"ca_file": "/tmp/ca.pem"}}, + ) + assert cfg == { + "backend": "zenoh", + "servers": ["tcp/10.0.0.5:7447"], + "credentials": None, + "tls": {"ca_file": "/tmp/ca.pem"}, + } + + def test_collect_local_route_candidates_from_devices(self): + devices = [ + { + "device_id": "a", + "status": {"local_zenoh": {"routes": ["tcp/10.0.0.1:7447"]}}, + }, + { + "device_id": "b", + "status": {"local_zenoh": {"routes": ["tcp/10.0.0.1:7447"]}}, + }, + { + "device_id": "c", + "status": {"local_zenoh": {"routes": ["tcp/10.0.0.2:7447"]}}, + }, + ] + candidates = conn_mod.collect_local_route_candidates_from_devices(devices) + assert len(candidates) == 2 + assert {tuple(c["servers"]) for c in candidates} == { + ("tcp/10.0.0.1:7447",), + ("tcp/10.0.0.2:7447",), + } + + @patch.object(conn_mod, "_auto_discover_tls", return_value=None) + @patch.object(conn_mod, "_auto_discover_credentials", return_value=None) + def test_portal_bundle_registry_local_discovery_flag(self, _ad_creds, _ad_tls, tmp_path): + bundle = tmp_path / "agent.creds.json" + bundle.write_text(json.dumps({ + "tenant": "lab-a", + "nats": {"urls": ["nats://portal.example:4222"], "jwt": "j", "nkey_seed": "s"}, + })) + + with patch.dict(os.environ, {"DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE": str(bundle)}, clear=True): + conn = conn_mod.DeviceConnection() + + assert conn._registry_local_discovery is True + assert conn._stored_portal_cfg["servers"] == ["nats://portal.example:4222"] + assert conn._fallback_config["servers"] == ["nats://portal.example:4222"] + conn.close() + + def test_load_portal_credentials_file_splits_portal_and_local_routes(self, tmp_path): + bundle = tmp_path / "agent.creds.json" + bundle.write_text(json.dumps({ + "tenant": "lab-a", + "nats": {"urls": ["nats://portal.example:4222"], "jwt": "j", "nkey_seed": "s"}, + "local_routes": ["tls/192.168.1.42:7447"], + "zenoh": {"tls": {"ca_file": "/tmp/ca.pem"}}, + })) + + result = conn_mod.load_portal_credentials_file(bundle) + + assert result == { + "tenant": "lab-a", + "portal": { + "backend": "nats", + "servers": ["nats://portal.example:4222"], + "credentials": {"jwt": "j", "nkey_seed": "s"}, + "tls": None, + }, + "local": { + "backend": "zenoh", + "servers": ["tls/192.168.1.42:7447"], + "credentials": None, + "tls": {"ca_file": "/tmp/ca.pem"}, + "device_id": None, + "expires_at": None, + }, + } + # ── Singleton connect/disconnect ────────────────────────────────── diff --git a/packages/device-connect-edge/README.md b/packages/device-connect-edge/README.md index 6d73b84..b990936 100644 --- a/packages/device-connect-edge/README.md +++ b/packages/device-connect-edge/README.md @@ -160,6 +160,46 @@ To force D2D mode even when a router URL is set (e.g., router available but no r DEVICE_CONNECT_DISCOVERY_MODE=d2d ZENOH_CONNECT=tcp/localhost:7447 DEVICE_CONNECT_ALLOW_INSECURE=true python my_device.py ``` +### Local Zenoh shortcuts from containers + +When a device runs in Docker (or Kubernetes), its **connect** URL is often an internal +service name (`tcp/zenoh:7447`), while agents on the host need a **published** locator +(`tcp/localhost:7447` or `tcp/host.docker.internal:7447`). + +Use separate advertise routes (connect unchanged): + +```bash +ZENOH_CONNECT=tcp/zenoh:7447 +DEVICE_CONNECT_LOCAL_ZENOH_ROUTES=tcp/host.docker.internal:7447 +``` + +Docker Compose example (device service on the same network as `zenoh`): + +```yaml +services: + zenoh: + image: eclipse/zenoh:latest + ports: + - "7447:7447" + my-device: + environment: + ZENOH_CONNECT: tcp/zenoh:7447 + DEVICE_CONNECT_LOCAL_ZENOH_ROUTES: tcp/host.docker.internal:7447 + extra_hosts: + - "host.docker.internal:host-gateway" +``` + +Advertise the **Zenoh router** reachable from the agent, not the container’s bridge IP. +Manual override: set `status={"local_zenoh": {"routes": ["tcp/localhost:7447"]}}` on +`DeviceRuntime` (wins over auto-advertise and `DEVICE_CONNECT_LOCAL_ZENOH_ROUTES`). +Disable auto-advertise entirely with `DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH=false`. + +| Variable | Role | +|----------|------| +| `ZENOH_CONNECT` / `MESSAGING_URLS` | Where the device connects | +| `DEVICE_CONNECT_LOCAL_ZENOH_ROUTES` | Comma-separated locators published in `status.local_zenoh` | +| `DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH` | Set to `false` to omit auto `local_zenoh` | + **How it works:** Each device announces its presence (capabilities, identity, status) via `device-connect.{tenant}.{device_id}.presence` messages. Other devices subscribe to a wildcard and maintain an in-memory peer table. Device-to-device RPC works identically to infrastructure mode. **Trade-offs vs full infrastructure:** diff --git a/packages/device-connect-edge/device_connect_edge/__init__.py b/packages/device-connect-edge/device_connect_edge/__init__.py index 4812c51..115b8dc 100644 --- a/packages/device-connect-edge/device_connect_edge/__init__.py +++ b/packages/device-connect-edge/device_connect_edge/__init__.py @@ -40,6 +40,7 @@ async def alert(self, level: str, msg: str): DeviceCapabilities, DeviceIdentity, DeviceStatus, + LocalZenohRoute, FunctionDef, EventDef, ) @@ -64,6 +65,7 @@ async def alert(self, level: str, msg: str): "DeviceCapabilities", "DeviceIdentity", "DeviceStatus", + "LocalZenohRoute", "FunctionDef", "EventDef", "DiscoveryProvider", diff --git a/packages/device-connect-edge/device_connect_edge/device.py b/packages/device-connect-edge/device_connect_edge/device.py index c776d1a..6a1bfbf 100644 --- a/packages/device-connect-edge/device_connect_edge/device.py +++ b/packages/device-connect-edge/device_connect_edge/device.py @@ -90,6 +90,20 @@ async def capture_image(self, resolution: str = "1080p") -> dict: logger = logging.getLogger(__name__) +def _env_local_zenoh_advertise_routes() -> Optional[List[str]]: + """Zenoh locators advertised in ``status.local_zenoh`` (may differ from connect URLs). + + Set ``DEVICE_CONNECT_LOCAL_ZENOH_ROUTES`` to comma-separated locators reachable + from agents on the host/LAN (e.g. a published router port) while the device + connects internally via ``ZENOH_CONNECT`` / ``messaging_urls``. + """ + raw = os.getenv("DEVICE_CONNECT_LOCAL_ZENOH_ROUTES", "").strip() + if not raw: + return None + routes = [part.strip() for part in raw.split(",") if part.strip()] + return routes or None + + def build_rpc_response(id_: str, result: Any) -> bytes: return json.dumps({"jsonrpc": "2.0", "id": id_, "result": result}).encode() @@ -940,18 +954,62 @@ async def enqueue_event(self, event: str, payload: dict) -> None: self._logger.error("Event queue still full after drop; event lost: %s", event) + def _status_has_local_zenoh(self) -> bool: + """True when the operator set ``local_zenoh`` in device status (manual advertisement).""" + local = self.status.get("local_zenoh") + if not isinstance(local, dict): + return False + routes = local.get("routes") or local.get("urls") or [] + return bool(routes) + + def _build_local_zenoh_advertisement(self) -> Optional[dict]: + """Return auto-generated ``local_zenoh`` status material for registry/agent shortcuts.""" + if self._status_has_local_zenoh(): + return None + if os.getenv("DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH", "true").strip().lower() in ( + "0", "false", "no", "off", + ): + return None + if self._messaging_backend != "zenoh": + return None + routes = _env_local_zenoh_advertise_routes() or list(self.messaging_urls or []) + if not routes: + return None + tls_payload = None + if self.messaging_tls: + tls_payload = { + k: self.messaging_tls[k] + for k in ("ca_file", "cert_file", "key_file") + if self.messaging_tls.get(k) + } + return { + "routes": routes, + "tls": tls_payload or None, + } + + def _advertised_local_zenoh(self) -> Optional[dict]: + """``local_zenoh`` for registration/heartbeats: manual status wins over auto-build.""" + if self._status_has_local_zenoh(): + local = self.status.get("local_zenoh") + return local if isinstance(local, dict) else None + return self._build_local_zenoh_advertisement() + def _build_registration_params(self) -> dict: """Build the registration payload shared by _register and requestRegistration.""" caps = self._driver.capabilities if self._driver else self.capabilities + status = { + **self.status, + "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + local_zenoh = self._advertised_local_zenoh() + if local_zenoh: + status["local_zenoh"] = local_zenoh params = { "device_id": self.device_id, "device_ttl": self.ttl, "capabilities": caps.model_dump(), "identity": self.identity, - "status": { - **self.status, - "ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - }, + "status": status, } if hasattr(self, '_attestation_token') and self._attestation_token: params["attestation"] = self._attestation_token @@ -1003,6 +1061,9 @@ async def _heartbeat_loop(self) -> None: while True: # Build heartbeat payload beat = {"device_id": self.device_id, "ts": time.time()} + local_zenoh = self._advertised_local_zenoh() + if local_zenoh: + beat["local_zenoh"] = local_zenoh self._logger.debug(f"Heartbeat loop iteration, beat={beat}") if self._heartbeat_provider: try: diff --git a/packages/device-connect-edge/device_connect_edge/types.py b/packages/device-connect-edge/device_connect_edge/types.py index 5440708..6d7a2d5 100644 --- a/packages/device-connect-edge/device_connect_edge/types.py +++ b/packages/device-connect-edge/device_connect_edge/types.py @@ -216,6 +216,27 @@ class DeviceIdentity(BaseModel): ) +class LocalZenohRoute(BaseModel): + """LAN-reachable Zenoh locator advertised for agent local shortcuts. + + Devices publish this in ``DeviceStatus.local_zenoh`` so agents with portal + credentials can discover a same-site Zenoh router or device endpoint via the + registry, then connect directly while retaining portal identity as fallback. + """ + routes: List[str] = Field( + default_factory=list, + description="Zenoh locators (e.g. tcp/192.168.1.10:7447, tls/host:7447)", + ) + tls: Optional[Dict[str, str]] = Field( + default=None, + description="Optional TLS file paths (ca_file, cert_file, key_file) for mTLS", + ) + expires_at: Optional[str] = Field( + default=None, + description="ISO-8601 expiry after which agents should not use this route", + ) + + class DeviceStatus(BaseModel): """Runtime device status - dynamic state that changes over time. @@ -230,7 +251,8 @@ class DeviceStatus(BaseModel): busy_score=0.7, battery=85, online=True, - error_state=None + error_state=None, + local_zenoh=LocalZenohRoute(routes=["tcp/192.168.1.10:7447"]), ) """ ts: datetime = Field( @@ -265,6 +287,10 @@ class DeviceStatus(BaseModel): default=None, description="Error description if device is in error state" ) + local_zenoh: Optional[LocalZenohRoute] = Field( + default=None, + description="Optional LAN Zenoh route for agents to use as a local fast path", + ) # Type aliases for callbacks diff --git a/packages/device-connect-edge/tests/test_device.py b/packages/device-connect-edge/tests/test_device.py index 14d03fa..8e267b0 100644 --- a/packages/device-connect-edge/tests/test_device.py +++ b/packages/device-connect-edge/tests/test_device.py @@ -573,6 +573,54 @@ def test_no_attestation_when_unset(self): assert "attestation" not in params + def test_local_zenoh_advertised_for_zenoh_unicast(self): + driver = StubDriver() + rt = DeviceRuntime( + driver=driver, + device_id="sensor-zenoh", + messaging_urls=["tcp/192.168.0.10:7447"], + ) + rt._messaging_backend = "zenoh" + + params = rt._build_registration_params() + + assert params["status"]["local_zenoh"]["routes"] == ["tcp/192.168.0.10:7447"] + + def test_local_zenoh_env_override_for_container_router(self, monkeypatch): + """Connect URL can be in-container; advertise host-published router port.""" + monkeypatch.setenv( + "DEVICE_CONNECT_LOCAL_ZENOH_ROUTES", + "tcp/host.docker.internal:7447", + ) + driver = StubDriver() + rt = DeviceRuntime( + driver=driver, + device_id="sensor-container", + messaging_urls=["tcp/zenoh:7447"], + ) + rt._messaging_backend = "zenoh" + + params = rt._build_registration_params() + + assert params["status"]["local_zenoh"]["routes"] == [ + "tcp/host.docker.internal:7447" + ] + + def test_manual_status_local_zenoh_not_overwritten(self, monkeypatch): + monkeypatch.setenv("DEVICE_CONNECT_LOCAL_ZENOH_ROUTES", "tcp/ignored:7447") + driver = StubDriver() + rt = DeviceRuntime( + driver=driver, + device_id="sensor-manual", + messaging_urls=["tcp/zenoh:7447"], + status={"local_zenoh": {"routes": ["tcp/localhost:7447"]}}, + ) + rt._messaging_backend = "zenoh" + + params = rt._build_registration_params() + + assert params["status"]["local_zenoh"]["routes"] == ["tcp/localhost:7447"] + # ── _cmd_subscription requestRegistration ─────────────────────── diff --git a/packages/device-connect-server/device_connect_server/portal/README.md b/packages/device-connect-server/device_connect_server/portal/README.md index 0204c69..672a740 100644 --- a/packages/device-connect-server/device_connect_server/portal/README.md +++ b/packages/device-connect-server/device_connect_server/portal/README.md @@ -188,6 +188,27 @@ All settings are via environment variables: | POST | `/api/admin/nats/reload` | Regenerate config + SIGHUP | | POST | `/api/admin/health/verify` | Run isolation verification | +### Agent credential files and LAN Zenoh shortcuts + +The portal UI can download a per-tenant **agent** `.creds.json` (NATS JWT to the +shared router, plus `tenant`). That file is what +[`device-connect-agent-tools`](../../../device-connect-agent-tools/README.md#portal-assisted-local-zenoh-routes) +accepts via `DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE`. + +Agent-tools also supports an optional **`local`** block in the same JSON shape +(Zenoh locator + scoped TLS) for same-LAN unicast while keeping portal identity +in `nats`. The portal does **not** populate `local` automatically yet; operators +can merge it in when they know a site router or device endpoint (for example +`tcp/192.168.1.42:7447`). A future portal release could attach `local` from +provisioning or registry data when a device advertises a reachable LAN locator. + +Agent-tools can also **discover** `status.local_zenoh` from the registry when the +agent bundle has portal credentials but no `local` block (see +`DEVICE_CONNECT_DISCOVER_LOCAL_FROM_REGISTRY`). Devices publish `local_zenoh` +when running on Zenoh with explicit router URLs (`DEVICE_CONNECT_ADVERTISE_LOCAL_ZENOH`). +Containerized devices can set `DEVICE_CONNECT_LOCAL_ZENOH_ROUTES` so the registry +advertises a host-published router port while the device connects via internal DNS. + ### Agent API (`/api/agent/v1/*`, requires Bearer token) JSON-only namespace for coding agents and CI clients. Distinct from browser diff --git a/packages/device-connect-server/device_connect_server/registry/service/main.py b/packages/device-connect-server/device_connect_server/registry/service/main.py index 9661d6e..e0d32de 100644 --- a/packages/device-connect-server/device_connect_server/registry/service/main.py +++ b/packages/device-connect-server/device_connect_server/registry/service/main.py @@ -256,6 +256,7 @@ class DeviceStatus(BaseModel): task_stack: list[str] | None = None battery: int | None = None # Battery level percentage (0-100) online: bool | None = None # Whether device is online + local_zenoh: dict | None = None # LAN Zenoh route advertisement for agent shortcuts class RegisterParams(BaseModel): diff --git a/tests/README.md b/tests/README.md index e3bce09..6f993a8 100644 --- a/tests/README.md +++ b/tests/README.md @@ -19,7 +19,9 @@ tests/ │ ├── test_sensor_device.py # Sensor device patterns │ ├── test_multi_device_scenario.py # Multi-device scenarios │ ├── test_d2d_discovery.py # D2D device discovery -│ ├── test_tools_discover.py # Agent tools: device discovery +│ ├── test_tools_discover.py # Agent tools: selector discovery +│ ├── test_tools_hierarchical.py # Agent tools: hierarchical + small-fleet shortcut +│ ├── test_tools_portal.py # Agent tools: portal local-route bundles │ ├── test_tools_invoke.py # Agent tools: function invocation │ ├── test_strands_agent.py # Strands agent integration (LLM) │ ├── test_messaging_conformance.py # Messaging backend conformance @@ -75,6 +77,9 @@ pip install -r requirements.txt ```bash docker compose -f docker-compose-itest.yml up -d +# After device-connect-edge registration schema changes, rebuild the registry image: +# docker compose -f docker-compose-itest.yml build device-registry-service +# docker compose -f docker-compose-itest.yml up -d device-registry-service --force-recreate ``` ### Tier 1: Core integration tests (no LLM) @@ -119,6 +124,7 @@ docker compose -f docker-compose-itest.yml down -v --remove-orphans | `llm` | Requires a real LLM API key | | `slow` | Takes > 30 seconds | | `conformance` | Messaging backend conformance tests | +| `portal` | Portal credential bundle tests (own messaging env) | ## CI/CD diff --git a/tests/conftest.py b/tests/conftest.py index 19f9612..8c0bab8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,6 +62,10 @@ def pytest_configure(config): config.addinivalue_line("markers", "llm: requires real LLM API key") config.addinivalue_line("markers", "slow: takes > 30 seconds") config.addinivalue_line("markers", "conformance: messaging backend conformance test") + config.addinivalue_line( + "markers", + "portal: portal credential bundle tests (manage their own messaging env)", + ) def pytest_collection_modifyitems(config, items): @@ -108,8 +112,20 @@ def messaging_url(messaging_backend): @pytest.fixture(autouse=True) -def _set_backend_env(messaging_backend): - """Set env vars so SDK/agent-tools auto-detect the correct backend.""" +def _set_backend_env(request): + """Set env vars so SDK/agent-tools auto-detect the correct backend. + + Skipped for ``@pytest.mark.portal`` tests, which configure messaging via + ``DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE`` instead of parametrized backends. + """ + if request.node.get_closest_marker("portal") is not None: + yield + return + if "messaging_backend" not in request.fixturenames: + yield + return + + messaging_backend = request.getfixturevalue("messaging_backend") os.environ["MESSAGING_BACKEND"] = messaging_backend if messaging_backend == "zenoh": os.environ["DEVICE_CONNECT_DISCOVERY_MODE"] = "d2d" diff --git a/tests/docker-compose-itest.yml b/tests/docker-compose-itest.yml index 0c6002c..9da9cee 100644 --- a/tests/docker-compose-itest.yml +++ b/tests/docker-compose-itest.yml @@ -80,7 +80,9 @@ services: - itest-net restart: unless-stopped - # Zenoh router (no auth, no multicast scouting — use router mode for tests) + # Zenoh router (no auth, no multicast scouting — use router mode for tests). + # Containerized devices: ZENOH_CONNECT=tcp/zenoh:7447 and + # DEVICE_CONNECT_LOCAL_ZENOH_ROUTES=tcp/host.docker.internal:7447 (with extra_hosts). zenoh: image: eclipse/zenoh:latest container_name: itest-zenoh diff --git a/tests/fixtures/devices.py b/tests/fixtures/devices.py index 3fcae8e..503c1e0 100644 --- a/tests/fixtures/devices.py +++ b/tests/fixtures/devices.py @@ -10,7 +10,7 @@ import asyncio import logging import uuid -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from device_connect_edge import DeviceRuntime @@ -43,11 +43,12 @@ async def _spawn( device_id: str, wait_for_registration: bool = True, registration_timeout: float = 10.0, + status: Optional[dict] = None, ) -> Tuple[DeviceRuntime, object]: """Common spawn logic for any driver.""" driver._device_id = device_id - device = DeviceRuntime( + runtime_kwargs = dict( driver=driver, device_id=device_id, messaging_urls=[self.messaging_url], @@ -55,6 +56,10 @@ async def _spawn( ttl=self.default_ttl, allow_insecure=True, ) + if status is not None: + runtime_kwargs["status"] = status + + device = DeviceRuntime(**runtime_kwargs) task = asyncio.create_task(device.run()) self._devices.append(device) diff --git a/tests/tests/test_tools_portal.py b/tests/tests/test_tools_portal.py new file mode 100644 index 0000000..24fb3c9 --- /dev/null +++ b/tests/tests/test_tools_portal.py @@ -0,0 +1,411 @@ +# Copyright (c) 2024-2026, Arm Limited and Contributors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Integration tests for portal-assisted local Zenoh route shortcuts. + +Exercises ``DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE`` bundles that pair a +portal (NATS/registry) route with an optional same-LAN Zenoh fast path: + +- Prefer local Zenoh when ``DEVICE_CONNECT_PREFER_LOCAL`` is true (default) +- Fall back to the portal NATS route when the local connect fails +- Use the portal route directly when local preference is disabled + +Also verifies the small-fleet shortcut (auto-expanded schemas) works when +the agent connects through the portal local route. +""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +from typing import Any +from unittest.mock import patch + +import pytest + +ITEST_NATS_URL = os.getenv("NATS_URL", "nats://localhost:4222") +ITEST_ZENOH_URL = os.getenv("ZENOH_CONNECT", "tcp/localhost:7447") + +SETTLE_TIME = 0.3 +DISCOVERY_TIMEOUT = 8.0 + +# Placeholder local route in the fallback bundle (connect failure is injected via mock). +BROKEN_LOCAL_ZENOH = "tcp/127.0.0.1:19999" + +_URL_ENV_VARS = ( + "ZENOH_CONNECT", + "MESSAGING_URLS", + "NATS_URL", + "NATS_URLS", +) + + +def _write_portal_bundle( + path, + *, + local_routes: list[str] | None, + local_tls: dict[str, str] | None = None, + portal_nats_url: str = ITEST_NATS_URL, + tenant: str = "default", +) -> None: + data: dict[str, Any] = {"tenant": tenant, "nats": {"urls": [portal_nats_url]}} + if local_routes: + local: dict[str, Any] = {"routes": local_routes} + if local_tls: + local["tls"] = local_tls + data["local"] = local + path.write_text(json.dumps(data)) + + +@pytest.fixture +def portal_bundle_path(tmp_path): + """Factory fixture: write a portal credential bundle and return its path.""" + + def _factory( + *, + local_routes: list[str] | None, + local_tls: dict[str, str] | None = None, + portal_nats_url: str = ITEST_NATS_URL, + tenant: str = "default", + ) -> str: + bundle = tmp_path / f"portal-{tenant}.creds.json" + _write_portal_bundle( + bundle, + local_routes=local_routes, + local_tls=local_tls, + portal_nats_url=portal_nats_url, + tenant=tenant, + ) + return str(bundle) + + return _factory + + +@pytest.fixture +def portal_agent_env(monkeypatch): + """Clear broker URL env vars so only the portal bundle configures the agent.""" + + def _activate( + bundle_path: str, + *, + prefer_local: str | None = None, + device_d2d: bool = False, + ) -> None: + for name in _URL_ENV_VARS: + monkeypatch.delenv(name, raising=False) + monkeypatch.delenv("MESSAGING_BACKEND", raising=False) + monkeypatch.delenv("DEVICE_CONNECT_DISCOVERY_MODE", raising=False) + monkeypatch.setenv("DEVICE_CONNECT_ALLOW_INSECURE", "true") + monkeypatch.setenv("DEVICE_CONNECT_PORTAL_CREDENTIALS_FILE", bundle_path) + if device_d2d: + monkeypatch.setenv("DEVICE_CONNECT_DISCOVERY_MODE", "d2d") + if prefer_local is not None: + monkeypatch.setenv("DEVICE_CONNECT_PREFER_LOCAL", prefer_local) + else: + monkeypatch.delenv("DEVICE_CONNECT_PREFER_LOCAL", raising=False) + + return _activate + + +async def _wait_for_device_ids(expected_ids: set[str]) -> None: + from device_connect_agent_tools.connection import get_connection + + deadline = time.monotonic() + DISCOVERY_TIMEOUT + while True: + conn = get_connection() + devices = await asyncio.to_thread(conn.list_devices) + ids = {d.get("device_id") for d in devices} + if expected_ids.issubset(ids) or time.monotonic() > deadline: + return + await asyncio.sleep(0.25) + + +# ── Local Zenoh shortcut ─────────────────────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.portal +async def test_portal_prefers_local_zenoh_route( + infrastructure, + portal_bundle_path, + portal_agent_env, +): + """Portal bundle connects via local Zenoh and discovers a peer on the itest router.""" + from fixtures.devices import DeviceFactory + + from device_connect_agent_tools import connect, disconnect, invoke + from device_connect_agent_tools.connection import get_connection + + zenoh_url = ITEST_ZENOH_URL + bundle = portal_bundle_path(local_routes=[zenoh_url]) + portal_agent_env(bundle, device_d2d=True) + + factory = DeviceFactory(messaging_url=zenoh_url) + try: + await factory.spawn_sensor( + "itest-portal-local-sensor", + initial_temp=21.0, + initial_humidity=55.0, + ) + await asyncio.sleep(SETTLE_TIME) + + await asyncio.to_thread(connect) + try: + conn = get_connection() + assert conn._using_local_route is True + assert conn._backend == "zenoh" + assert conn._servers == [zenoh_url] + assert conn._fallback_config is not None + assert conn._fallback_config["servers"] == [ITEST_NATS_URL] + + await _wait_for_device_ids({"itest-portal-local-sensor"}) + + result = await asyncio.to_thread( + invoke, + "device(itest-portal-local-sensor).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["success"] is True + assert result["device_id"] == "itest-portal-local-sensor" + assert "temperature" in result["result"] + finally: + await asyncio.to_thread(disconnect) + finally: + await factory.cleanup() + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.portal +async def test_portal_local_route_small_fleet_shortcut( + infrastructure, + portal_bundle_path, + portal_agent_env, +): + """Small-fleet auto-expand works when the agent uses the portal local Zenoh route.""" + from fixtures.devices import DeviceFactory + + from device_connect_agent_tools import connect, disconnect + from device_connect_agent_tools.tools import describe_fleet + + zenoh_url = ITEST_ZENOH_URL + bundle = portal_bundle_path(local_routes=[zenoh_url]) + portal_agent_env(bundle, device_d2d=True) + + factory = DeviceFactory(messaging_url=zenoh_url) + try: + await factory.spawn_sensor("itest-portal-shortcut-sensor") + await asyncio.sleep(SETTLE_TIME) + + await asyncio.to_thread(connect) + try: + deadline = time.monotonic() + DISCOVERY_TIMEOUT + result = None + while True: + result = await asyncio.to_thread(describe_fleet) + if ( + result.get("total_devices", 0) >= 1 + and any( + d.get("device_id") == "itest-portal-shortcut-sensor" + for d in result.get("devices", []) + ) + ): + break + if time.monotonic() > deadline: + break + await asyncio.sleep(0.25) + + assert result is not None + assert "devices" in result, "Small fleet should auto-include device details" + sensor = next( + d for d in result["devices"] + if d["device_id"] == "itest-portal-shortcut-sensor" + ) + assert len(sensor.get("functions", [])) >= 1 + assert "parameters" in sensor["functions"][0] + finally: + await asyncio.to_thread(disconnect) + finally: + await factory.cleanup() + + +# ── Portal NATS route ────────────────────────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.portal +async def test_portal_prefers_portal_nats_when_local_disabled( + infrastructure, + portal_bundle_path, + portal_agent_env, +): + """DEVICE_CONNECT_PREFER_LOCAL=false uses the portal NATS/registry route.""" + from fixtures.devices import DeviceFactory + + from device_connect_agent_tools import connect, disconnect, invoke + from device_connect_agent_tools.connection import get_connection + + nats_url = ITEST_NATS_URL + bundle = portal_bundle_path( + local_routes=[ITEST_ZENOH_URL], + portal_nats_url=nats_url, + ) + portal_agent_env(bundle, prefer_local="false") + + factory = DeviceFactory(messaging_url=nats_url) + try: + await factory.spawn_sensor("itest-portal-nats-sensor", initial_temp=19.5) + await asyncio.sleep(SETTLE_TIME) + + await asyncio.to_thread(connect) + try: + conn = get_connection() + assert conn._using_local_route is False + assert conn._backend == "nats" + assert conn._servers == [nats_url] + assert conn._fallback_config is None + + await _wait_for_device_ids({"itest-portal-nats-sensor"}) + + result = await asyncio.to_thread( + invoke, + "device(itest-portal-nats-sensor).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["success"] is True + finally: + await asyncio.to_thread(disconnect) + finally: + await factory.cleanup() + + +# ── Fallback to portal ───────────────────────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.portal +async def test_portal_falls_back_to_nats_when_local_unavailable( + infrastructure, + portal_bundle_path, + portal_agent_env, +): + """When the local Zenoh connect fails, the agent falls back to portal NATS/registry.""" + from fixtures.devices import DeviceFactory + + from device_connect_agent_tools import connect, disconnect, invoke + from device_connect_agent_tools import connection as conn_mod + from device_connect_agent_tools.connection import get_connection + + nats_url = ITEST_NATS_URL + bundle = portal_bundle_path( + local_routes=[BROKEN_LOCAL_ZENOH], + portal_nats_url=nats_url, + ) + portal_agent_env(bundle) + + factory = DeviceFactory(messaging_url=nats_url) + try: + await factory.spawn_sensor("itest-portal-fallback-sensor", initial_temp=18.0) + await asyncio.sleep(SETTLE_TIME) + + original_connect = conn_mod.DeviceConnection._async_connect_current + attempts = 0 + + async def _connect_with_simulated_local_failure(self): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise RuntimeError("local route unavailable") + return await original_connect(self) + + with patch.object( + conn_mod.DeviceConnection, + "_async_connect_current", + _connect_with_simulated_local_failure, + ): + await asyncio.to_thread(connect) + + try: + conn = get_connection() + assert attempts == 2 + assert conn._using_local_route is False + assert conn._backend == "nats" + assert conn._servers == [nats_url] + assert conn._fallback_config is None + + await _wait_for_device_ids({"itest-portal-fallback-sensor"}) + + result = await asyncio.to_thread( + invoke, + "device(itest-portal-fallback-sensor).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["success"] is True + assert result["device_id"] == "itest-portal-fallback-sensor" + finally: + await asyncio.to_thread(disconnect) + finally: + await factory.cleanup() + + +# ── Registry-advertised local routes ─────────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.integration +@pytest.mark.portal +async def test_portal_discovers_local_zenoh_from_registry( + infrastructure, + portal_bundle_path, + portal_agent_env, + monkeypatch, +): + """Portal bundle without ``local`` uses registry ``status.local_zenoh`` advertisements.""" + from fixtures.devices import DeviceFactory + + from device_connect_agent_tools import connect, disconnect, invoke + from device_connect_agent_tools.connection import get_connection + + nats_url = ITEST_NATS_URL + zenoh_url = ITEST_ZENOH_URL + bundle = portal_bundle_path(local_routes=None, portal_nats_url=nats_url) + portal_agent_env(bundle) + + nats_factory = DeviceFactory(messaging_url=nats_url) + zenoh_factory = DeviceFactory(messaging_url=zenoh_url) + try: + await nats_factory.spawn_sensor( + "itest-portal-registry-beacon", + status={"local_zenoh": {"routes": [zenoh_url]}}, + ) + monkeypatch.setenv("DEVICE_CONNECT_DISCOVERY_MODE", "d2d") + await zenoh_factory.spawn_sensor("itest-portal-registry-target", initial_temp=20.0) + await asyncio.sleep(SETTLE_TIME) + + await asyncio.to_thread(connect) + try: + conn = get_connection() + assert conn._using_local_route is True + assert conn._backend == "zenoh" + assert conn._servers == [zenoh_url] + + await _wait_for_device_ids({"itest-portal-registry-target"}) + + result = await asyncio.to_thread( + invoke, + "device(itest-portal-registry-target).function(get_reading)", + {"unit": "celsius"}, + ) + assert result["success"] is True + assert result["device_id"] == "itest-portal-registry-target" + finally: + await asyncio.to_thread(disconnect) + finally: + await nats_factory.cleanup() + await zenoh_factory.cleanup()