Cómo construir un agente de autorización previa autónomo y seguro para la gestión del ciclo de ingresos de atención médica con controles humanos integrados
def _now_iso() -> str: retorno datetime.utcnow().replace(microsegundo=0).isoformat() + “Z” def _stable_id(prefijo: str, semilla: str) -> str: h = hashlib.sha256(seed.encode(“utf-8″)).hexdigest()[:10]
return f”{prefix}_{h}” clase MockEHR: def __init__(self): self.orders_queue: Lista[SurgeryOrder] = []
self.patient_docs: dictado[str, List[ClinicalDocument]]= {} def seed_data(self, n_orders: int = 5): random.seed(7) def make_patient(i: int) -> Paciente: pid = f”PT{i:04d}” plan = random.choice(list(InsurancePlan)) return Paciente( paciente_id=pid, nombre=f”Paciente {i}”, dob=”1980-01-01″, member_id=f”M{i:08d}”, plan=plan, ) def docs_for_order(paciente: Paciente, cirugía: Tipo de cirugía) -> Lista[ClinicalDocument]: base = [
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “H&P”),
doc_type=DocType.H_AND_P,
created_at=_now_iso(),
content=”H&P: Relevant history, exam findings, and surgical indication.”,
source=”EHR”,
),
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “NOTE”),
doc_type=DocType.CLINICAL_NOTE,
created_at=_now_iso(),
content=”Clinical note: Symptoms, conservative management attempted, clinician assessment.”,
source=”EHR”,
),
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “MEDS”),
doc_type=DocType.MED_LIST,
created_at=_now_iso(),
content=”Medication list: Current meds, allergies, contraindications.”,
source=”EHR”,
),
]

tal vez = []
si la cirugía en [SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC]: may.append( ClinicalDocument( doc_id=_stable_id(“DOC”, paciente.paciente_id + “LABS”), doc_type=DocType.LABS, creado_at=_now_iso(), content=”Labs: CBC/CMP dentro de los últimos 30 días.”, source=”LabSystem”, ) ) si la cirugía se realiza en [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]: may.append( ClinicalDocument( doc_id=_stable_id(“DOC”, paciente.paciente_id + “IMG”), doc_type=DocType.IMAGING, creado_at=_now_iso(), content=”Imágenes: informe de resonancia magnética/rayos X que respalda el diagnóstico y la gravedad.”, fuente=”Radiología”, ) ) final = base + [d for d in maybe if random.random() > 0.35]

if random.random() > 0.6: final.append( ClinicalDocument( doc_id=_stable_id(“DOC”, paciente.patient_id + “PRIOR_TX”), doc_type=DocType.PRIOR_TX, creado_at=_now_iso(), content=”Tratamientos previos: PT, medicamentos, inyecciones probadas durante más de 6 semanas.”, fuente=”EHR”, ) ) if random.random() > 0.5: final.append( ClinicalDocument( doc_id=_stable_id(“DOC”, paciente.patient_id + “CONSENT”), doc_type=DocType.CONSENT, creado_at=_now_iso(), content=”Consentimiento: consentimiento de procedimiento firmado y divulgación de riesgos.”, fuente=”EHR”, ) ) devuelve final para i en rango(1, n_orders + 1): paciente = make_patient(i) cirugía = random.choice(list(SurgeryType)) orden = SurgeryOrder( order_id=_stable_id(“ORD”, paciente.paciente_id + cirugía.valor), paciente=paciente, tipo_cirugía=cirugía, fecha_programada=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(), ordering_provider_npi=str(random.randint(1000000000, 1999999999)), diagnostic_codes=[“M17.11”, “M54.5”] si cirugía! = Tipo de Cirugía.CATARACT más [“H25.9”]creado_at=_now_iso(), ) self.orders_queue.append(orden) self.patient_docs[patient.patient_id] = docs_for_order(paciente, cirugía) def poll_new_surgery_orders(self, max_n: int = 1) -> Lista[SurgeryOrder]: tirado = self.orders_queue[:max_n]
self.orders_queue = self.orders_queue[max_n:]
return extraído def get_patient_documents(self,patient_id: str) -> Lista[ClinicalDocument]: lista de retorno(self.patient_docs.get(patient_id, [])) def fetch_additional_docs(self, paciente_id: str, necesario: Lista[DocType]) -> Lista[ClinicalDocument]: generado = []
para dt es necesario: generate.append( ClinicalDocument( doc_id=_stable_id(“DOC”, id_paciente + dt.value + str(time.time())), doc_type=dt, creado_at=_now_iso(), content=f”Documento recopilado automáticamente para {dt.value}: extraído y formateado según la política del pagador.”, source=”AutoCollector”, ) ) self.patient_docs.setdefault(id_paciente, []).extend(generado) devuelve la clase generada MockPayerPortal: def __init__(self): self.db: Dict[str, Dict[str, Any]]= {} random.seed(11) def require_docs_policy(self, plan: InsurancePlan, cirugía: SurgeryType) -> Lista[DocType]: base = [DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST]
si la cirugía en [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]: base += [DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX]
si cirugia == TipoCirugía.BARIATRIC: base += [DocType.LABS, DocType.PRIOR_TX]
si planifica en [InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA]: base += [DocType.CONSENT]
return ordenado(lista(conjunto(base)), clave=lambda x: x.value) def enviar(self, pa: PriorAuthRequest) -> PayerResponse: payer_ref = _stable_id(“PAYREF”, pa.request_id + _now_iso()) docs_present = {d.doc_type for d en pa.docs_attached} requerido = self.required_docs_policy(pa.order.patient.plan, pa.order.surgery_type) falta = [d for d in required if d not in docs_present]

