# How to build a privacy-preserving federated pipeline for fine-tuning large language models with LoRA using Flower and PEFT
# ---------------------------------------------------------------------------
# Environment setup, imports, hyper-parameters, per-client corpora, tokenizer
# and quantization settings for the federated LoRA fine-tuning simulation.
# ---------------------------------------------------------------------------
import subprocess
import sys


def _pip_install(*packages: str) -> None:
    """Quietly install/upgrade *packages* with the running interpreter's pip.

    Replaces the notebook-only ``!pip`` shell magic so the file is valid
    Python outside IPython as well.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    subprocess.check_call(
        [sys.executable, "-m", "pip", "-q", "install", "-U", *packages]
    )


# NOTE(review): the package list was garbled in the source text; "peft" and
# "sentencepiece" are inferred ("peft" is imported below) — confirm.
_pip_install(
    "protobuf<5",
    "flwr[simulation]",
    "transformers",
    "peft",
    "accelerate",
    "datasets",
    "sentencepiece",
)

# Imports intentionally follow the install step above.
import torch

if torch.cuda.is_available():
    # bitsandbytes is only installed when a CUDA device is present.
    _pip_install("bitsandbytes")

import os

os.environ["RAY_DISABLE_USAGE_STATS"] = "1"
# The source had the value "falso", which is not one of the English falsy
# spellings — presumably it would not disable tokenizer parallelism.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import math
import random
import numpy as np
from typing import Dict, List, Tuple, Optional
from torch.utils.data import DataLoader
from datasets import Dataset
import flwr as fl
from flwr.common import Context
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    DataCollatorForLanguageModeling,
)
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# Seed every RNG used by the script.
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Dispositivo:", DEVICE)

# Model choice and sequence length depend on whether a GPU is available.
GPU_MODEL_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
CPU_MODEL_ID = "distilgpt2"
MODEL_ID = GPU_MODEL_ID if DEVICE == "cuda" else CPU_MODEL_ID
MAX_LEN = 256 if DEVICE == "cuda" else 192

# Federated / optimisation hyper-parameters.
NUM_CLIENTS = 3
ROUNDS = 3
LOCAL_EPOCHS = 1
BATCH_SIZE = 2
GRAD_ACCUM = 4
LR = 2e-4
WARMUP_STEPS = 5
WEIGHT_DECAY = 0.0
LOG_EVERY = 10

# Private text corpus of each simulated client, keyed by client id.
CLIENT_TEXTS: Dict[int, List[str]] = {
    0: [
        "Policy memo: Employees must rotate on-call weekly and document incidents in the internal tracker.",
        "Runbook: If latency spikes, check the database connection pool and recent deploys, then roll back if needed.",
        "Security note: Never paste customer identifiers into public issue trackers. Use redacted tokens.",
        "Engineering guideline: Prefer idempotent retries for event processing; avoid duplicate side-effects.",
        "Postmortem template: impact, timeline, root cause, contributing factors, action items, owners, deadlines.",
    ],
    1: [
        "Credit risk review: monitor delinquency curves by cohort and compare against seasonal baselines.",
        "Fraud signals: repeated small authorizations, device changes, and sudden merchant-category shifts require review.",
        "Portfolio strategy: tighten limits on volatile segments while maintaining service levels for stable accounts.",
        "Operational note: reconcile chargebacks weekly and track win-rate by reason code.",
        "Internal SOP: escalation path is analyst -> manager -> compliance for high-risk cases.",
    ],
    2: [
        "Fleet ops: preventive maintenance reduces downtime; prioritize vehicles with repeated fault codes.",
        "Dispatch note: optimize routes by time windows and driver hours to reduce empty miles.",
        "Safety policy: enforce rest breaks and log inspections before long-haul trips.",
        "Inventory update: track spare parts usage; reorder thresholds should reflect lead time and seasonality.",
        "Customer SLA: late deliveries require proactive notifications and documented root cause.",
    ],
}

# Double each client's corpus with a prompt-style variant of every text.
# Iterating over a snapshot keeps the in-loop reassignment safe.
for cid, base in list(CLIENT_TEXTS.items()):
    CLIENT_TEXTS[cid] = base + [
        f"Q: Summarize this for leadership. A: {t}" for t in base
    ]

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
if tokenizer.pad_token is None:
    # Fall back to EOS when the tokenizer defines no pad token.
    tokenizer.pad_token = tokenizer.eos_token

# 4-bit quantization config, only built when running on CUDA.
bnb_config: Optional[BitsAndBytesConfig] = None
if DEVICE == "cuda":
    # bfloat16 when the device's major compute capability is >= 8, else float16.
    compute_dtype = (
        torch.bfloat16
        if torch.cuda.get_device_capability(0)[0] >= 8
        else torch.float16
    )
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=compute_dtype,
    )

# Module names targeted by LoRA differ for GPT-2 style checkpoints.
if "gpt2" in MODEL_ID.lower():
    TARGET_MODULES = ["c_attn", "c_proj"]
else:
    TARGET_MODULES = ["q_proj", "k_proj", "v_proj", "o_proj"]
# LoRA adapter hyper-parameters.
LORA_R = 16
LORA_ALPHA = 32
LORA_DROPOUT = 0.05

lora_config = LoraConfig(
    r=LORA_R,
    lora_alpha=LORA_ALPHA,
    lora_dropout=LORA_DROPOUT,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=TARGET_MODULES,
)


def model_primary_device(model) -> torch.device:
    """Return the device of the first parameter yielded by ``model.parameters()``."""
    return next(model.parameters()).device


def build_model_with_lora():
    """Load the base causal LM for ``MODEL_ID`` and wrap it with LoRA adapters.

    On CUDA the model is loaded with ``bnb_config`` and ``device_map="auto"``
    and passed through ``prepare_model_for_kbit_training``; otherwise it is
    loaded in float32 and moved to the CPU.

    Returns:
        The PEFT-wrapped model, switched to training mode.
    """
    if DEVICE == "cuda":
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            device_map="auto",
            quantization_config=bnb_config,
            torch_dtype="auto",
        )
        model = prepare_model_for_kbit_training(model)
    else:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID, torch_dtype=torch.float32
        )
        model.to("cpu")
    model = get_peft_model(model, lora_config)
    model.train()
    return model


def make_dataset(texts: List[str]) -> Dataset:
    """Tokenize *texts* into a ``Dataset`` truncated/padded to ``MAX_LEN``.

    The raw ``"text"`` column is removed after tokenization.
    """
    ds = Dataset.from_dict({"text": texts})

    def tok(batch):
        return tokenizer(
            batch["text"],
            truncation=True,
            max_length=MAX_LEN,
            padding="max_length",
        )

    ds = ds.map(tok, batched=True, remove_columns=["text"])
    return ds


# Causal-LM collator (no masked-language-modelling).
collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)


def lora_state_keys(model) -> List[str]:
    """Return the sorted state-dict keys of *model* that contain ``"lora_"``.

    Sorting gives every caller the same, stable parameter order.

    Raises:
        RuntimeError: if no such key exists.
    """
    keys = sorted(k for k in model.state_dict() if "lora_" in k)
    if not keys:
        raise RuntimeError(
            "No se encontraron claves LoRA. Es posible que su modelo no tenga los módulos de destino especificados. "
            f"Current TARGET_MODULES={TARGET_MODULES}, MODEL_ID={MODEL_ID}"
        )
    return keys


def get_lora_ndarrays(model) -> List[np.ndarray]:
    """Export the LoRA tensors of *model* as float32 NumPy arrays.

    Arrays are returned in the order given by ``lora_state_keys``.
    """
    sd = model.state_dict()
    keys = lora_state_keys(model)
    return [sd[k].detach().float().cpu().numpy() for k in keys]


def set_lora_ndarrays(model, arrays: List[np.ndarray]) -> None:
    """Copy *arrays* into the LoRA tensors of *model*, in place.

    *arrays* must be in the order given by ``lora_state_keys``.

    Raises:
        ValueError: if the number of arrays differs from the number of LoRA keys.
    """
    keys = lora_state_keys(model)
    if len(keys) != len(arrays):
        raise ValueError(
            f"Mismatch: obtuve {len(arrays)} matrices pero esperaba {len(keys)}."
        )
    sd = model.state_dict()
    for k, arr in zip(keys, arrays):
        # Match the destination tensor's device and dtype before copying.
        t = torch.from_numpy(arr).to(sd[k].device).to(sd[k].dtype)
        # NOTE(review): the source text is cut right after `copy_`; the call
        # argument `(t)` is the evident completion.
        sd[k].copy_(t)
def _batch_for_model(batch, device):
    """Move every tensor in *batch* to *device* and make sure it has labels.

    ``DataCollatorForLanguageModeling(mlm=False)`` already adds a ``"labels"``
    entry, so passing ``labels=`` again next to ``**batch`` would raise
    ``TypeError`` (duplicate keyword). ``input_ids`` is used as the label only
    when the collated batch carries none.
    """
    moved = {k: v.to(device) for k, v in batch.items()}
    moved.setdefault("labels", moved["input_ids"])
    return moved


# NOTE(review): the source text is cut before this point. Only the part of this
# evaluation helper from `dev = ...` onward is visible; the decorator, the name
# `eval_loss`, the signature (including the default `max_batches=20`),
# `model.eval()`, the DataLoader construction and `losses = []` are
# reconstructed from how the visible tail uses them — confirm against callers.
@torch.no_grad()
def eval_loss(model, ds: Dataset, max_batches: int = 20) -> float:
    """Return the mean loss of *model* over at most *max_batches* batches of *ds*.

    The model is switched back to training mode before returning. Returns
    ``nan`` when no batch was evaluated.
    """
    model.eval()
    dl = DataLoader(ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collator)
    losses = []
    dev = model_primary_device(model)
    for i, batch in enumerate(dl):
        if i >= max_batches:
            break
        out = model(**_batch_for_model(batch, dev))
        losses.append(float(out.loss.detach().cpu()))
    model.train()
    return float(np.mean(losses)) if losses else float("nan")


def train_one_client_round(
    model,
    ds: Dataset,
    epochs: int,
    lr: float,
    grad_accum: int,
    warmup_steps: int,
) -> Tuple[float, int]:
    """Run one local training round for a single client.

    Args:
        model: model to train in place.
        ds: tokenized dataset of the client.
        epochs: number of passes over *ds*.
        lr: base learning rate handed to the schedule.
        grad_accum: micro-batches accumulated per optimizer step
            (values below 1 are treated as 1).
        warmup_steps: warm-up length handed to the schedule.

    Returns:
        ``(mean_loss, examples)``: the mean un-scaled micro-batch loss
        (``nan`` if no batch was seen) and the number of examples processed.
    """
    # Guard against division/modulo by zero below.
    grad_accum = max(1, grad_accum)

    dl = DataLoader(ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collator)
    total_steps = max(1, (len(dl) * epochs) // grad_accum)
    step = 0

    optimizer = torch.optim.AdamW(
        model.parameters(), lr=lr, weight_decay=WEIGHT_DECAY
    )
    optimizer.zero_grad(set_to_none=True)

    running = []
    examples = 0
    dev = model_primary_device(model)

    for _ in range(epochs):
        for bi, batch in enumerate(dl):
            batch = _batch_for_model(batch, dev)
            out = model(**batch)
            # Scale so the accumulated gradient averages over the group.
            loss = out.loss / grad_accum
            loss.backward()
            # Log the un-scaled loss.
            running.append(float(loss.detach().cpu()) * grad_accum)
            examples += batch["input_ids"].shape[0]

            # NOTE(review): micro-batches left over when len(dl) is not a
            # multiple of grad_accum never trigger an optimizer step here.
            if (bi + 1) % grad_accum == 0:
                # NOTE(review): `cosine_warmup_lr` is not defined in the
                # visible source; it must be provided elsewhere in the file.
                lr_t = cosine_warmup_lr(step, total_steps, lr, warmup_steps)
                for pg in optimizer.param_groups:
                    pg["lr"] = lr_t
                optimizer.step()
                optimizer.zero_grad(set_to_none=True)
                step += 1
                if step % LOG_EVERY == 0:
                    print(
                        f" paso={step}/{total_steps} pérdida={np.mean(running[-LOG_EVERY:]):.4f} lr={lr_t:.2e}"
                    )

    return float(np.mean(running)) if running else float("nan"), examples