@dataclass clase AgentConfig: horizonte: int = 6 replan_on_target_move: bool = Verdadero replan_on_obstacle_change: bool = Verdadero max_steps: int = 120 think_latency: float = 0.02 act_latency: float = 0.01 Risk_gate: float = 0.85 alt_search_ Depth: int = 2 @dataclass clase StreamingDecisionAgent: cfg: AgentConfig mundo: DynamicGridWorld start_time: float = campo(init=False, default_factory=time.time) step_id: int = campo(init=False, default=0) current_plan: Lista[Coord] = campo(init=False, default_factory=lista) acciones_actuales: Lista[str] = campo(init=False, default_factory=lista) última_instantánea: Dict[str, Any] = campo(init=False, default_factory=dict) estadísticas: Dict[str, Any] = campo(init=False, default_factory=lambda: defaultdict(int)) def _now(self) -> float: return time.time() – self.start_time def _emit(self, kind: str, msg: str, data: Opcional[Dict[str, Any]]= Ninguno) -> StreamEvent: return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data o {}) def _need_replan(self, obs: Dict[str, Any]) -> bool: ch = obs[“changes”]
si obs[“done”]: devuelve False si no es self.current_plan o len(self.current_plan) <= 1: devuelve True si self.cfg.replan_on_target_move y ch.get("target_moved"): devuelve True si self.cfg.replan_on_obstacle_change y (ch.get("obstacles_added") o ch.get("obstacles_cleared")): devuelve True si len(self.current_plan) > 1 y self.current_plan[1] en self.world.obstacles: return True return False def _plan(self) -> PlanResult: time.sleep(self.cfg.think_latency) self.stats[“replans”] += 1 return astar(self.world, self.world.agent, self.world.target) def _choose_action(self, planeed_action: str) -> Tupla[str, str]: ax, ay = self.world.agent action_to_delta = {“R”: (1,0), “L”: (-1,0), “D”: (0,1), “U”: (0,-1), “S”: (0,0)} dx, dy = action_to_delta[planned_action]
nxt = (ax+dx, ay+dy) si no es self.world.in_bounds(nxt) o no self.world.passable(nxt): self.stats[“overrides”] += 1 devuelve “S”, “planned_move_invalid -> espera”. r = action_risk(self.world, nxt) si r > self.cfg.risk_gate: candidatos = [“U”,”D”,”L”,”R”,”S”]
best = (planned_action, float(“inf”), “keep_plan”) para a in candidatos: dx, dy = action_to_delta[a]
p = (ax+dx, ay+dy) si no es self.world.in_bounds(p) o no self.world.passable(p): continuar puntuación = action_risk(self.world, p) + 0,05 * self.world.manhattan(p, self.world.target) si puntuación < mejor[1]: mejor = (a, puntuación, "risk_avoidance_override") si es mejor[0] != acción_planeada: self.stats["overrides"] += 1 retorno mejor[0]mejor[2] devolver acción_planeada, "follow_plan" def ejecutar(self) -> Generador[StreamEvent, None, None]: rendimiento self._emit(“observe”, “Inicializar: lectura del estado inicial.”, {“agent”: self.world.agent, “target”: self.world.target}) rendimiento self._emit(“world”, “Instantánea mundial inicial.”, {“grid”: self.world.render()}) para self.step_id en el rango (1, self.cfg.max_steps + 1): si self.step_id == 1 o self._need_replan(self.last_snapshot): pr = self._plan() self.current_plan = pr.path self.current_actions = path_to_actions(pr.path) if pr.reason != “found_path”: rendimiento self._emit(“plan”, “El planificador no pudo encontrar una ruta dentro del presupuesto; cambiando a exploración reactiva.”, {“reason”: pr.reason, “expanded”: pr.expanded}) self.current_actions = []
más: horizonte_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
rendimiento self._emit(“plan”, f”Plan actualizado (en línea A*). Comprometerse a los siguientes {len(horizon_path)-1} movimientos, luego volver a evaluar.”, {“reason”: pr.reason, “path_len”: len(pr.path), “expanded”: pr.expanded, “commit_horizon”: self.cfg.horizon, “horizon_path”: horizonte_path, “grid_with_path”: self.world.render(ruta=ruta_horizonte)}) si self.current_actions: plane_action = self.current_actions[0]
más: ax, ay = self.world.agent tx, ty = self.world.target opciones = []
si tx > ax: opciones.append(“R”) si tx < ax: opciones.append("L") si ty > ay: opciones.append(“D”) si ty < ay: opciones.append("U") opciones += ["S","U","D","L","R"] acción_planeada = opciones[0] acción, por qué = self._choose_action(acción_planificada) rendimiento self._emit("decidir", f"Decisión intermedia: acción={acción} ({por qué}).", {"acción_planificada": acción_planificada, "acción_elegida": acción, "agente": self.world.agent, "target": self.world.target}) time.sleep(self.cfg.act_latency) obs = self.world.step(action) self.last_snapshot = obs si self.current_actions: si acción == acción_planificada: self.current_actions = self.current_actions[1:] si len(self.current_plan) > 1: self.current_plan = self.current_plan[1:]
ch =obs[“changes”]
sorpresa = []
if ch.get(“target_moved”): sorpresa.append(“target_moved”) if ch.get(“obstacles_added”): sorpresa.append(f”obstacles_added={len(ch[‘obstacles_added’])}”) if ch.get(“obstacles_cleared”): sorpresa.append(f”obstacles_cleared={len(ch[‘obstacles_cleared’])}”) sorpresa_msg = (“Sorpresas: ” + “, “.join(sorpresa)) if sorpresa else “Sin grandes sorpresas.” self.stats[“steps”] += 1 si obs[“moved”]: estadísticas propias[“moves”] += 1 si ch.get(“target_moved”): self.stats[“target_moves”] += 1 si ch.get(“obstacles_added”) o ch.get(“obstacles_cleared”): self.stats[“world_shifts”] += 1 rendimiento self._emit(“observe”, f”Resultado observado. {surprise_msg}”, {“moved”: obs[“moved”]”agente”: obs[“agent”]”objetivo”: obs[“target”]”hecho”: obs[“done”]”cambios”: ch, “grid”: self.world.render(ruta=self.current_plan[: min(len(self.current_plan), 10)])}) si obs[“done”]: rendimiento self._emit(“hecho”, “Objetivo alcanzado. Deteniendo la ejecución.”, {“final_agent”: obs[“agent”]”objetivo_final”: obs[“target”]”stats”: dict(self.stats)}) return rendimiento self._emit(“done”, “Pasos máximos alcanzados sin alcanzar la meta.”, {“final_agent”: self.world.agent, “final_target”: self.world.target, “stats”: dict(self.stats)})
si obs[“done”]: devuelve False si no es self.current_plan o len(self.current_plan) <= 1: devuelve True si self.cfg.replan_on_target_move y ch.get("target_moved"): devuelve True si self.cfg.replan_on_obstacle_change y (ch.get("obstacles_added") o ch.get("obstacles_cleared")): devuelve True si len(self.current_plan) > 1 y self.current_plan[1] en self.world.obstacles: return True return False def _plan(self) -> PlanResult: time.sleep(self.cfg.think_latency) self.stats[“replans”] += 1 return astar(self.world, self.world.agent, self.world.target) def _choose_action(self, planeed_action: str) -> Tupla[str, str]: ax, ay = self.world.agent action_to_delta = {“R”: (1,0), “L”: (-1,0), “D”: (0,1), “U”: (0,-1), “S”: (0,0)} dx, dy = action_to_delta[planned_action]
nxt = (ax+dx, ay+dy) si no es self.world.in_bounds(nxt) o no self.world.passable(nxt): self.stats[“overrides”] += 1 devuelve “S”, “planned_move_invalid -> espera”. r = action_risk(self.world, nxt) si r > self.cfg.risk_gate: candidatos = [“U”,”D”,”L”,”R”,”S”]
best = (planned_action, float(“inf”), “keep_plan”) para a in candidatos: dx, dy = action_to_delta[a]
p = (ax+dx, ay+dy) si no es self.world.in_bounds(p) o no self.world.passable(p): continuar puntuación = action_risk(self.world, p) + 0,05 * self.world.manhattan(p, self.world.target) si puntuación < mejor[1]: mejor = (a, puntuación, "risk_avoidance_override") si es mejor[0] != acción_planeada: self.stats["overrides"] += 1 retorno mejor[0]mejor[2] devolver acción_planeada, "follow_plan" def ejecutar(self) -> Generador[StreamEvent, None, None]: rendimiento self._emit(“observe”, “Inicializar: lectura del estado inicial.”, {“agent”: self.world.agent, “target”: self.world.target}) rendimiento self._emit(“world”, “Instantánea mundial inicial.”, {“grid”: self.world.render()}) para self.step_id en el rango (1, self.cfg.max_steps + 1): si self.step_id == 1 o self._need_replan(self.last_snapshot): pr = self._plan() self.current_plan = pr.path self.current_actions = path_to_actions(pr.path) if pr.reason != “found_path”: rendimiento self._emit(“plan”, “El planificador no pudo encontrar una ruta dentro del presupuesto; cambiando a exploración reactiva.”, {“reason”: pr.reason, “expanded”: pr.expanded}) self.current_actions = []
más: horizonte_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
rendimiento self._emit(“plan”, f”Plan actualizado (en línea A*). Comprometerse a los siguientes {len(horizon_path)-1} movimientos, luego volver a evaluar.”, {“reason”: pr.reason, “path_len”: len(pr.path), “expanded”: pr.expanded, “commit_horizon”: self.cfg.horizon, “horizon_path”: horizonte_path, “grid_with_path”: self.world.render(ruta=ruta_horizonte)}) si self.current_actions: plane_action = self.current_actions[0]
más: ax, ay = self.world.agent tx, ty = self.world.target opciones = []
si tx > ax: opciones.append(“R”) si tx < ax: opciones.append("L") si ty > ay: opciones.append(“D”) si ty < ay: opciones.append("U") opciones += ["S","U","D","L","R"] acción_planeada = opciones[0] acción, por qué = self._choose_action(acción_planificada) rendimiento self._emit("decidir", f"Decisión intermedia: acción={acción} ({por qué}).", {"acción_planificada": acción_planificada, "acción_elegida": acción, "agente": self.world.agent, "target": self.world.target}) time.sleep(self.cfg.act_latency) obs = self.world.step(action) self.last_snapshot = obs si self.current_actions: si acción == acción_planificada: self.current_actions = self.current_actions[1:] si len(self.current_plan) > 1: self.current_plan = self.current_plan[1:]
ch =obs[“changes”]
sorpresa = []
if ch.get(“target_moved”): sorpresa.append(“target_moved”) if ch.get(“obstacles_added”): sorpresa.append(f”obstacles_added={len(ch[‘obstacles_added’])}”) if ch.get(“obstacles_cleared”): sorpresa.append(f”obstacles_cleared={len(ch[‘obstacles_cleared’])}”) sorpresa_msg = (“Sorpresas: ” + “, “.join(sorpresa)) if sorpresa else “Sin grandes sorpresas.” self.stats[“steps”] += 1 si obs[“moved”]: estadísticas propias[“moves”] += 1 si ch.get(“target_moved”): self.stats[“target_moves”] += 1 si ch.get(“obstacles_added”) o ch.get(“obstacles_cleared”): self.stats[“world_shifts”] += 1 rendimiento self._emit(“observe”, f”Resultado observado. {surprise_msg}”, {“moved”: obs[“moved”]”agente”: obs[“agent”]”objetivo”: obs[“target”]”hecho”: obs[“done”]”cambios”: ch, “grid”: self.world.render(ruta=self.current_plan[: min(len(self.current_plan), 10)])}) si obs[“done”]: rendimiento self._emit(“hecho”, “Objetivo alcanzado. Deteniendo la ejecución.”, {“final_agent”: obs[“agent”]”objetivo_final”: obs[“target”]”stats”: dict(self.stats)}) return rendimiento self._emit(“done”, “Pasos máximos alcanzados sin alcanzar la meta.”, {“final_agent”: self.world.agent, “final_target”: self.world.target, “stats”: dict(self.stats)})