NashMark AI for Ambiguity-Sensitive Continuity and Drift Recovery in Navigation

Executive Summary

This paper presents NashMark AI as an equilibrium-governed navigation and recovery model for degraded, ambiguous, and label-light traversal environments. It does not begin from the assumption that navigation is merely a point-estimation problem. It begins from the premise that navigation is a continuity problem under uncertainty: the system must preserve coherent traversal when observations are incomplete, distorted, delayed, or structurally ambiguous.

The benchmark position established here is precise. Conventional Kalman filtering remains strongest in clean labelled-route tracking and is highly efficient under benign conditions. HMM-style models retain a clear role in discrete branch-state interpretation where stable labels are present. NashMark AI shows its clearest benchmarked advantages in ambiguity-sensitive continuity, drift recovery, and degraded traversal conditions, where equilibrium preservation matters more than immediate local correction alone.

This paper therefore does not claim universal superiority across all navigation conditions. It establishes a narrower and more defensible proposition: NashMark AI is a viable navigation and recovery architecture with strongest current evidence in ambiguous, degraded, and continuity-sensitive environments, and it is expected to tighten further where rooted label authority is supplied directly into the model state.

Part I — Mathematical Foundation

Chapter 1. Navigation as Continuity, Not Mere Fix Estimation

Most conventional navigation systems are constructed as recursive estimators. They attempt to infer latent state from noisy observations through prediction and correction. This is effective when the observation stream remains sufficiently stable and when the environment can be represented as a well-behaved estimation problem. The NashMark position is broader. It treats navigation as the preservation of lawful traversal under uncertainty.

The key distinction is that a navigation system should not only estimate where it is. It should preserve state coherence, resist false collapse, restore itself after drift, and defer unsafe commitment until structural confidence is sufficient. In this sense, navigation is not reduced to coordinate error alone. It is governed by equilibrium.

Chapter 2. Latent Path Inference and Corridor Retention

Let the latent traversal state at step $t$ be defined over a candidate set of route states or traversal states. A probabilistic inference layer assigns likelihood over those states rather than forcing immediate collapse to a single branch. The resulting posterior set forms a corridor of plausible traversal.

A basic latent-state score may be written as:

$\delta_t(j) = \max_i \left[\delta_{t-1}(i) + \log a_{ij}\right] + \log b_j(t)$

where $a_{ij}$ is the transition structure and $b_j(t)$ is the observation likelihood.

The log-scores are then normalised into posterior mass $\gamma_t(j)$, and the corridor retains every state whose mass clears a threshold:

$\Gamma_t = \{ j : \gamma_t(j) \ge \tau \}$

where $\tau$ is the corridor retention threshold.

The significance of this is practical. Under degraded observation, the system does not have to collapse instantly to one route edge or one path label. It can retain structured ambiguity while remaining traversally coherent.
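As a minimal sketch of the recursion and corridor rule above (the state names and the `log_a`, `log_b` tables here are illustrative placeholders, not part of the model):

```python
import math

def viterbi_update(delta_prev, log_a, log_b):
    """One log-space update of the latent-state score delta_t(j)."""
    return {
        j: max(delta_prev[i] + log_a[(i, j)] for i in delta_prev) + log_b[j]
        for j in log_b
    }

def normalize(delta):
    """Softmax over log scores to obtain posterior mass gamma_t(j)."""
    m = max(delta.values())
    exps = {j: math.exp(v - m) for j, v in delta.items()}
    z = sum(exps.values())
    return {j: e / z for j, e in exps.items()}

def corridor(gamma, tau):
    """Retain every state whose posterior mass clears the threshold tau."""
    return {j for j, p in gamma.items() if p >= tau}
```

Under degraded observation the threshold $\tau$ can be lowered, widening the corridor instead of forcing collapse to a single state.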

Chapter 3. Equilibrium as Governing Principle

NashMark AI is grounded in the view that instability can be represented as unresolved destabilising load, while restoration is represented as re-entry into equilibrium. In the navigation setting, this becomes a drift-governed interpretation of movement.

Let net traversal imbalance be:

$\Delta_t = \text{destabilising load} - \text{stabilising structure}$

and equilibrium closure occur when:

$\Delta_t = 0$

This is not merely symbolic. In the navigation setting, destabilising load includes observation drift, ambiguity, structural inconsistency, and breach pressure. Stabilising structure includes state coherence, governance stability, route continuity, and restoration potential.
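A minimal sketch of this bookkeeping, with component names chosen purely for illustration:

```python
def traversal_imbalance(load, structure):
    """Net traversal imbalance Delta_t = destabilising load - stabilising structure.

    `load` and `structure` are dicts of named component terms (e.g. drift,
    ambiguity vs. coherence, restoration); the names are illustrative, not
    a fixed schema. Equilibrium closure corresponds to a return value of 0.
    """
    return sum(load.values()) - sum(structure.values())
```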

Chapter 4. Recovery and Degradation Dynamics

To model how the system behaves through time under degraded conditions, the navigation layer uses recovery and degradation curves.

A simplified recovery form is:

$R(t) = E^* - e^{-kt}$

where $E^*$ is the equilibrium asymptote and $k$ is the recovery rate. A simplified degradation form is:

$D_h(t) = D_0 e^{-\lambda t}$

where $D_0$ is the initial degradation level and $\lambda$ is the decay rate.

These functions allow the model to represent not merely current state, but expected recovery capacity and residual degradation pressure over time. This is essential in tunnel, dropout, drift, or ambiguity-heavy settings, where the current observation cannot be interpreted in isolation.
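Using the constants of the reference implementation later in this paper ($E^* = 1$, $k = 1/20000$, $\lambda = 1/40000$, $D_0 = 1$) as defaults, the two curves can be sketched as:

```python
import math

def recovery(t, E_star=1.0, k=1.0 / 20000.0):
    """Simplified recovery curve R(t) = E* - e^{-k t}."""
    return E_star - math.exp(-k * t)

def degradation(t, D0=1.0, lam=1.0 / 40000.0):
    """Simplified degradation curve D_h(t) = D0 * e^{-lambda t}."""
    return D0 * math.exp(-lam * t)
```

Recovery capacity rises monotonically toward $E^*$ while residual degradation pressure decays, so the pair encodes how much restoration headroom the system expects at a given effective time.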

Chapter 5. Governance Stability

Navigation under uncertainty is not only a probability problem. It is also a decision-governance problem. A model that commits too fast can fail structurally even when its local evidence looks strong. NashMark therefore introduces a governance layer.

A generic governance stability measure may be written as:

$G(t) = w_c \bar{C}(t) + w_p P(t) - w_v V(t)$

where:

  • $\bar{C}(t)$ is average cooperation or agreement,
  • $P(t)$ is policy alignment,
  • $V(t)$ is volatility across candidate policy states,
  • $w_c$, $w_p$, $w_v$ are non-negative weights.

This gives the model a basis for asking not merely which state is most likely, but whether the system is sufficiently governed to commit safely.
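With the weights used in the reference implementation later in this paper ($w_c = 0.60$, $w_p = 0.25$, $w_v = 0.15$) as defaults, a minimal sketch of $G(t)$ is:

```python
def governance_stability(coop, policy_alignment, volatility,
                         w_c=0.60, w_p=0.25, w_v=0.15):
    """G(t) = w_c * mean cooperation + w_p * policy alignment - w_v * volatility,
    clipped to [0, 1] as in the reference implementation."""
    g = w_c * coop + w_p * policy_alignment - w_v * volatility
    return max(0.0, min(1.0, g))
```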

Chapter 6. Safe Operating Envelope

The model is further constrained by a safe operating envelope. A navigation system should not be forced into hard commitment when its own structural conditions are unsafe.

Define moral or structural stability as:

$MSS(t) = \frac{C(t)}{C(t) + D(t)}$

where $C(t)$ is accumulated cooperative mass and $D(t)$ is accumulated defection mass, and systemic risk as:

$R_{sys}(t) = w_d\, d(t) + w_v V(t)$

where $d(t)$ is the defection rate and $V(t)$ is volatility across candidate policy states.

The system is treated as safely governed when:

$MSS(t) \ge \theta_M$

and

$R_{sys}(t) \le \theta_R$

This means branch commitment, transition hardening, and aggressive restoration can be gated by lawful state, not by raw probability alone.
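A minimal gating sketch, using the thresholds of the reference implementation ($\theta_M = 0.75$, $\theta_R = 0.25$, $w_d = 0.70$, $w_v = 0.30$) as defaults and taking the defection rate and volatility as given inputs:

```python
def safely_governed(mss, defect_rate, volatility,
                    w_d=0.70, w_v=0.30, theta_M=0.75, theta_R=0.25):
    """Gate commitment on MSS(t) >= theta_M and R_sys(t) <= theta_R."""
    r_sys = w_d * defect_rate + w_v * volatility
    return mss >= theta_M and r_sys <= theta_R
```

A commitment decision that would be accepted on raw probability alone can thus still be deferred when either stability or systemic risk is out of envelope.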

Part II — Applied Navigation Architecture

Chapter 7. NashMark AI as Navigation Architecture

The applied NashMark navigation model is composed of the following layers:

  1. Latent traversal inference over candidate states.
  2. Corridor retention under uncertainty.
  3. Equilibrium refinement under live observation.
  4. Governance stability assessment.
  5. Safe-envelope gating.
  6. Dynamic restoration and commit control.

This distinguishes the model from a plain recursive estimator. The system is not simply asking where the point estimate lies. It is evaluating whether current traversal remains coherent, governable, and safe to harden.

Chapter 8. Observation, Pressure, and Ambiguity Signals

At each live step, the system receives an observation or a degraded absence of observation. From that it derives key state signals:

  • pressure,
  • ambiguity,
  • stable observation strength,
  • breach condition.

Pressure is derived from the discrepancy between observation and retained corridor target. Ambiguity is derived from corridor width and candidate-state dominance. Stable observation strength is treated as the complement of severe pressure. Breach condition activates when drift or pressure passes a threshold of concern.

These signals are not the final state. They are inputs into the equilibrium layer.
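These derivations appear inside the main step logic of the implementation below; extracted here as a standalone sketch (the saturation constant 5.0 and the thresholds mirror that code):

```python
import numpy as np

def derive_signals(z, target, corridor, bayes_factor,
                   drift_load, sentinel_theta=1.45):
    """Derive pressure, ambiguity, stable-observation, and breach signals
    from one observation, mirroring the step logic of the reference model."""
    obs_drift = float(np.linalg.norm(np.asarray(z) - np.asarray(target)))
    pressure = obs_drift / (obs_drift + 5.0)          # saturating drift pressure
    ambiguity = min(1.0, (len(corridor) - 1) / 3.0)   # corridor-width ambiguity
    if bayes_factor < sentinel_theta or len(corridor) > 1:
        # Weak dominance of the leading candidate raises ambiguity further.
        ambiguity = max(ambiguity, min(1.0, 2.0 - min(bayes_factor, 2.0)))
    stable_obs = max(0.0, 1.0 - pressure)
    breach = 1.0 if (drift_load > 1.5 or pressure > 0.75) else 0.0
    return {"pressure": pressure, "ambiguity": ambiguity,
            "stable_obs": stable_obs, "breach": breach}
```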

Chapter 9. Inner Equilibrium Refinement

The core of the NashMark navigation engine is that a single external navigation step need not imply a single internal update. Instead, the model performs multiple inner equilibrium refinements before emitting a final outer decision.

This gives the form:

live step $t$ → internal refinements $k = 1,2,\dots,K$ → converged or near-converged decision state

This is critical. It allows the model to settle its own state internally while still returning a real-time output. The system is therefore not reducible to a single-pass filter. The canonical implementation follows; it assumes a companion `road_graph` module that provides `RoadGraph` and `point_to_segment_distance`.

import math
import random
from typing import Dict, List, Tuple, Any
import numpy as np
from road_graph import RoadGraph, point_to_segment_distance

class NashMarkNav:
    """
    Canonical NashMark navigation model.
    Merges:
    - NashMark engine equilibrium refinement
    - governance / safe-envelope gating
    - probability-weighted corridor cooperation
    - ambiguity-aware commitment
    - dynamic gain modulation
    - temporal commit gating
    """
    def __init__(
        self,
        graph: RoadGraph,
        sigma: float = 7.0,
        corridor_tau: float = 0.12,
        sentinel_theta: float = 1.45,
        random_seed: int = 42,
        horizon_scale: int = 100000,
        epsilon_start: float = 0.18,
        epsilon_end: float = 0.02,
        inner_iterations: int = 48,
        convergence_tol: float = 1e-3,
    ):
        random.seed(random_seed)
        np.random.seed(random_seed)
        self.graph = graph
        self.sigma = sigma
        self.corridor_tau = corridor_tau
        self.sentinel_theta = sentinel_theta
        self.horizon_scale = horizon_scale
        self.inner_iterations = inner_iterations
        self.convergence_tol = convergence_tol
        # ------------------------------------------------------------------
        # Navigation memory
        # ------------------------------------------------------------------
        self.delta_prev: Dict[str, float] | None = None
        self.gamma_prev: Dict[str, float] | None = None
        self.current_edge: str = "AB"
        self.current_pos: np.ndarray = self.graph.edge_midpoint("AB")
        self.corridor: List[str] = ["AB"]
        self.history_edges: List[str] = []
        self.drift_load: float = 0.0
        # Temporal commit gating
        self.commit_candidate_edge = None
        self.commit_streak = 0
        self.commit_required_streak = 3
        # ------------------------------------------------------------------
        # Nash-Markov backbone
        # ------------------------------------------------------------------
        self.t = 0
        self.states = ["SELFISH", "MIXED", "COOPERATIVE"]
        self.actions = ["DEFECT", "HOLD", "COOPERATE"]
        self.num_states = len(self.states)
        self.num_actions = len(self.actions)
        self.Q = np.zeros((self.num_states, self.num_actions), dtype=float)
        self.P = np.array(
            [
                [0.60, 0.30, 0.10],
                [0.20, 0.50, 0.30],
                [0.05, 0.15, 0.80],
            ],
            dtype=float,
        )
        self.alpha_q = 0.25
        self.gamma_q = 0.92
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.current_state = 1
        self.current_action = 1
        # ------------------------------------------------------------------
        # MSS / equilibrium stability
        # ------------------------------------------------------------------
        self.cooperative_mass_total = 1.0
        self.defection_mass_total = 1.0
        self.mss = 0.5
        # ------------------------------------------------------------------
        # Recovery / degradation curves
        # ------------------------------------------------------------------
        self.E_star = 1.0
        self.k = 1.0 / 20000.0
        self.lambda_h = 1.0 / 40000.0
        self.D0 = 1.0
        self.pressure_center = 30000.0
        self.pressure_width = 8000.0
        self.pressure_strength_R = 0.07
        self.pressure_strength_Dh = 0.18
        self.R = 0.0
        self.Dh = 1.0
        # ------------------------------------------------------------------
        # Governance / multi-agent convergence
        # ------------------------------------------------------------------
        self.num_agents = 3
        self.agent_coops = np.array([0.28, 0.52, 0.18], dtype=float)
        self.pi_eq = 0.80
        self.alpha_agent = 0.001
        self.beta_gov = 0.0009
        self.tau_decay = 1200.0
        self.noise_scale = 0.12
        self.policy_alignment = 0.0
        self.policy_consensus = 0.0
        self.volatility = 1.0
        self.gov_stability = 0.0
        self.w_c = 0.60
        self.w_p = 0.25
        self.w_v = 0.15
        # ------------------------------------------------------------------
        # Safe operating envelope
        # ------------------------------------------------------------------
        self.systemic_risk = 1.0
        self.w_d = 0.70
        self.w_v_risk = 0.30
        self.theta_M = 0.75
        self.theta_R = 0.25
        self.safe_envelope = False
        # ------------------------------------------------------------------
        # Drift score / intervention
        # ------------------------------------------------------------------
        self.drift_score = 0.0
        self.intervention_threshold = 1.5
    # ======================================================================
    # Geometry / graph helpers
    # ======================================================================
    def emission_logprob(self, z: np.ndarray, edge_id: str) -> float:
        edge = self.graph.edges[edge_id]
        d = point_to_segment_distance(
            z,
            np.array(edge.points[0], dtype=float),
            np.array(edge.points[1], dtype=float),
        )
        return -0.5 * (d / self.sigma) ** 2

    def transition_logprob(self, prev_edge: str, curr_edge: str) -> float:
        if prev_edge == curr_edge:
            return math.log(0.54)
        if curr_edge in self.graph.adjacency.get(prev_edge, []):
            return math.log(0.34)
        return math.log(0.12)

    def normalize_log_probs(self, log_probs: Dict[str, float]) -> Dict[str, float]:
        vals = np.array(list(log_probs.values()), dtype=float)
        m = float(np.max(vals))
        exps = np.exp(vals - m)
        denom = float(np.sum(exps))
        keys = list(log_probs.keys())
        return {k: float(exps[i] / denom) for i, k in enumerate(keys)}

    def project_point_to_edge(self, point: np.ndarray, edge_id: str) -> np.ndarray:
        edge = self.graph.edges[edge_id]
        a = np.array(edge.points[0], dtype=float)
        b = np.array(edge.points[1], dtype=float)
        ab = b - a
        denom = float(np.dot(ab, ab))
        if denom <= 1e-12:
            return a.copy()
        t = float(np.dot(point - a, ab) / denom)
        t = max(0.0, min(1.0, t))
        return a + t * ab

    def corridor_target(self, point: np.ndarray, corridor: List[str], gamma: Dict[str, float]) -> np.ndarray:
        pts = []
        ws = []
        for edge_id in corridor:
            proj = self.project_point_to_edge(point, edge_id)
            pts.append(proj)
            ws.append(max(gamma.get(edge_id, 0.0), 1e-6))
        pts_arr = np.stack(pts, axis=0)
        ws_arr = np.array(ws, dtype=float)
        ws_arr = ws_arr / np.sum(ws_arr)
        return np.sum(pts_arr * ws_arr[:, None], axis=0)
    # ======================================================================
    # Engine helpers
    # ======================================================================
    def _scaled_time(self) -> float:
        return min(float(self.t) * 1800.0, float(self.horizon_scale))

    def _epsilon(self) -> float:
        progress = min(1.0, self.t / 5000.0)
        return self.epsilon_start + (self.epsilon_end - self.epsilon_start) * progress

    def _classify_state(self, signals: Dict[str, float]) -> int:
        pressure = float(signals.get("pressure", 0.0))
        ambiguity = float(signals.get("ambiguity", 0.0))
        stable_obs = float(signals.get("stable_obs", 0.0))
        gov = float(self.gov_stability)
        risk = float(self.systemic_risk)
        score = (
            0.35 * stable_obs
            + 0.25 * self.mss
            + 0.20 * gov
            + 0.10 * self.R
            - 0.25 * pressure
            - 0.15 * ambiguity
            - 0.20 * risk
        )
        if score < 0.20:
            return 0
        if score < 0.55:
            return 1
        return 2
    def _reward(self, state: int, action: int, signals: Dict[str, float]) -> float:
        pressure = float(signals.get("pressure", 0.0))
        ambiguity = float(signals.get("ambiguity", 0.0))
        stable_obs = float(signals.get("stable_obs", 0.0))
        breach = float(signals.get("breach", 0.0))
        base = 0.0
        if state == 2 and action == 2:
            base += 1.0
        elif state == 0 and action == 0:
            base -= 1.0
        else:
            base -= 0.1
        base += 0.50 * stable_obs
        base -= 0.60 * pressure
        base -= 0.35 * ambiguity
        base -= 0.75 * breach
        base += 0.35 * self.gov_stability
        base -= 0.35 * self.systemic_risk
        return float(base)

    def _choose_action(self, state: int) -> int:
        eps = self._epsilon()
        if random.random() < eps:
            return random.choice(range(self.num_actions))
        return int(np.argmax(self.Q[state]))

    def _update_q(self, state: int, action: int, reward: float, next_state: int) -> None:
        self.Q[state, action] += self.alpha_q * (
            reward + self.gamma_q * np.max(self.Q[next_state]) - self.Q[state, action]
        )

    def _update_mss(self, cooperative_score: float, action: int) -> float:
        cooperative_score = float(np.clip(cooperative_score, 0.0, 1.0))
        self.cooperative_mass_total += cooperative_score
        if action == 0:
            self.defection_mass_total += 1.0
        else:
            self.defection_mass_total += max(0.0, 1.0 - cooperative_score)
        denom = self.cooperative_mass_total + self.defection_mass_total
        self.mss = self.cooperative_mass_total / denom if denom > 0 else 0.0
        return self.mss

    def _update_recovery_degradation(self, pressure: float) -> None:
        t_eff = self._scaled_time()
        R_base = self.E_star - np.exp(-self.k * t_eff)
        Dh_base = self.D0 * np.exp(-self.lambda_h * t_eff)
        A = np.exp(-((t_eff - self.pressure_center) ** 2) / (2.0 * self.pressure_width ** 2))
        R = R_base + self.pressure_strength_R * A
        Dh = Dh_base - self.pressure_strength_Dh * A
        R -= 0.18 * pressure
        Dh += 0.28 * pressure
        self.R = float(np.clip(R, 0.0, 1.0))
        self.Dh = float(np.clip(Dh, 0.0, 1.0))
    def _update_multi_agent_convergence(self, signals: Dict[str, float]) -> None:
        pressure = float(signals.get("pressure", 0.0))
        ambiguity = float(signals.get("ambiguity", 0.0))
        stable_obs = float(signals.get("stable_obs", 0.0))
        dominant_signal = float(np.clip(1.0 - pressure, 0.0, 1.0))
        coherence_signal = float(np.clip(1.0 - ambiguity, 0.0, 1.0))
        persistence_signal = float(np.clip(0.5 * stable_obs + 0.5 * self.mss, 0.0, 1.0))
        signals_arr = np.array([dominant_signal, coherence_signal, persistence_signal], dtype=float)
        avg = float(np.mean(self.agent_coops))
        gov_push = self.beta_gov * (self.pi_eq - avg)
        decay = math.exp(-self.t / self.tau_decay)
        updated = self.agent_coops.copy()
        for i in range(self.num_agents):
            drift = self.alpha_agent * (signals_arr[i] - updated[i])
            noise = self.noise_scale * decay * np.random.randn()
            updated[i] = updated[i] + drift + gov_push + noise
        self.agent_coops = np.clip(updated, 0.0, 1.0)

    def _update_governance(self) -> None:
        avg = float(np.mean(self.agent_coops))
        P_t = avg
        V_t = float(np.max(self.agent_coops) - np.min(self.agent_coops))
        self.policy_alignment = float(np.max(self.agent_coops))
        self.policy_consensus = float(avg)
        self.volatility = float(V_t)
        self.gov_stability = self.w_c * avg + self.w_p * P_t - self.w_v * V_t
        self.gov_stability = float(np.clip(self.gov_stability, 0.0, 1.0))

    def _update_safe_operating_envelope(self) -> None:
        avg = float(np.mean(self.agent_coops))
        defect_rate = 1.0 - avg
        self.systemic_risk = self.w_d * defect_rate + self.w_v_risk * self.volatility
        self.systemic_risk = float(np.clip(self.systemic_risk, 0.0, 1.0))
        self.safe_envelope = (self.mss >= self.theta_M) and (self.systemic_risk <= self.theta_R)

    def _update_drift_score(self, signals: Dict[str, float]) -> float:
        pressure = float(signals.get("pressure", 0.0))
        ambiguity = float(signals.get("ambiguity", 0.0))
        stable_obs = float(signals.get("stable_obs", 0.0))
        L = pressure
        M = ambiguity
        S = 1.0 - self.mss
        H = self.R
        Q = stable_obs
        self.drift_score = (L + M + S) - (H + Q)
        return self.drift_score

    def _snapshot(self) -> np.ndarray:
        return np.array(
            [
                self.mss,
                self.R,
                self.Dh,
                self.policy_alignment,
                self.policy_consensus,
                self.volatility,
                self.gov_stability,
                self.systemic_risk,
                float(self.safe_envelope),
                self.drift_score,
                float(self.current_state),
                float(self.current_action),
                float(np.max(self.Q)),
            ],
            dtype=float,
        )
    def _engine_refine(self, signals: Dict[str, float], cooperative_score: float) -> Dict[str, Any]:
        last_snapshot = self._snapshot()
        inner_count = 0
        for k in range(self.inner_iterations):
            inner_count = k + 1
            state = self._classify_state(signals)
            action = self._choose_action(state)
            reward = self._reward(state, action, signals)
            next_state = int(np.random.choice(range(self.num_states), p=self.P[state]))
            self._update_q(state, action, reward, next_state)
            self.current_state = next_state
            self.current_action = action
            self._update_mss(cooperative_score, action)
            self._update_recovery_degradation(float(signals.get("pressure", 0.0)))
            self._update_multi_agent_convergence(signals)
            self._update_governance()
            self._update_safe_operating_envelope()
            self._update_drift_score(signals)
            snap = self._snapshot()
            delta = float(np.linalg.norm(snap - last_snapshot))
            if delta < self.convergence_tol and (
                self.safe_envelope or abs(self.drift_score) < self.intervention_threshold
            ):
                break
            last_snapshot = snap
        return {
            "state_index": int(self.current_state),
            "state_name": self.states[self.current_state],
            "action_index": int(self.current_action),
            "action_name": self.actions[self.current_action],
            "mss": float(self.mss),
            "R": float(self.R),
            "Dh": float(self.Dh),
            "policy_alignment": float(self.policy_alignment),
            "policy_consensus": float(self.policy_consensus),
            "volatility": float(self.volatility),
            "gov_stability": float(self.gov_stability),
            "systemic_risk": float(self.systemic_risk),
            "safe_envelope": bool(self.safe_envelope),
            "drift_score": float(self.drift_score),
            "Q_max": float(np.max(self.Q)),
            "inner_iterations_used": int(inner_count),
        }
    # ======================================================================
    # Main navigation step
    # ======================================================================
    def step(self, z: np.ndarray | None) -> Tuple[str, np.ndarray, Dict[str, float], float]:
        self.t += 1
        if z is None:
            gamma = self.gamma_prev or {self.current_edge: 1.0}
            corridor = self.corridor if self.corridor else [self.current_edge]
            target = self.corridor_target(self.current_pos, corridor, gamma)
            corridor_mass = sum(gamma.get(e, 0.0) for e in corridor)
            cooperative_score = float(np.clip(corridor_mass, 0.0, 1.0))
            pressure = min(1.0, 0.28 + self.drift_load / 10.0)
            ambiguity = 0.65 if len(corridor) > 1 else 0.20
            stable_obs = 0.0
            breach = 1.0 if self.drift_load > 1.5 else 0.0
            eng = self._engine_refine(
                {
                    "pressure": pressure,
                    "ambiguity": ambiguity,
                    "stable_obs": stable_obs,
                    "breach": breach,
                },
                cooperative_score=cooperative_score,
            )
            action_name = eng["action_name"]
            if action_name == "COOPERATE":
                hold_gain = 0.72
            elif action_name == "HOLD":
                hold_gain = 0.48
            else:
                hold_gain = 0.28
            hold_gain += 0.12 * float(eng["R"])
            hold_gain += 0.08 * float(eng["gov_stability"])
            hold_gain -= 0.10 * float(eng["Dh"])
            hold_gain = float(np.clip(hold_gain, 0.10, 0.92))
            self.current_pos = self.current_pos + hold_gain * (target - self.current_pos)
            self.drift_load = max(
                0.0,
                0.78 * self.drift_load
                + float(eng["Dh"])
                + pressure
                + float(eng["systemic_risk"])
                - float(eng["R"])
                - float(eng["mss"])
                - 0.80 * float(eng["gov_stability"])
            )
            self.history_edges.append(self.current_edge)
            return self.current_edge, self.current_pos.copy(), gamma, self.drift_load
        z = np.asarray(z, dtype=float)
        candidates = self.graph.candidate_edges(z)
        if self.delta_prev is None:
            delta = {e: self.emission_logprob(z, e) for e in candidates}
        else:
            delta = {}
            for curr in candidates:
                best = -1e18
                for prev, prev_score in self.delta_prev.items():
                    score = prev_score + self.transition_logprob(prev, curr) + self.emission_logprob(z, curr)
                    if score > best:
                        best = score
                delta[curr] = best
        gamma = self.normalize_log_probs(delta)
        corridor = [e for e, p in gamma.items() if p >= self.corridor_tau]
        if not corridor:
            corridor = [max(gamma.items(), key=lambda kv: kv[1])[0]]
        dominant_edge = max(gamma.items(), key=lambda kv: kv[1])[0]
        sorted_probs = sorted(gamma.values(), reverse=True)
        if len(sorted_probs) == 1:
            bf = float("inf")
        else:
            bf = float(sorted_probs[0] / max(sorted_probs[1], 1e-9))
        is_ambiguous = (bf < self.sentinel_theta) or (len(corridor) > 1)
        target = self.corridor_target(z, corridor, gamma)
        obs_drift = float(np.linalg.norm(z - target))
        pressure = obs_drift / (obs_drift + 5.0)
        ambiguity = min(1.0, (len(corridor) - 1) / 3.0)
        if is_ambiguous:
            ambiguity = max(ambiguity, min(1.0, 2.0 - min(bf, 2.0)))
        stable_obs = max(0.0, 1.0 - pressure)
        breach = 1.0 if (self.drift_load > 1.5 or pressure > 0.75) else 0.0
        corridor_mass = sum(gamma.get(e, 0.0) for e in corridor)
        cooperative_score = float(np.clip(corridor_mass, 0.0, 1.0))
        eng = self._engine_refine(
            {
                "pressure": pressure,
                "ambiguity": ambiguity,
                "stable_obs": stable_obs,
                "breach": breach,
            },
            cooperative_score=cooperative_score,
        )
        self.drift_load = max(
            0.0,
            0.68 * self.drift_load
            + float(eng["Dh"])
            + pressure
            + float(eng["systemic_risk"])
            - float(eng["R"])
            - float(eng["mss"])
            - 0.85 * float(eng["gov_stability"])
        )
        # Commitment rule with temporal gating
        dominance_ok = (
            not is_ambiguous
            and bf >= self.sentinel_theta
            and float(eng["policy_consensus"]) >= 0.45
            and float(eng["gov_stability"]) >= 0.45
            and bool(eng["safe_envelope"])
        )
        if dominance_ok:
            if self.commit_candidate_edge == dominant_edge:
                self.commit_streak += 1
            else:
                self.commit_candidate_edge = dominant_edge
                self.commit_streak = 1
            if self.commit_streak >= self.commit_required_streak:
                self.current_edge = dominant_edge
        else:
            self.commit_candidate_edge = None
            self.commit_streak = 0
        if self.current_edge not in corridor:
            self.current_edge = dominant_edge
        if is_ambiguous:
            projected_current = target.copy()
            projected_target = target.copy()
            ambiguity_factor = 1.0
        else:
            projected_current = self.project_point_to_edge(self.current_pos, self.current_edge)
            projected_target = self.project_point_to_edge(target, self.current_edge)
            ambiguity_factor = 0.0
        action_name = eng["action_name"]
        if action_name == "COOPERATE":
            base_obs_gain = 0.16
            base_target_gain = 0.44
            base_recovery_gain = 0.18
        elif action_name == "HOLD":
            base_obs_gain = 0.20
            base_target_gain = 0.30
            base_recovery_gain = 0.12
        else:
            base_obs_gain = 0.24
            base_target_gain = 0.18
            base_recovery_gain = 0.08
        obs_gain = base_obs_gain - 0.08 * ambiguity_factor
        target_gain = (
            base_target_gain
            + 0.18 * float(eng["policy_consensus"])
            + 0.14 * float(eng["gov_stability"])
            + 0.22 * ambiguity_factor
        )
        recovery_gain = (
            base_recovery_gain
            + 0.14 * float(eng["R"])
            + 0.08 * float(eng["gov_stability"])
            - 0.06 * float(eng["Dh"])
        )
                    obs_gain = float(np.clip(obs_gain, 0.06, 0.28))
                    target_gain = float(np.clip(target_gain, 0.18, 0.92))
                    recovery_gain = float(np.clip(recovery_gain, 0.04, 0.40))
                    self.current_pos = (
                        self.current_pos
                        + obs_gain * (z - self.current_pos)
                        + target_gain * (projected_target - self.current_pos)
                        + recovery_gain * (projected_current - self.current_pos)
                    )
                    if bool(eng["safe_envelope"]):
                        self.current_pos = self.current_pos + 0.10 * (projected_current - self.current_pos)
                    self.delta_prev = delta
                    self.gamma_prev = gamma
                    self.corridor = corridor
                    self.history_edges.append(self.current_edge)
        return self.current_edge, self.current_pos.copy(), gamma, self.drift_load

Chapter 10. Dynamic Gain Modulation

The final navigation update is not fixed at one universal gain. Instead, it is modulated by the engine state. Observation gain, target gain, and recovery gain vary depending on action-state, governance state, ambiguity, and degradation.

This means the same model can behave:

  • sharply in stable conditions,
  • cautiously under ambiguity,
  • restoratively under drift,
  • conservatively under breach.

This dynamic gain structure is one of the reasons the model shows value in continuity-sensitive and degraded environments.

Dynamic Gain Modulation

NashMark AI does not apply a single fixed correction gain at every step. It modulates the navigation update according to current equilibrium state, ambiguity level, governance stability, recovery potential, and degradation load. The model therefore changes how strongly it responds to live observation, corridor target, and recovery pull as conditions change.

In operational terms, three gain classes are used:

  • observation gain, controlling how strongly the live observation pulls the current state;
  • target gain, controlling how strongly the retained corridor or projected route target pulls the state;
  • recovery gain, controlling how strongly the system restores itself toward coherent traversal.

These gains are conditioned first by action-state:

  • COOPERATE biases the system toward stronger target and recovery structure;
  • HOLD maintains balanced correction;
  • DEFECT increases raw observation pull while reducing structural commitment.

They are then adjusted by:

  • ambiguity state,
  • policy consensus,
  • governance stability,
  • recovery capacity $R$,
  • degradation load $D_h$.

A simplified form is:

$g_{\text{obs}} = g_{\text{obs}}^{(a)} - \alpha A$
$g_{\text{target}} = g_{\text{target}}^{(a)} + \beta C + \gamma G + \delta A$
$g_{\text{recovery}} = g_{\text{recovery}}^{(a)} + \mu R + \nu G - \rho D_h$

where:

  • $a$ is the current action-state,
  • $A$ is ambiguity factor,
  • $C$ is policy consensus,
  • $G$ is governance stability,
  • $R$ is recovery potential,
  • $D_h$ is degradation intensity.

The resulting position update takes the form:

$x_{t+1} = x_t + g_{\text{obs}}(z_t - x_t) + g_{\text{target}}(\hat{x}_t - x_t) + g_{\text{recovery}}(\tilde{x}_t - x_t)$

where:

  • $x_t$ is the current state,
  • $z_t$ is the live observation,
  • $\hat{x}_t$ is the projected corridor target,
  • $\tilde{x}_t$ is the projected recovery state.

This is one of the key reasons NashMark AI behaves differently from conventional recursive estimators. It does not apply one invariant correction law. It alters its correction structure according to whether the system is stable, pressured, ambiguous, or recovering.
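The gain laws and three-pull update above can be collected into a small self-contained helper. The following sketch uses the base gains and clipping bands reported in the appendix excerpt; the helper names `modulated_gains` and `step_position` are illustrative, not the production implementation.

```python
import numpy as np

# Base (obs, target, recovery) gains per action-state, per the appendix excerpt.
BASE_GAINS = {
    "COOPERATE": (0.16, 0.44, 0.18),
    "HOLD":      (0.20, 0.30, 0.12),
    "DEFECT":    (0.24, 0.18, 0.08),
}

def modulated_gains(action, consensus, gov_stability, recovery, degradation, ambiguity):
    """Compute (obs, target, recovery) gains from the current engine state."""
    base_obs, base_target, base_recovery = BASE_GAINS.get(action, BASE_GAINS["DEFECT"])
    obs = base_obs - 0.08 * ambiguity
    target = base_target + 0.18 * consensus + 0.14 * gov_stability + 0.22 * ambiguity
    rec = base_recovery + 0.14 * recovery + 0.08 * gov_stability - 0.06 * degradation
    # Clip each gain to the operating band used in the reference excerpt.
    return (
        float(np.clip(obs, 0.06, 0.28)),
        float(np.clip(target, 0.18, 0.92)),
        float(np.clip(rec, 0.04, 0.40)),
    )

def step_position(x, z, target_proj, recovery_proj, gains):
    """Apply the three-pull position update x_{t+1} from the equation above."""
    g_obs, g_target, g_rec = gains
    return x + g_obs * (z - x) + g_target * (target_proj - x) + g_rec * (recovery_proj - x)
```

Note how a high degradation load suppresses the recovery gain until the lower clip bound catches it, while ambiguity simultaneously weakens observation pull and strengthens target pull.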

Appendix Reference — Dynamic Gain Excerpt

action_name = eng["action_name"]
if action_name == "COOPERATE":
    base_obs_gain = 0.16
    base_target_gain = 0.44
    base_recovery_gain = 0.18
elif action_name == "HOLD":
    base_obs_gain = 0.20
    base_target_gain = 0.30
    base_recovery_gain = 0.12
else:
    base_obs_gain = 0.24
    base_target_gain = 0.18
    base_recovery_gain = 0.08

obs_gain = base_obs_gain - 0.08 * ambiguity_factor
target_gain = (
    base_target_gain
    + 0.18 * float(eng["policy_consensus"])
    + 0.14 * float(eng["gov_stability"])
    + 0.22 * ambiguity_factor
)
recovery_gain = (
    base_recovery_gain
    + 0.14 * float(eng["R"])
    + 0.08 * float(eng["gov_stability"])
    - 0.06 * float(eng["Dh"])
)

obs_gain = float(np.clip(obs_gain, 0.06, 0.28))
target_gain = float(np.clip(target_gain, 0.18, 0.92))
recovery_gain = float(np.clip(recovery_gain, 0.04, 0.40))

Chapter 11. Temporal Commit Gating

A critical refinement in the applied architecture is temporal commit gating. A single-step dominance spike should not be sufficient to force a branch switch. The system therefore requires persistence before hard commitment.

This is a hysteresis mechanism. It prevents branch flapping and reduces transient false commits.

Let $E_t$ be the current dominant candidate edge. Commitment occurs only when:

  • dominance conditions are satisfied,
  • governance conditions are satisfied,
  • safe-envelope conditions are satisfied,
  • and dominance persists across the required streak window.

This is not merely a coding convenience. It is an expression of equilibrium logic: state should be confirmed, not impulsively seized.

Temporal Commit Gating

NashMark AI does not permit branch commitment from a single transient dominance event alone. A route-state candidate may momentarily appear dominant because of observation noise, corridor compression, or temporary distortion. Immediate switching under such conditions would make the system overly reactive and structurally unstable. NashMark therefore applies temporal commit gating.

The purpose of temporal gating is to require that dominant route-state evidence persist across successive steps before hard commitment occurs. This introduces controlled hysteresis into the navigation model. It reduces branch flapping, suppresses impulsive edge switching, and forces route-state hardening to occur only when dominance is sufficiently stable over time.

Let $E_t$ denote the currently dominant edge candidate at step $t$, and let $S_t$ denote the persistence count for that candidate. Commitment is not triggered by local dominance alone, but only when $S_t \ge k$, that is, when

$E_t = E_{t-1} = \dots = E_{t-k+1}$

for a required streak length $k$, subject also to the model's governance and safety conditions.

In practical form, the commitment condition is:

$\text{Commit at } t \iff D_t \land C_t \land G_t \land \Sigma_t$

where:

  • $D_t$ denotes dominance validity,
  • $C_t$ denotes persistence across the required streak window,
  • $G_t$ denotes governance sufficiency,
  • $\Sigma_t$ denotes safe-envelope satisfaction.

This means the model asks four questions before hardening branch-state commitment:

  1. Is the candidate genuinely dominant?
  2. Has that dominance persisted?
  3. Is the system governable enough to commit?
  4. Is the system still inside the safe operating envelope?

Only when all of these are satisfied is the current edge updated.

This is important because NashMark AI is not intended to behave as an impulsive route classifier. It is intended to preserve continuity lawfully under uncertainty. Temporal commit gating therefore acts as the transition-discipline layer of the model.

The benchmark implication is clear. In ambiguity-sensitive settings, temporal gating reduces false branch commits and improves continuity, though it may introduce some transition lag when the correct branch change is delayed for confirmation. That is not necessarily a defect. It is often the cost of preventing premature structural collapse.
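The streak discipline described above can be isolated as a small stand-alone gate. The class below is an illustrative sketch only; the name `CommitGate` and the default streak length are assumptions, not the production mechanism.

```python
class CommitGate:
    """Minimal temporal commit gate: a candidate edge must remain
    dominant (and pass governance/safety checks) for `required_streak`
    consecutive steps before the committed edge may change."""

    def __init__(self, initial_edge, required_streak=3):
        self.current_edge = initial_edge
        self.required_streak = required_streak
        self.candidate = None
        self.streak = 0

    def step(self, dominant_edge, dominance_ok):
        # Any failed dominance, governance, or envelope check resets the streak.
        if not dominance_ok:
            self.candidate = None
            self.streak = 0
            return self.current_edge
        if self.candidate == dominant_edge:
            self.streak += 1
        else:
            self.candidate = dominant_edge
            self.streak = 1
        if self.streak >= self.required_streak:
            self.current_edge = dominant_edge
        return self.current_edge
```

A single-step dominance spike therefore cannot flip the committed edge: the spike starts a streak, and any intervening failure resets it to zero.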

Appendix Reference — Temporal Commit Logic

dominance_ok = (
    not is_ambiguous
    and bf >= self.sentinel_theta
    and float(eng["policy_consensus"]) >= 0.45
    and float(eng["gov_stability"]) >= 0.45
    and bool(eng["safe_envelope"])
)
if dominance_ok:
    if self.commit_candidate_edge == dominant_edge:
        self.commit_streak += 1
    else:
        self.commit_candidate_edge = dominant_edge
        self.commit_streak = 1
    if self.commit_streak >= self.commit_required_streak:
        self.current_edge = dominant_edge
else:
    self.commit_candidate_edge = None
    self.commit_streak = 0
if self.current_edge not in corridor:
    self.current_edge = dominant_edge

Chapter 12. Label-Light Versus Label-Rooted Operation

The present benchmark suite is still relatively label-light. That matters.

Although the present benchmark does not rely on externally supervised route annotations, EcoMathDNAHMM (the Ecological Topological Hidden Markov Model, NMAI Model 1A) functions as an intrinsic structural labelling system, supplying rooted state identity from within the NashMark AI architecture itself. In this sense, NashMark AI is not dependent on manual ground-truth route labels, yet it is not structurally label-empty. Its branch-state authority may arise endogenously through EcoMathDNAHMM as internal state labelling.

In a label-light system, NashMark carries more of the burden itself:

  • holding ambiguity,
  • delaying commitment,
  • restoring continuity,
  • governing drift.

In a label-rooted system, NashMark is expected to tighten further because branch-state authority is internal to the model state rather than weakly inferred from partial topology alone.

That distinction matters for interpretation. The present benchmark should therefore be understood as testing NashMark under relatively weak external label authority. Stronger rooted state authority is expected to improve branch-state commitment without removing the model's continuity advantages.

Chapter 13. Navigation in Label-Light and Signal-Degraded Environments

Some of the most difficult navigation environments are those in which signal is intermittent, delayed, structurally distorted, or entirely absent for extended intervals. Deep-sea traversal, subsea trench exploration, cavern systems, orbital blackout, deep-space transit, and other remote operating conditions all share the same core problem: the system cannot rely on a stable stream of rooted positional confirmation. In such environments, navigation becomes less a matter of immediate point-fix correction and more a matter of preserving lawful traversal through uncertainty.

This is the environment in which NashMark AI becomes most relevant. Conventional recursive filters are designed to estimate state under noise. They remain valuable, especially where the observation process is well behaved. However, when signal becomes sparse, delayed, reflected, biased, or temporarily absent, immediate recursive correction can become structurally dangerous. A model may react too quickly to false evidence, collapse onto a wrong local solution, or jitter between competing interpretations of the path. NashMark AI is designed to resist that collapse.

The core difference is that NashMark does not treat each new observation as an instruction that must be obeyed at once. It treats each observation as one contributor to an equilibrium-governed state process. If incoming evidence is weak, inconsistent, or ambiguous, the model does not have to force immediate coordinate certainty. It can retain a corridor of plausible traversal, preserve structured ambiguity, and defer hard commitment until the governing conditions of the system are sufficiently stable.

This is especially important in environments such as deep-sea and deep-space navigation. In deep water, acoustic positioning is slow, noisy, and vulnerable to multipath interference from seabed reflections, thermal layering, or confined geological structures. In deep-space or obscured orbital transit, communication lag, blackout, occultation, and sparse reference opportunities create long intervals in which direct confirmation is weak or absent. In both cases, the danger is not only loss of signal. The greater danger is false structural commitment under incomplete evidence.

NashMark AI addresses this through continuity-governed traversal. Rather than collapsing immediately to a single point estimate, it retains a corridor of likely states. Rather than accepting every dominance spike as a true branch change, it requires persistence, governance stability, and safe-envelope sufficiency before commitment. Rather than interpreting degradation as immediate failure, it models degradation and recovery explicitly through internal state curves and restoration logic. This allows the system to continue behaving coherently even when the external observation process is temporarily unreliable.

In practical terms, this means the model behaves less like a simple point tracker and more like a governed flow system. It preserves the continuity of traversal even when direct coordinate certainty is reduced. The system does not need to hallucinate a new route-state merely because one distorted observation has appeared. It can remain within a lawful band of possibility until evidence accumulates sufficiently for safe transition.

This is one reason the model performs strongly even in label-light benchmark conditions. The current benchmark is non-supervised in the conventional training sense, but not without internal state authority. EcoMathDNAHMM provides the rooted structural reference from which traversal identity is maintained, so the model's continuity logic is not operating in a vacuum. It is operating from endogenous structural labelling rather than externally imposed annotation. Where rooted branch-state authority is weak, NashMark is still able to preserve continuity through corridor retention, equilibrium refinement, governance stability, and restoration control. That does not mean rooted labels are irrelevant. On the contrary, stronger label authority is expected to tighten branch-state commitment further. But the benchmark already shows that the model retains meaningful value even before that rooted authority is supplied.

The governing principle here is mathematical skepticism. NashMark AI does not permit route-state transition simply because a local signal temporarily appears dominant. Dominance must persist. Governance must stabilise. Risk must remain within envelope. Continuity must remain structurally defensible. In this sense, the model imposes a disciplined doubt upon transition, and that doubt is one of its principal strengths in degraded environments.

For this reason, NashMark AI should be understood not as a replacement for all conventional tracking methods in every context, but as a navigation and recovery architecture particularly suited to ambiguity-heavy, blackout-prone, continuity-sensitive domains. Where navigation becomes a problem of preserving lawful traversal rather than merely updating a coordinate, NashMark AI offers its clearest architectural advantage.

Part III — Preliminary Simulation and Benchmark

Chapter 14. Benchmark Objective

The benchmark objective is not to prove universal navigation dominance. It is to test whether NashMark AI improves continuity-sensitive traversal under degraded, ambiguous, or drift-heavy conditions relative to conventional baselines.

The benchmark therefore asks:

  • how well does the model preserve continuity?
  • how well does it recover from drift?
  • how well does it avoid false commitment under ambiguity?
  • how well does it operate when labels are weak or observation is degraded?

Chapter 15. Baselines

Two baseline classes are used.

Kalman baseline

A standard constant-velocity recursive estimator is used as the conventional continuous baseline. This baseline is expected to perform strongly in clean labelled tracking and benign noise conditions.
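For concreteness, a minimal constant-velocity Kalman filter of the kind this baseline describes can be sketched as follows. This is an illustrative stand-in, not the benchmarked BaselineKalman2D implementation; the class name and initial covariance are assumptions.

```python
import numpy as np

class ConstantVelocityKalman2D:
    """Illustrative 2D constant-velocity Kalman filter.
    State is [x, y, vx, vy]; only position is observed."""

    def __init__(self, dt=1.0, process_var=0.4, meas_var=5.0):
        self.x = np.zeros(4)
        self.P = np.eye(4) * 10.0
        self.F = np.array([[1, 0, dt, 0],
                           [0, 1, 0, dt],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]], dtype=float)
        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]], dtype=float)
        self.Q = np.eye(4) * process_var
        self.R = np.eye(2) * meas_var

    def step(self, z):
        # Predict.
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        # A None observation (dropout) means predict-only coasting.
        if z is not None:
            y = np.asarray(z, dtype=float) - self.H @ self.x
            S = self.H @ self.P @ self.H.T + self.R
            K = self.P @ self.H.T @ np.linalg.inv(S)
            self.x = self.x + K @ y
            self.P = (np.eye(4) - K @ self.H) @ self.P
        return self.x[:2].copy()
```

This shows why the baseline excels in benign conditions: when the motion model matches the truth and observations are clean, the estimate converges tightly onto the route.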

HMM baseline

A hidden-state route or edge model is used as the discrete branch-state baseline. This model is expected to retain value in path-labelling tasks where rooted branch-state authority is present.
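The discrete branch-state idea can be reduced to a single "sticky" forward-filter step. The sketch below is illustrative only: the sticky self-transition and Gaussian distance emission are assumptions standing in for the benchmarked BaselineHMMMapMatch, and the function name is hypothetical.

```python
import numpy as np

def hmm_forward_step(prior, distances, sigma=7.0, stay_prob=0.9):
    """One forward-filter step over discrete edge states.

    prior:     dict edge -> prior probability (sums to 1)
    distances: dict edge -> distance from the observation to that edge
    Transitions are 'sticky': mass stay_prob remains on an edge and
    the remainder spreads uniformly over the other edges.
    """
    edges = list(prior)
    n = len(edges)
    p = np.array([prior[e] for e in edges], dtype=float)
    # Sticky transition: predicted_j = stay*p_j + move*(sum(p) - p_j).
    move = (1.0 - stay_prob) / max(n - 1, 1)
    predicted = stay_prob * p + move * (p.sum() - p)
    # Gaussian emission likelihood of each edge given the observation.
    lik = np.exp(-0.5 * (np.array([distances[e] for e in edges]) / sigma) ** 2)
    post = predicted * lik
    post /= post.sum()
    return dict(zip(edges, post))
```

With stable labels and well-separated edges, this recursion concentrates probability on the correct branch quickly, which is exactly the regime in which the HMM baseline retains value.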

NashMark is not benchmarked as a straw man against weak models. It is benchmarked against two credible conventional baselines.

Chapter 16. Test Scenarios

The benchmark suite consists of five scenarios:

  1. Clean route
  2. Urban canyon
  3. GNSS dropout / tunnel
  4. Ambiguous junction
  5. Drift recovery

These represent progressively harder operating conditions and distinguish clean labelled estimation from degraded or ambiguity-sensitive continuity tasks.

from typing import Dict, List, Tuple
import numpy as np

def make_observations(
    truth: np.ndarray,
    scenario: str,
    rng: np.random.Generator
) -> Tuple[List[np.ndarray | None], int]:
    obs: List[np.ndarray | None] = []
    degrade_start = 0
    for t, p in enumerate(truth):
        z = p.copy()
        if scenario == "clean_route":
            z += rng.normal(0, 1.2, size=2)
        elif scenario == "urban_canyon":
            if 18 <= t <= 48:
                if t == 18:
                    degrade_start = t
                z += np.array([4.5, 2.0]) + rng.normal(0, 2.0, size=2)
            else:
                z += rng.normal(0, 1.8, size=2)
        elif scenario == "gnss_dropout_tunnel":
            if 24 <= t <= 42:
                if t == 24:
                    degrade_start = t
                obs.append(None)
                continue
            z += rng.normal(0, 1.5, size=2)
        elif scenario == "ambiguous_junction":
            if 30 <= t <= 45:
                if t == 30:
                    degrade_start = t
                z += np.array([0.0, 3.0]) + rng.normal(0, 2.6, size=2)
            else:
                z += rng.normal(0, 1.5, size=2)
        elif scenario == "drift_recovery":
            if 20 <= t <= 36:
                if t == 20:
                    degrade_start = t
                z += np.array([7.0, -5.0]) + rng.normal(0, 1.8, size=2)
            else:
                z += rng.normal(0, 1.5, size=2)
        else:
            z += rng.normal(0, 1.5, size=2)
        obs.append(z)
    return obs, degrade_start

SCENARIO_ROUTES: Dict[str, List[str]] = {
    "clean_route": ["AB", "BC", "CD", "DE"],
    "urban_canyon": ["AB", "BC", "CD", "DE"],
    "gnss_dropout_tunnel": ["AB", "BC", "CD", "DE"],
    "ambiguous_junction": ["AB", "BC", "CF", "FG"],
    "drift_recovery": ["AB", "BC", "CD", "DE"],
}

SCENARIO_AMBIGUOUS_EDGES: Dict[str, List[str]] = {
    "clean_route": ["CF", "FG"],
    "urban_canyon": ["CF", "FG"],
    "gnss_dropout_tunnel": ["CF", "FG"],
    "ambiguous_junction": ["CF", "FG", "CD", "DE"],
    "drift_recovery": ["CF", "FG"],
}

Chapter 17. Metrics

The metric suite includes:

  • mean position error,
  • maximum position error,
  • path accuracy,
  • false branch commit rate,
  • continuity score,
  • recovery time,
  • corridor width,
  • restoration efficiency.

A crucial distinction emerged during testing: some apparent branch "errors" were in fact delayed correct commitments rather than true wrong-branch collapse. This means path-labelling metrics must be interpreted carefully and not treated as identical to continuity failure.
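Of these metrics, recovery time illustrates the flavour of the suite. One plausible formulation, given as an assumption here since the exact definition in metrics.py may differ, counts the steps after degradation onset until position error first re-enters a tolerance band:

```python
import numpy as np

def recovery_time(truth, pred, degrade_start, tol=3.0):
    """Steps after degrade_start until the position error first drops
    below tol; returns the full remaining horizon if it never recovers."""
    errors = np.linalg.norm(truth - pred, axis=1)
    for t in range(degrade_start, len(errors)):
        if errors[t] < tol:
            return t - degrade_start
    return len(errors) - degrade_start
```

Under this reading, a model that coasts through a distortion window and snaps back quickly scores a short recovery time even if its peak error during the window was large.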

Chapter 18. Benchmark Results

18.1 Clean Route

In clean labelled conditions, Kalman remains strongest on raw position sharpness. NashMark remains continuity-stable, but does not outperform Kalman in this regime. This is expected and does not weaken the model's actual benchmark domain.

18.2 Urban Canyon

In structured distortion conditions, NashMark becomes competitive and remains continuity-strong, but the current benchmark still shows Kalman slightly stronger on some raw error measures. This indicates that the present NashMark implementation is viable here, but not yet dominant.

18.3 GNSS Dropout / Tunnel

The unified canonical implementation substantially reduced the dropout weakness observed in earlier runs. This confirms that probability-weighted stability updates and restoration logic are structurally meaningful in blackout conditions.

18.4 Ambiguous Junction

This is one of the clearest NashMark wins. In the stronger runs, NashMark outperformed the baselines on mean error and continuity while reducing false branch commit behaviour to zero in the tested run. This strongly supports the model's value in ambiguity-sensitive traversal.

18.5 Drift Recovery

This is the second strongest benchmark class. NashMark achieved best or near-best mean recovery behaviour and matched the best continuity and recovery timing in the stronger retained runs. This directly supports the restoration claim.

Benchmark Results by Scenario

Clean Route

Model    | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency
Kalman   | 1.0360 | 2.6192 | - | - | 1.0000 | 0.0 | - | -
HMM      | 6.2692 | 16.2500 | 0.7692 | 0.0000 | 0.6615 | 4.0 | - | -
NashMark | 1.5291 | 6.0258 | 0.6000 | 0.0000 | 1.0000 | 0.0 | 1.8308 | 1.0000

Urban Canyon

Model    | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency
Kalman   | 2.9328 | 6.5278 | - | - | 1.0000 | 0.0 | - | -
HMM      | 5.5467 | 13.7500 | 0.7846 | 0.0000 | 0.7538 | 2.0 | - | -
NashMark | 3.0118 | 10.2272 | 0.5846 | 0.0000 | 0.9385 | 0.0 | 1.8308 | 1.0000

GNSS Dropout / Tunnel

Model    | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency
Kalman   | 1.5980 | 5.2245 | - | - | 1.0000 | 0.0 | - | -
HMM      | 7.8462 | 20.0000 | 0.6923 | 0.0000 | 0.6154 | 0.0 | - | -
NashMark | 2.5600 | 8.0000 | 0.6769 | 0.0000 | 0.9846 | 0.0 | 1.7692 | 1.0000

Ambiguous Junction

Model    | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency
Kalman   | 3.5773 | 11.7269 | - | - | 0.7846 | 0.0 | - | -
HMM      | 5.7308 | 15.0000 | 0.8462 | 0.0000 | 0.7231 | 0.0 | - | -
NashMark | 3.0756 | 12.8278 | 0.8000 | 0.0000 | 0.8769 | 9.0 | 1.7385 | 0.5286

Drift Recovery

Model    | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency
Kalman   | 1.9703 | 6.5977 | - | - | 1.0000 | 0.0 | - | -
HMM      | 5.8846 | 15.0000 | 0.8000 | 0.2000 | 0.7015 | 0.0 | - | -
NashMark | 1.8383 | 6.6166 | 0.7429 | 0.3429 | 1.0000 | 0.0 | 1.7846 | 1.0000

Across the benchmark suite, Kalman remained strongest in clean labelled tracking, while NashMark showed its clearest advantages in ambiguity-sensitive continuity and drift-recovery conditions. HMM retained value in discrete branch-state labelling but did not match NashMark on degraded continuity performance.

Chapter 19. Interpretation of Results

The results support a narrow but strong conclusion:

  • NashMark is not currently a universal winner across all navigation conditions.
  • Kalman remains strongest in clean labelled-route tracking.
  • HMM retains a niche advantage in some branch-state labelling situations.
  • NashMark demonstrates its clearest value in:
    • ambiguity-sensitive continuity,
    • drift recovery,
    • degraded observation,
    • label-light traversal.

This is already sufficient to establish NashMark as a viable navigation and recovery architecture.

Chapter 20. Product Interpretation

The benchmark does not merely support a paper. It supports a product direction.

The strongest commercial form is not necessarily "NashMark replaces everything." The stronger architecture is:

  • rooted label authority where available,
  • sharp continuous estimator where useful,
  • NashMark as equilibrium governor, ambiguity manager, drift restorer, and safe-envelope controller.

That means NashMark can function either:

  • as a standalone navigation logic in label-light space,
  • or as a higher-order governing layer around rooted labels and continuous tracking.

This is already product-grade architecture, even if further tuning remains.

Chapter 21. Disclosure Boundary

This paper does not require disclosure of the sovereign engine internals. It is sufficient to publish:

  • mathematical framing,
  • benchmark structure,
  • scenario logic,
  • metrics,
  • results,
  • reduced demonstration code.

It is not necessary to publish:

  • full internal refinement routines,
  • proprietary threshold schedules,
  • exact governance weighting strategies,
  • or production implementation details.

Reduced Public Demonstration Script

The public disclosure does not require release of the full canonical NashMark engine. A reduced benchmark shell is sufficient to demonstrate the architecture, benchmark logic, and comparative result structure without exposing the full sovereign control core.

The public demonstrator should include:

  • route-graph setup,
  • scenario loading,
  • Kalman baseline,
  • HMM baseline,
  • reduced NashMark demonstration model,
  • metric calculation,
  • JSON result output.

It should exclude:

  • full equilibrium refinement internals,
  • proprietary threshold scheduling,
  • production governance weighting,
  • internal optimisation routines,
  • protected restoration logic.

A reduced public shell may therefore be represented as follows.

from road_graph import RoadGraph
from baseline_kalman import BaselineKalman2D
from baseline_hmm_mapmatch import BaselineHMMMapMatch
from nashmark_demo import NashMarkDemo
from scenarios import make_observations, SCENARIO_ROUTES

def public_demo_run(scenario="ambiguous_junction", seed=104):
    graph = RoadGraph()
    route_edges = SCENARIO_ROUTES[scenario]
    kalman = BaselineKalman2D(dt=1.0, process_var=0.4, meas_var=5.0)
    hmm = BaselineHMMMapMatch(graph, sigma=7.0)
    nash = NashMarkDemo(graph, sigma=7.0)
    # Scenario generation, model stepping, and metric reporting
    # are handled in the reduced benchmark package.

This reduced demonstration shell is sufficient for public benchmarking and reproducibility of the comparative framework, while the full NashMark navigation engine and protected control internals remain withheld.

Chapter 22. Limitations

The benchmark remains synthetic. It is not yet:

  • a certified deployment system,
  • a hardware-integrated flight stack,
  • or a complete commercial navigation product.

The current label-light setup also understates the likely branch-state performance of a label-rooted NashMark implementation. Further work is therefore expected to improve:

  • rooted label integration,
  • urban-canyon peak spike control,
  • transition timing,
  • and deployment efficiency.

Chapter 23. Conclusion

NashMark AI is now established as more than a conceptual framework. It is a benchmarked navigation and recovery architecture with strongest current evidence in ambiguity-sensitive continuity and drift-recovery tasks. It performs strongly even in relatively label-light conditions, which supports the view that its equilibrium-governed traversal logic has genuine independent value.

The results do not support a blanket claim of superiority across all navigation problems. They support a more exact and more durable claim: NashMark AI is a viable equilibrium-governed navigation model whose strongest current benchmarked value lies in degraded, ambiguous, and continuity-sensitive environments, and whose branch-state performance is expected to strengthen further when rooted label authority is internal to the model state.

Appendix A — Benchmark Files

benchmark_runner.py

from pathlib import Path
from typing import Dict
import json
import numpy as np

from road_graph import RoadGraph
from baseline_kalman import BaselineKalman2D
from baseline_hmm_mapmatch import BaselineHMMMapMatch
from nashmark_nav import NashMarkNav
from scenarios import make_observations, SCENARIO_ROUTES, SCENARIO_AMBIGUOUS_EDGES
from metrics import (
    mean_position_error,
    max_position_error,
    path_accuracy,
    false_branch_commit_rate,
    continuity_score,
    recovery_time,
    corridor_width,
    restoration_efficiency,
)
from plot_results import plot_trajectories, plot_error_series

def run_benchmark(scenario: str, out_dir: str, seed: int = 42) -> Dict[str, Dict[str, float]]:
    rng = np.random.default_rng(seed)
    graph = RoadGraph()
    route_edges = SCENARIO_ROUTES[scenario]
    ambiguous_edges = SCENARIO_AMBIGUOUS_EDGES[scenario]
    truth_pos, truth_edges = graph.sample_route(route_edges, speed=1.2, dt=1.0)
    observations, degrade_start = make_observations(truth_pos, scenario, rng)

    kalman = BaselineKalman2D(dt=1.0, process_var=0.4, meas_var=5.0)
    hmm = BaselineHMMMapMatch(graph, sigma=7.0)
    nash = NashMarkNav(
        graph,
        sigma=7.0,
        corridor_tau=0.12,
        sentinel_theta=1.45,
        random_seed=seed,
    )

    pred_k, pred_h, pred_n = [], [], []
    edge_h, edge_n = [], []
    gamma_history, drift_history = [], []

    for z in observations:
        pk = kalman.step(None if z is None else np.asarray(z, dtype=float))
        pred_k.append(pk)
        eh, ph = hmm.step(None if z is None else np.asarray(z, dtype=float))
        pred_h.append(ph)
        edge_h.append(eh)
        en, pn, gamma, drift = nash.step(None if z is None else np.asarray(z, dtype=float))
        pred_n.append(pn)
        edge_n.append(en)
        gamma_history.append(gamma)
        drift_history.append(drift)

    pred_k = np.asarray(pred_k, dtype=float)
    pred_h = np.asarray(pred_h, dtype=float)
    pred_n = np.asarray(pred_n, dtype=float)

    while len(edge_h) < len(truth_edges):
        edge_h.append(edge_h[-1] if edge_h else "AB")
    while len(edge_n) < len(truth_edges):
        edge_n.append(edge_n[-1] if edge_n else "AB")

    results = {
        "kalman": {
            "mean_position_error": mean_position_error(truth_pos, pred_k),
            "max_position_error": max_position_error(truth_pos, pred_k),
            "continuity_score": continuity_score(truth_pos, pred_k),
            "recovery_time": float(recovery_time(truth_pos, pred_k, degrade_start)),
        },
        "hmm": {
            "mean_position_error": mean_position_error(truth_pos, pred_h),
            "max_position_error": max_position_error(truth_pos, pred_h),
            "path_accuracy": path_accuracy(truth_edges, edge_h),
            "false_branch_commit_rate": false_branch_commit_rate(truth_edges, edge_h, ambiguous_edges),
            "continuity_score": continuity_score(truth_pos, pred_h),
            "recovery_time": float(recovery_time(truth_pos, pred_h, degrade_start)),
        },
        "nashmark": {
            "mean_position_error": mean_position_error(truth_pos, pred_n),
            "max_position_error": max_position_error(truth_pos, pred_n),
            "path_accuracy": path_accuracy(truth_edges, edge_n),
            "false_branch_commit_rate": false_branch_commit_rate(truth_edges, edge_n, ambiguous_edges),
            "continuity_score": continuity_score(truth_pos, pred_n),
            "recovery_time": float(recovery_time(truth_pos, pred_n, degrade_start)),
            "corridor_width": corridor_width(gamma_history, tau=0.20),
            "restoration_efficiency": restoration_efficiency(drift_history),
        },
    }

    out_path = Path(out_dir)
    out_path.mkdir(parents=True, exist_ok=True)
    plot_trajectories(
        truth_pos,
        pred_k,
        pred_h,
        pred_n,
        title=f"{scenario.replace('_', ' ').title()}",
        out_path=str(out_path / "trajectories.png"),
    )
    plot_error_series(
        truth_pos,
        pred_k,
        pred_h,
        pred_n,
        title=f"{scenario.replace('_', ' ').title()} Errors",
        out_path=str(out_path / "errors.png"),
    )
    with open(out_path / "results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(json.dumps(results, indent=2))
    return results

metrics.py

from typing import Dict, List
import numpy as np

def mean_position_error(truth: np.ndarray, pred: np.ndarray) -> float:
    return float(np.mean(np.linalg.norm(truth - pred, axis=1)))

def max_position_error(truth: np.ndarray, pred: np.ndarray) -> float:
    return float(np.max(np.linalg.norm(truth - pred, axis=1)))

def path_accuracy(true_edges: List[str], pred_edges: List[str]) -> float:
    n = min(len(true_edges), len(pred_edges))
    if n == 0:
        return 0.0
    correct = sum(1 for i in range(n) if true_edges[i] == pred_edges[i])
    return float(correct / n)

def false_branch_commit_rate(true_edges: List[str], pred_edges: List[str], ambiguous_edges: List[str]) -> float:
    idxs = [i for i, e in enumerate(true_edges) if e in ambiguous_edges]
    if not idxs:
        return 0.0
    bad = sum(1 for i in idxs if pred_edges[i] != true_edges[i])
    return float(bad / len(idxs))

def continuity_score(truth: np.ndarray, pred: np.ndarray, threshold: float = 8.0) -> float:
    errs = np.linalg.norm(truth - pred, axis=1)
    return float(np.mean(errs <= threshold))

def recovery_time(
    truth: np.ndarray,
    pred: np.ndarray,
    degrade_start: int,
    recovery_threshold: float = 5.0
) -> int:
    errs = np.linalg.norm(truth - pred, axis=1)
    for i in range(degrade_start, len(errs)):
        if errs[i] <= recovery_threshold:
            return i - degrade_start
    return len(errs) - degrade_start

def corridor_width(gamma_history: List[Dict[str, float]], tau: float = 0.20) -> float:
    widths = []
    for gamma in gamma_history:
        widths.append(sum(1 for _, p in gamma.items() if p >= tau))
    return float(np.mean(widths)) if widths else 0.0

def restoration_efficiency(drift_history: List[float]) -> float:
    if not drift_history:
        return 0.0
    peak = max(drift_history)
    end = drift_history[-1]
    if peak <= 1e-9:
        return 1.0
    return float((peak - end) / peak)
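As a quick sanity check of the metric semantics, the following standalone sketch (function bodies copied from metrics.py above; the toy error values are chosen purely for illustration) confirms that continuity_score counts the fraction of steps within the threshold and recovery_time counts steps from degradation onset until the error first re-enters the recovery band:

```python
import numpy as np

def continuity_score(truth, pred, threshold=8.0):
    errs = np.linalg.norm(truth - pred, axis=1)
    return float(np.mean(errs <= threshold))

def recovery_time(truth, pred, degrade_start, recovery_threshold=5.0):
    errs = np.linalg.norm(truth - pred, axis=1)
    for i in range(degrade_start, len(errs)):
        if errs[i] <= recovery_threshold:
            return i - degrade_start
    return len(errs) - degrade_start

# Toy trajectory: error spikes to 10 at steps 2-3, then decays back to zero.
truth = np.zeros((6, 2))
pred = np.array([[0.0, 0], [0.0, 0], [10.0, 0], [10.0, 0], [4.0, 0], [0.0, 0]])

print(continuity_score(truth, pred))  # 4 of 6 steps within 8.0 -> 0.666...
print(recovery_time(truth, pred, 2))  # first error <= 5.0 at step 4 -> 2
```

Note that recovery_time returns the full remaining horizon when the error never re-enters the band, so an unrecovered run is penalised rather than silently dropped.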

scenarios.py

from typing import Dict, List, Tuple
import numpy as np

def make_observations(
    truth: np.ndarray,
    scenario: str,
    rng: np.random.Generator
) -> Tuple[List[np.ndarray | None], int]:
    obs: List[np.ndarray | None] = []
    degrade_start = 0
    for t, p in enumerate(truth):
        z = p.copy()
        if scenario == "clean_route":
            z += rng.normal(0, 1.2, size=2)
        elif scenario == "urban_canyon":
            if 18 <= t <= 48:
                if t == 18:
                    degrade_start = t
                z += np.array([4.5, 2.0]) + rng.normal(0, 2.0, size=2)
            else:
                z += rng.normal(0, 1.8, size=2)
        elif scenario == "gnss_dropout_tunnel":
            if 24 <= t <= 42:
                if t == 24:
                    degrade_start = t
                obs.append(None)
                continue
            z += rng.normal(0, 1.5, size=2)
        elif scenario == "ambiguous_junction":
            if 30 <= t <= 45:
                if t == 30:
                    degrade_start = t
                z += np.array([0.0, 3.0]) + rng.normal(0, 2.6, size=2)
            else:
                z += rng.normal(0, 1.5, size=2)
        elif scenario == "drift_recovery":
            if 20 <= t <= 36:
                if t == 20:
                    degrade_start = t
                z += np.array([7.0, -5.0]) + rng.normal(0, 1.8, size=2)
            else:
                z += rng.normal(0, 1.5, size=2)
        else:
            z += rng.normal(0, 1.5, size=2)
        obs.append(z)
    return obs, degrade_start

SCENARIO_ROUTES: Dict[str, List[str]] = {
    "clean_route": ["AB", "BC", "CD", "DE"],
    "urban_canyon": ["AB", "BC", "CD", "DE"],
    "gnss_dropout_tunnel": ["AB", "BC", "CD", "DE"],
    "ambiguous_junction": ["AB", "BC", "CF", "FG"],
    "drift_recovery": ["AB", "BC", "CD", "DE"],
}

SCENARIO_AMBIGUOUS_EDGES: Dict[str, List[str]] = {
    "clean_route": ["CF", "FG"],
    "urban_canyon": ["CF", "FG"],
    "gnss_dropout_tunnel": ["CF", "FG"],
    "ambiguous_junction": ["CF", "FG", "CD", "DE"],
    "drift_recovery": ["CF", "FG"],
}
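To illustrate the shape of the observation stream a scenario produces, here is a reduced standalone sketch of the gnss_dropout_tunnel branch above (the window bounds [24, 42] match make_observations; the straight-line truth path is a toy stand-in for the route geometry):

```python
import numpy as np

rng = np.random.default_rng(0)
# Toy straight-line truth path: 65 steps along the x-axis.
truth = np.stack([np.linspace(0.0, 64.0, 65), np.zeros(65)], axis=1)

# Reduced restatement of the gnss_dropout_tunnel branch: observations
# inside the dropout window [24, 42] are None; the rest carry light noise.
obs = []
for t, p in enumerate(truth):
    if 24 <= t <= 42:
        obs.append(None)
    else:
        obs.append(p + rng.normal(0, 1.5, size=2))

n_dropped = sum(1 for z in obs if z is None)
print(n_dropped)  # 19 consecutive missing fixes
```

This is why every model's step() accepts None: during the tunnel window the estimators must coast on their internal state alone.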

nashmark_demo.py

import math
from typing import Dict, List, Tuple
import numpy as np
from road_graph import RoadGraph, point_to_segment_distance

class NashMarkDemo:
    """
    Public-shell NashMark navigation demonstrator.
    This is deliberately a reduced benchmark-facing version:
    - latent path inference
    - corridor retention
    - ambiguity-aware commitment
    - safe-envelope style gating
    - restoration dynamics
    It is not the full proprietary NashMark core.
    """
    def __init__(
        self,
        graph: RoadGraph,
        sigma: float = 7.0,
        corridor_tau: float = 0.05,
        sentinel_theta: float = 1.55,
        restoration_gain: float = 0.62,
    ):
        self.graph = graph
        self.sigma = sigma
        self.corridor_tau = corridor_tau
        self.sentinel_theta = sentinel_theta
        self.restoration_gain = restoration_gain
        self.delta_prev: Dict[str, float] | None = None
        self.gamma_prev: Dict[str, float] | None = None
        self.current_edge: str = "AB"
        self.current_pos: np.ndarray = self.graph.edge_midpoint("AB")
        self.corridor: List[str] = ["AB"]
        self.history_edges: List[str] = []
        self.t = 0
        self.coop = 1.0
        self.defect = 1.0
        self.mss = 0.5
        self.gov_stability = 0.5
        self.systemic_risk = 0.3
        self.safe_envelope = True
        self.drift_load = 0.0
        self.R = 0.0
        self.Dh = 1.0

    # ------------------------------------------------------------------
    # Core probabilistic helpers
    # ------------------------------------------------------------------
    def emission_logprob(self, z: np.ndarray, edge_id: str) -> float:
        edge = self.graph.edges[edge_id]
        d = point_to_segment_distance(
            z,
            np.array(edge.points[0], dtype=float),
            np.array(edge.points[1], dtype=float),
        )
        return -0.5 * (d / self.sigma) ** 2

    def transition_logprob(self, prev_edge: str, curr_edge: str) -> float:
        if prev_edge == curr_edge:
            return math.log(0.52)
        if curr_edge in self.graph.adjacency.get(prev_edge, []):
            return math.log(0.36)
        return math.log(0.12)

    def normalize_log_probs(self, log_probs: Dict[str, float]) -> Dict[str, float]:
        vals = np.array(list(log_probs.values()), dtype=float)
        m = float(np.max(vals))
        exps = np.exp(vals - m)
        denom = float(np.sum(exps))
        keys = list(log_probs.keys())
        return {k: float(exps[i] / denom) for i, k in enumerate(keys)}

    def project_point_to_edge(self, point: np.ndarray, edge_id: str) -> np.ndarray:
        edge = self.graph.edges[edge_id]
        a = np.array(edge.points[0], dtype=float)
        b = np.array(edge.points[1], dtype=float)
        ab = b - a
        denom = float(np.dot(ab, ab))
        if denom <= 1e-12:
            return a.copy()
        t = float(np.dot(point - a, ab) / denom)
        t = max(0.0, min(1.0, t))
        return a + t * ab

    def corridor_target(self, point: np.ndarray, corridor: List[str], gamma: Dict[str, float]) -> np.ndarray:
        pts = []
        ws = []
        for edge_id in corridor:
            proj = self.project_point_to_edge(point, edge_id)
            pts.append(proj)
            ws.append(max(gamma.get(edge_id, 0.0), 1e-6))
        pts_arr = np.stack(pts, axis=0)
        ws_arr = np.array(ws, dtype=float)
        ws_arr = ws_arr / np.sum(ws_arr)
        return np.sum(pts_arr * ws_arr[:, None], axis=0)

    # ------------------------------------------------------------------
    # Reduced public-shell dynamics
    # ------------------------------------------------------------------
    def update_public_dynamics(self, dominant_in_corridor: bool, pressure: float, corridor_size: int) -> None:
        self.t += 1
        if dominant_in_corridor:
            self.coop += 1.0
        else:
            self.defect += 1.0
        self.mss = self.coop / (self.coop + self.defect)
        # public-shell recovery / degradation curves
        self.R = float(np.clip(1.0 - math.exp(-self.t / 200.0) - 0.15 * pressure, 0.0, 1.0))
        self.Dh = float(np.clip(math.exp(-self.t / 350.0) + 0.25 * pressure, 0.0, 1.0))
        # governance / risk proxies
        corridor_penalty = min(1.0, max(0.0, (corridor_size - 1) / 3.0))
        self.gov_stability = float(np.clip(0.55 * self.mss + 0.30 * (1.0 - pressure) - 0.15 * corridor_penalty, 0.0, 1.0))
        self.systemic_risk = float(np.clip(0.65 * pressure + 0.35 * corridor_penalty, 0.0, 1.0))
        self.safe_envelope = (self.mss >= 0.70) and (self.systemic_risk <= 0.35)

    # ------------------------------------------------------------------
    # Step
    # ------------------------------------------------------------------
    def step(self, z: np.ndarray | None) -> Tuple[str, np.ndarray, Dict[str, float], float]:
        if z is None:
            gamma = self.gamma_prev or {self.current_edge: 1.0}
            corridor = self.corridor or [self.current_edge]
            target = self.corridor_target(self.current_pos, corridor, gamma)
            pressure = min(1.0, 0.25 + self.drift_load / 12.0)
            self.update_public_dynamics(self.current_edge in corridor, pressure, len(corridor))
            gain = float(np.clip(self.restoration_gain + 0.20 * self.R - 0.12 * self.Dh + 0.08 * self.gov_stability, 0.10, 0.90))
            self.current_pos = self.current_pos + gain * (target - self.current_pos)
            self.drift_load = max(
                0.0,
                0.80 * self.drift_load + self.Dh + pressure + self.systemic_risk - self.R - self.mss - self.gov_stability
            )
            self.history_edges.append(self.current_edge)
            return self.current_edge, self.current_pos.copy(), gamma, self.drift_load

        z = np.asarray(z, dtype=float)
        candidates = self.graph.candidate_edges(z)
        if self.delta_prev is None:
            delta = {e: self.emission_logprob(z, e) for e in candidates}
        else:
            delta = {}
            for curr in candidates:
                best = -1e18
                for prev, prev_score in self.delta_prev.items():
                    score = prev_score + self.transition_logprob(prev, curr) + self.emission_logprob(z, curr)
                    if score > best:
                        best = score
                delta[curr] = best
        gamma = self.normalize_log_probs(delta)
        corridor = [e for e, p in gamma.items() if p >= self.corridor_tau]
        if not corridor:
            corridor = [max(gamma.items(), key=lambda kv: kv[1])[0]]
        dominant_edge = max(gamma.items(), key=lambda kv: kv[1])[0]
        sorted_probs = sorted(gamma.values(), reverse=True)
        bf = float("inf") if len(sorted_probs) == 1 else float(sorted_probs[0] / max(sorted_probs[1], 1e-9))
        is_ambiguous = (bf < self.sentinel_theta) or (len(corridor) > 1)
        target = self.corridor_target(z, corridor, gamma)
        obs_drift = float(np.linalg.norm(z - target))
        pressure = obs_drift / (obs_drift + 5.0)
        self.update_public_dynamics(dominant_edge in corridor, pressure, len(corridor))
        self.drift_load = max(
            0.0,
            0.68 * self.drift_load + self.Dh + pressure + self.systemic_risk - self.R - self.mss - 0.85 * self.gov_stability
        )
        if (
            (not is_ambiguous)
            and bf >= self.sentinel_theta
            and self.gov_stability >= 0.45
            and self.safe_envelope
        ):
            self.current_edge = dominant_edge
        elif self.current_edge not in corridor:
            self.current_edge = dominant_edge

        if is_ambiguous:
            projected_current = target.copy()
            projected_target = target.copy()
            ambiguity_factor = 1.0
        else:
            projected_current = self.project_point_to_edge(self.current_pos, self.current_edge)
            projected_target = self.project_point_to_edge(target, self.current_edge)
            ambiguity_factor = 0.0

        obs_gain = float(np.clip(0.18 - 0.08 * ambiguity_factor, 0.06, 0.24))
        target_gain = float(np.clip(0.22 + 0.42 * ambiguity_factor + 0.20 * self.gov_stability, 0.20, 0.92))
        recovery_gain = float(np.clip(0.08 + 0.12 * self.R + 0.08 * self.gov_stability - 0.05 * self.Dh, 0.04, 0.35))

        self.current_pos = (
            self.current_pos
            + obs_gain * (z - self.current_pos)
            + target_gain * (projected_target - self.current_pos)
            + recovery_gain * (projected_current - self.current_pos)
        )
        if self.safe_envelope:
            self.current_pos = self.current_pos + 0.12 * (projected_current - self.current_pos)
        self.delta_prev = delta
        self.gamma_prev = gamma
        self.corridor = corridor
        self.history_edges.append(self.current_edge)
        return self.current_edge, self.current_pos.copy(), gamma, self.drift_load
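To make the corridor mechanics concrete, the following standalone sketch (normalisation copied from the class above; the three log-scores are hypothetical toy values) shows how Viterbi-style scores become a posterior gamma and how corridor_tau selects the retained edge set:

```python
import numpy as np

def normalize_log_probs(log_probs):
    # Log-sum-exp normalisation: subtracting the max keeps exp() from
    # underflowing when scores are very negative, as emission_logprob
    # can be for edges far from the observation.
    vals = np.array(list(log_probs.values()), dtype=float)
    exps = np.exp(vals - vals.max())
    return {k: float(v) for k, v in zip(log_probs, exps / exps.sum())}

# Hypothetical scores for three candidate edges near a junction.
delta = {"AB": -0.5, "BC": -1.2, "CF": -4.0}
gamma = normalize_log_probs(delta)  # posteriors summing to 1

corridor_tau = 0.12
corridor = [e for e, p in gamma.items() if p >= corridor_tau]
# CF falls below tau and is pruned; AB and BC are both retained, so the
# corridor has width 2 and step() would treat the state as ambiguous
# even though AB's lead over BC exceeds a sentinel_theta of 1.45.
print(corridor)  # ['AB', 'BC']
```

The design point this illustrates: ambiguity is declared either by a weak dominance ratio or by a multi-edge corridor, so a confident-looking leader does not by itself license a branch commitment.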

Appendix B — Frozen Result Tables

Clean Route

| Model | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency |
|---|---|---|---|---|---|---|---|---|
| Kalman | 1.0360 | 2.6192 | - | - | 1.0000 | 0.0 | - | - |
| HMM | 6.2692 | 16.2500 | 0.7692 | 0.0000 | 0.6615 | 4.0 | - | - |
| NashMark | 1.5291 | 6.0258 | 0.6000 | 0.0000 | 1.0000 | 0.0 | 1.8308 | 1.0000 |

Urban Canyon

| Model | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency |
|---|---|---|---|---|---|---|---|---|
| Kalman | 2.9328 | 6.5278 | - | - | 1.0000 | 0.0 | - | - |
| HMM | 5.5467 | 13.7500 | 0.7846 | 0.0000 | 0.7538 | 2.0 | - | - |
| NashMark | 3.0118 | 10.2272 | 0.5846 | 0.0000 | 0.9385 | 0.0 | 1.8308 | 1.0000 |

GNSS Dropout / Tunnel

| Model | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency |
|---|---|---|---|---|---|---|---|---|
| Kalman | 1.5980 | 5.2245 | - | - | 1.0000 | 0.0 | - | - |
| HMM | 7.8462 | 20.0000 | 0.6923 | 0.0000 | 0.6154 | 0.0 | - | - |
| NashMark | 2.5600 | 8.0000 | 0.6769 | 0.0000 | 0.9846 | 0.0 | 1.7692 | 1.0000 |

Ambiguous Junction

| Model | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency |
|---|---|---|---|---|---|---|---|---|
| Kalman | 3.5773 | 11.7269 | - | - | 0.7846 | 0.0 | - | - |
| HMM | 5.7308 | 15.0000 | 0.8462 | 0.0000 | 0.7231 | 0.0 | - | - |
| NashMark | 3.0756 | 12.8278 | 0.8000 | 0.0000 | 0.8769 | 9.0 | 1.7385 | 0.5286 |

Drift Recovery

| Model | Mean Error | Max Error | Path Accuracy | False Branch Rate | Continuity | Recovery Time | Corridor Width | Restoration Efficiency |
|---|---|---|---|---|---|---|---|---|
| Kalman | 1.9703 | 6.5977 | - | - | 1.0000 | 0.0 | - | - |
| HMM | 5.8846 | 15.0000 | 0.8000 | 0.2000 | 0.7015 | 0.0 | - | - |
| NashMark | 1.8383 | 6.6166 | 0.7429 | 0.3429 | 1.0000 | 0.0 | 1.7846 | 1.0000 |