auto.db[payer_ref] = { “status”: AuthStatus.SUBMITTED, “order_id”: pa.order.order_id, “plan”: pa.order.patient.plan, “surgery”: pa.order.surgery_type, “missing”: desaparecido, “encuestas”: 0, “submitted_at”: _now_iso(), “denial_reason”: Ninguno, } msg = “Envío recibido. Caso en cola para revisión”. si falta: msg += “La validación inicial indica documentación incompleta”. return PayerResponse(status=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg) def check_status(self, payer_ref: str) -> PayerResponse: si payer_ref no está en self.db: return PayerResponse( status=AuthStatus.DENIED, payer_ref=payer_ref, message=”Caso no encontrado (posible error del sistema de pago).”, denial_reason=DenialReason.OTHER, confianza=0.4, ) case = self.db[payer_ref]
caso[“polls”] += 1 si es el caso[“status”] == AuthStatus.SUBMITTED y caso[“polls”] >= 1: caso[“status”] = AuthStatus.IN_REVIEW si es el caso[“status”] == AuthStatus.IN_REVIEW y caso[“polls”] >= 3: si es el caso[“missing”]: caso[“status”] = Caso AuthStatus.DENEGADO[“denial_reason”] = DenialReason.MISSING_DOCS else: roll = random.random() si roll < 0.10: caso["status"] = Caso AuthStatus.DENEGADO["denial_reason"] = DenialReason.CODING_ISSUE elif roll < 0,18: caso["status"] = Caso AuthStatus.DENEGADO["denial_reason"] = DenialReason.MEDICAL_NECESSITY más: caso["status"] = AuthStatus.APPROVED si es el caso["status"] == AuthStatus.DENIED: dr = caso["denial_reason"] o DenialReason.OTHER falta = caso["missing"] si dr == DenialReason.MISSING_DOCS más [] conf = 0.9 if dr != DenialReason.OTHER else 0.55 return PayerResponse( status=AuthStatus.DENIED, payer_ref=payer_ref, message=f"Denied. Reason={dr.value}.", denial_reason=dr, miss_docs=missing, confianza=conf, ) si es el caso["status"] == AuthStatus.APPROVED: devuelve PayerResponse( estado=AuthStatus.APPROVED, payer_ref=payer_ref, mensaje="Aprobado. Autorización emitida.", confianza=0.95, ) devuelve PayerResponse( estado=caso["status"]payer_ref=payer_ref, mensaje=f"Estado={caso['status'].valor}. Encuestas={caso['polls']}.", confianza=0.9, ) def file_appeal(self, payer_ref: str, Appeal_text: str, adjuntos_docs: Lista[ClinicalDocument]) -> PayerResponse: si payer_ref no está en self.db: devuelve PayerResponse( status=AuthStatus.DENIED, payer_ref=payer_ref, message=”Apelación fallida: caso no encontrado.”, denial_reason=DenialReason.OTHER, confianza=0.4, ) case = self.db[payer_ref]
docs_present = {d.doc_type para d en adjuntos_docs} still_missing = [d for d in case[“missing”] si no está en el caso docs_present][“missing”] = caso aún_faltante[“status”] = AuthStatus.caso APELADO[“polls”] = 0 msg = “Apelación enviada y en cola para revisión.” si todavía falta: msg += f” Advertencia: todavía falta {‘, ‘.join([d.value for d in still_missing])}.” return PayerResponse(status=AuthStatus.APPEALED, payer_ref=payer_ref, mensaje=msg, confianza=0.9)