Cómo acelerar el entrenamiento de transformadores usando NVIDIA Apex (FusedAdam, FusedLayerNorm) y Native torch.amp
print(“\n### SECCIÓN D: Transformador de extremo a extremo (vanilla fp32 vs Apex fusionado + AMP) ###”) VOCAB, D, NHEAD, LAYERS, SEQ, BATCH, STEPS = 2000, 256, 4, 4, 128, 32, 60 class Block(torch.nn.Module): def __init__(self, d, nhead, norm_cls): super().__init__() self.attn = torch.nn.MultiheadAttention(d, nhead, batch_first=True) self.ff = torch.nn.Sequential(torch.nn.Linear(d, 4 * d), torch.nn.GELU(), torch.nn.Linear(4 * d, d)) self.n1, self.n2 = norm_cls(d), norm_cls(d) def adelante(self, x): h = self.n1(x); x = x + self.attn(h, h, h, necesita_pesos=Falso)[0]
return x + self.ff(self.n2(x)) clase TinyTransformer(torch.nn.Module): def __init__(self, norm_cls): super().__init__() self.emb = torch.nn.Embedding(VOCAB, D) self.blocks = torch.nn.ModuleList([Block(D, NHEAD, norm_cls) for _ in range(LAYERS)]) self.norm = norm_cls(D) self.head = torch.nn.Linear(D, VOCAB) def forward(self, idx): x = self.emb(idx) para b en self.blocks: x = b(x) return self.head(self.norm(x)) g = torch.Generator(device=”cpu”).manual_seed(0) data = torch.randint(0, VOCAB, (BATCH, SEQ + 1), generador=g).to(DEV) entrada, tgt = datos[:, :-1]datos[:, 1:]
lossfn = torch.nn.CrossEntropyLoss() def run_training(use_apex): torch.manual_seed(0) norm_cls = (FusedLayerNorm si (use_apex y HAS_FLN y APEX_OK) else torch.nn.LayerNorm) modelo = TinyTransformer(norm_cls).to(DEV) si use_apex y HAS_AMP_C y APEX_OK: optimizador = FusedAdam(model.parameters(), lr=3e-4) else: optimizador = torch.optim.AdamW(model.parameters(), lr=3e-4) escalador = torch.amp.GradScaler(“cuda”, enable=use_apex) def one_step(): optimizador.zero_grad(set_to_none=True) con torch.amp.autocast(“cuda”, dtype=torch.float16, enable=use_apex): logits = model(inp) pérdida = lossfn(logits.reshape(-1, VOCAB), tgt.reshape(-1)) escalar.scale(loss).backward() escalar.step(optimizador) escalar.update() pérdida de retorno para _ en rango(5): one_step() torch.cuda.synchronize() t0 = time.perf_counter() para _ dentro del rango(PASOS): pérdida = one_step() torch.cuda.synchronize() dt = time.perf_counter() – t0 return loss.item(), (STEPS * BATCH * SEQ) / dt, dt loss_v, tps_v, dt_v = run_training(use_apex=False) print(f” vainilla (fp32, nn.LayerNorm, AdamW): ” f”{dt_v:5.2f}s | {tps_v:9.0f} tok/s | pérdida final {loss_v:.3f}”) si APEX_OK y (HAS_AMP_C o HAS_FLN): loss_a, tps_a, dt_a = run_training(use_apex=True) print(f” ápice (fp16, FusedLayerNorm, FusedAdam): ” f”{dt_a:5.2f}s | {tps_a:9.0f} tok/s | pérdida final {loss_a:.3f}”) print(f” —-> aceleración: {tps_a / tps_v:0.2f}x rendimiento”) else: print(” ruta del ápice SALTADA (sin núcleos fusionados) build)”) print(“\n” + “=” * 78) print(“HECHO. Conclusiones clave:”) print(” – FusedAdam/FusedLayerNorm/FusedRMSNorm son las piezas de Apex aún relevantes;”) print(” las aceleraciones crecen con el tamaño del modelo y el recuento de parámetros (una pequeña demostración lo subestima).”) print(” – apex.amp está en desuso -> prefiero torch.amp.autocast + torch.amp.GradScaler.”) print(” – FusedAdam compone limpiamente con torch.amp nativo (Sección D).”) print(” – En cargas de trabajo reales, pruebe también con un modelo más grande y autocast bf16 (no se necesita escalador).”) print(“=” * 78)