def extraer_función_fuente(texto_completo, nombre_función): texto = texto_completo.replace(“\r\n”, “\n”) cerca = re.search(r”“`(?:python)?\n(.*?)“`”, texto, banderas=re.S | re.I) if cerca: texto = cerca.grupo(1) patrón = rf”^def\s+{re.escape(function_name)}\s*\(” match = re.search(pattern, text, flags=re.M) si no coincide: return “” fragmento = texto[match.start():]
líneas = trozo.splitlines() recopiladas = []
para i, línea en enumerar(líneas): si i > 0: si line.startswith(“def “) o line.startswith(“class “): romper si line.startswith(“if __name__”): romper si línea y no line.startswith((” “, “\t”, “#”)) y re.match(r”^[A-Za-z_][A-Za-z0-9_]*\s*=”, línea): break recolectado.append(línea) fuente = “\n”.join(collected).rstrip() prueba: ast.parse(fuente) devuelve la fuente excepto SyntaxError: fix_lines = []
para la línea recopilada: fixed_lines.append(line) candidato = “\n”.join(fixed_lines).rstrip() intente: ast.parse(candidate) fuente = candidato excepto SyntaxError: pase devuelva fuente si source.strip().startswith(“def “) else “” def syntax_ok(fuente): intente: ast.parse(fuente) devuelva True, “” excepto SyntaxError como e: devuelva False, str(e) FORBIDDEN_NAMES = { “eval”, “exec”, “compile”, “open”, “input”, “__import__”, “globals”, “locals”, “vars”, “dir”, “getattr”, “setattr”, “delattr”, “help”, “breakpoint”, “exit”, “quit” } FORBIDDEN_NODES = (ast.Import, ast.ImportFrom, ast.Global, ast.Nonlocal, ast.With, ast.AsyncWith, ast.AsyncFunctionDef, ast.ClassDef, ast.Delete, ast.Raise,) ALLOWED_BUILTINS = { “abs”: abs, “all”: todos, “any”: any, “bool”: bool, “dict”: dict, “enumerate”: enumerate, “float”: float, “int”: int, “isinstance”: isinstance, “len”: len, “list”: list, “map”: map, “max”: max, “min”: min, “pow”: pow, “range”: range, “reversed”: invertido, “round”: round, “set”: set, “sorted”: ordenado, “str”: str, “sum”: suma, “tuple”: tuple, “zip”: zip, } def static_safety_check(fuente): try: tree = ast.parse(fuente) excepto Error de sintaxis como e: devuelve Falso, f”Error de sintaxis: {e}” para el nodo en ast.walk(árbol): si esinstancia(nodo, NODES_PROHIBIDOS): devuelve Falso, f”Nodo AST prohibido: {tipo(nodo).__nombre__}” si esinstancia(nodo, ast.Nombre): si nodo.id en NOMBRES_PROHIBIDOS o node.id.startswith(“__”): devuelve False, f”Nombre prohibido: {node.id}” si esinstancia(nodo, ast.Attribute): si node.attr.startswith(“__”): devuelve False, f”Atributo prohibido: {node.attr}” si esinstancia(nodo, ast.Call): si esinstancia(node.func, ast.Name) y node.func.id en FORBIDDEN_NAMES: devuelve Falso, f”Llamada prohibida: {node.func.id}” devuelve Verdadero, “aprobado” def _worker_run_tests(fuente, nombre_función, pruebas, cola): prueba: safe_globals = {“__builtins__”: ALLOWED_BUILTINS} safe_locals = {} compilado = compilar(fuente, “”, “exec”) exec(compilado, safe_globals, safe_locals) fn = safe_locals.get(nombre_función) o safe_globals.get(nombre_función) si fn es Ninguno: queue.put({“ok”: False, “error”: f”{nombre_función} no encontrado”, “aprobado”: 0, “total”: len(pruebas)}) retorno aprobado = 0 detalles = []
para prueba en pruebas: args = test.get(“args”, []) kwargs = test.get(“kwargs”, {}) esperado = prueba[“expected”]
resultado = fn(*args, **kwargs) ok = resultado == esperado aprobado += int(ok) detalles.append({ “args”: args, “kwargs”: kwargs, “esperado”: esperado, “resultado”: resultado, “ok”: ok, }) queue.put({“ok”: aprobado == len(pruebas), “error”: “”, “aprobado”: aprobado, “total”: len(pruebas), “detalles”: detalles}) excepto Excepción como e: queue.put({“ok”: False, “error”: repr(e), “aprobado”: 0, “total”: len(tests)}) def run_unit_tests_safely(fuente, nombre_función, pruebas, timeout_segundos=3): seguro, motivo = static_safety_check(fuente) si no es seguro: devolver {“ok”: Falso, “error”: motivo, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} ctx = mp.get_context(“fork”) cola = ctx.Queue() proceso = ctx.Process(target=_worker_run_tests, args=(fuente, nombre_función, pruebas, cola)) proceso.start() proceso.join(timeout_segundos) if proceso.is_alive(): proceso.terminar() proceso.join() return {“ok”: False, “error”: “timeout”, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} if queue.empty(): return {“ok”: False, “error”: “no se devolvió ningún resultado”, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} return queue.get() def code_complexity(fuente): intente: bloques = cc_visit(fuente) si no es bloques: devuelve 1 devuelve max(block.complexity para bloque en bloques) excepto Excepción: devuelve Ninguno def score_candidate(fuente, test_result): syntax_score = 1 si syntax_ok(fuente)[0] else 0 safety_score = 1 si static_safety_check(fuente)[0] else 0 aprobado = test_result.get(“aprobado”, 0) total = max(test_result.get(“total”, 1), 1) test_score = aprobado / complejidad total = code_complexity(source) complex_penalty = 0 si la complejidad es Ninguno más min(complejidad / 20, 0.25) return syntax_score + safety_score + 3 * test_score – complex_penalty
líneas = trozo.splitlines() recopiladas = []
para i, línea en enumerar(líneas): si i > 0: si line.startswith(“def “) o line.startswith(“class “): romper si line.startswith(“if __name__”): romper si línea y no line.startswith((” “, “\t”, “#”)) y re.match(r”^[A-Za-z_][A-Za-z0-9_]*\s*=”, línea): break recolectado.append(línea) fuente = “\n”.join(collected).rstrip() prueba: ast.parse(fuente) devuelve la fuente excepto SyntaxError: fix_lines = []
para la línea recopilada: fixed_lines.append(line) candidato = “\n”.join(fixed_lines).rstrip() intente: ast.parse(candidate) fuente = candidato excepto SyntaxError: pase devuelva fuente si source.strip().startswith(“def “) else “” def syntax_ok(fuente): intente: ast.parse(fuente) devuelva True, “” excepto SyntaxError como e: devuelva False, str(e) FORBIDDEN_NAMES = { “eval”, “exec”, “compile”, “open”, “input”, “__import__”, “globals”, “locals”, “vars”, “dir”, “getattr”, “setattr”, “delattr”, “help”, “breakpoint”, “exit”, “quit” } FORBIDDEN_NODES = (ast.Import, ast.ImportFrom, ast.Global, ast.Nonlocal, ast.With, ast.AsyncWith, ast.AsyncFunctionDef, ast.ClassDef, ast.Delete, ast.Raise,) ALLOWED_BUILTINS = { “abs”: abs, “all”: todos, “any”: any, “bool”: bool, “dict”: dict, “enumerate”: enumerate, “float”: float, “int”: int, “isinstance”: isinstance, “len”: len, “list”: list, “map”: map, “max”: max, “min”: min, “pow”: pow, “range”: range, “reversed”: invertido, “round”: round, “set”: set, “sorted”: ordenado, “str”: str, “sum”: suma, “tuple”: tuple, “zip”: zip, } def static_safety_check(fuente): try: tree = ast.parse(fuente) excepto Error de sintaxis como e: devuelve Falso, f”Error de sintaxis: {e}” para el nodo en ast.walk(árbol): si esinstancia(nodo, NODES_PROHIBIDOS): devuelve Falso, f”Nodo AST prohibido: {tipo(nodo).__nombre__}” si esinstancia(nodo, ast.Nombre): si nodo.id en NOMBRES_PROHIBIDOS o node.id.startswith(“__”): devuelve False, f”Nombre prohibido: {node.id}” si esinstancia(nodo, ast.Attribute): si node.attr.startswith(“__”): devuelve False, f”Atributo prohibido: {node.attr}” si esinstancia(nodo, ast.Call): si esinstancia(node.func, ast.Name) y node.func.id en FORBIDDEN_NAMES: devuelve Falso, f”Llamada prohibida: {node.func.id}” devuelve Verdadero, “aprobado” def _worker_run_tests(fuente, nombre_función, pruebas, cola): prueba: safe_globals = {“__builtins__”: ALLOWED_BUILTINS} safe_locals = {} compilado = compilar(fuente, “”, “exec”) exec(compilado, safe_globals, safe_locals) fn = safe_locals.get(nombre_función) o safe_globals.get(nombre_función) si fn es Ninguno: queue.put({“ok”: False, “error”: f”{nombre_función} no encontrado”, “aprobado”: 0, “total”: len(pruebas)}) retorno aprobado = 0 detalles = []
para prueba en pruebas: args = test.get(“args”, []) kwargs = test.get(“kwargs”, {}) esperado = prueba[“expected”]
resultado = fn(*args, **kwargs) ok = resultado == esperado aprobado += int(ok) detalles.append({ “args”: args, “kwargs”: kwargs, “esperado”: esperado, “resultado”: resultado, “ok”: ok, }) queue.put({“ok”: aprobado == len(pruebas), “error”: “”, “aprobado”: aprobado, “total”: len(pruebas), “detalles”: detalles}) excepto Excepción como e: queue.put({“ok”: False, “error”: repr(e), “aprobado”: 0, “total”: len(tests)}) def run_unit_tests_safely(fuente, nombre_función, pruebas, timeout_segundos=3): seguro, motivo = static_safety_check(fuente) si no es seguro: devolver {“ok”: Falso, “error”: motivo, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} ctx = mp.get_context(“fork”) cola = ctx.Queue() proceso = ctx.Process(target=_worker_run_tests, args=(fuente, nombre_función, pruebas, cola)) proceso.start() proceso.join(timeout_segundos) if proceso.is_alive(): proceso.terminar() proceso.join() return {“ok”: False, “error”: “timeout”, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} if queue.empty(): return {“ok”: False, “error”: “no se devolvió ningún resultado”, “aprobado”: 0, “total”: len(pruebas), “detalles”: []} return queue.get() def code_complexity(fuente): intente: bloques = cc_visit(fuente) si no es bloques: devuelve 1 devuelve max(block.complexity para bloque en bloques) excepto Excepción: devuelve Ninguno def score_candidate(fuente, test_result): syntax_score = 1 si syntax_ok(fuente)[0] else 0 safety_score = 1 si static_safety_check(fuente)[0] else 0 aprobado = test_result.get(“aprobado”, 0) total = max(test_result.get(“total”, 1), 1) test_score = aprobado / complejidad total = code_complexity(source) complex_penalty = 0 si la complejidad es Ninguno más min(complejidad / 20, 0.25) return syntax_score + safety_score + 3 * test_score – complex_penalty