-
Star
(177)
You must be signed in to star a gist -
Fork
(23)
You must be signed in to fork a gist
-
-
Save karpathy/8627fe009c40f57531cb18360106ce95 to your computer and use it in GitHub Desktop.
| """ | |
| The most atomic way to train and inference a GPT LLM in pure, dependency-free Python. | |
| Differences from GPT-2 are minor: layer norm -> rmsnorm, no biases, GeLU -> square ReLU, no weight tying. | |
| The contents of this file is everything algorithmically needed to train a GPT. Everything else is just efficiency. | |
| Art project by @karpathy. | |
| """ | |
| import os # for os.path.exists | |
| import time # for time.perf_counter | |
| import math # for math.log, math.exp | |
| import random # for random.seed, random.choices | |
| import argparse # for argparse.ArgumentParser | |
| # CLI arguments | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--n-embd', type=int, default=16, help='Number of channels in the Transformer') | |
| parser.add_argument('--n-layer', type=int, default=1, help='Number of layers in the Transformer') | |
| parser.add_argument('--block-size', type=int, default=8, help='Maximum sequence length') | |
| parser.add_argument('--num-steps', type=int, default=500, help='Number of training steps') | |
| parser.add_argument('--n-head', type=int, default=4, help='Number of attention heads in the Transformer') | |
| parser.add_argument('--learning-rate', type=float, default=1e-2, help='Learning rate') | |
| args = parser.parse_args() | |
| n_embd, block_size, n_layer, n_head = args.n_embd, args.block_size, args.n_layer, args.n_head | |
| head_dim = n_embd // n_head | |
| random.seed(42) | |
| # Dataset example: the names dataset (one name per line). rest of the code just assumes docs: list[str] | |
| if not os.path.exists('input.txt'): | |
| import urllib.request | |
| urllib.request.urlretrieve('https://raw.githubusercontent.com/karpathy/makemore/refs/heads/master/names.txt', 'input.txt') | |
| docs = [l.strip() for l in open('input.txt').read().strip().split('\n') if l.strip()] # list[str] of documents | |
| random.shuffle(docs) | |
| # Tokenizer: simple character-level tokenization with a BOS token delimiter | |
| chars = ['<BOS>'] + sorted(set(''.join(docs))) | |
| vocab_size = len(chars) | |
| stoi = { ch:i for i, ch in enumerate(chars) } # string to integer | |
| itos = { i:ch for i, ch in enumerate(chars) } # integer to string | |
| BOS = stoi['<BOS>'] | |
| print(f"vocab size: {vocab_size}, num docs: {len(docs)}") | |
| # Autograd engine | |
| class Value: | |
| """ stores a single scalar value and its gradient """ | |
| def __init__(self, data, _children=(), _op=''): | |
| self.data = data | |
| self.grad = 0 | |
| self._backward = lambda: None | |
| self._prev = set(_children) | |
| self._op = _op # the op that produced this node, for graphviz / debugging / etc | |
| def __add__(self, other): | |
| other = other if isinstance(other, Value) else Value(other) | |
| out = Value(self.data + other.data, (self, other), '+') | |
| def _backward(): | |
| self.grad += out.grad | |
| other.grad += out.grad | |
| out._backward = _backward | |
| return out | |
| def __mul__(self, other): | |
| other = other if isinstance(other, Value) else Value(other) | |
| out = Value(self.data * other.data, (self, other), '*') | |
| def _backward(): | |
| self.grad += other.data * out.grad | |
| other.grad += self.data * out.grad | |
| out._backward = _backward | |
| return out | |
| def __pow__(self, other): | |
| assert isinstance(other, (int, float)), "only supporting int/float powers for now" | |
| out = Value(self.data**other, (self,), f'**{other}') | |
| def _backward(): | |
| self.grad += (other * self.data**(other-1)) * out.grad | |
| out._backward = _backward | |
| return out | |
| def log(self): | |
| out = Value(math.log(self.data), (self,), 'log') | |
| def _backward(): | |
| self.grad += (1 / self.data) * out.grad | |
| out._backward = _backward | |
| return out | |
| def exp(self): | |
| out = Value(math.exp(self.data), (self,), 'exp') | |
| def _backward(): | |
| self.grad += out.data * out.grad | |
| out._backward = _backward | |
| return out | |
| def relu(self): | |
| out = Value(0 if self.data < 0 else self.data, (self,), 'ReLU') | |
| def _backward(): | |
| self.grad += (out.data > 0) * out.grad | |
| out._backward = _backward | |
| return out | |
| def backward(self): | |
| # topological order all of the children in the graph | |
| topo = [] | |
| visited = set() | |
| def build_topo(v): | |
| if v not in visited: | |
| visited.add(v) | |
| for child in v._prev: | |
| build_topo(child) | |
| topo.append(v) | |
| build_topo(self) | |
| # go one variable at a time and apply the chain rule to get its gradient | |
| self.grad = 1 | |
| for v in reversed(topo): | |
| v._backward() | |
| def __neg__(self): return self * -1 | |
| def __radd__(self, other): return self + other | |
| def __sub__(self, other): return self + (-other) | |
| def __rsub__(self, other): return other + (-self) | |
| def __rmul__(self, other): return self * other | |
| def __truediv__(self, other): return self * other**-1 | |
| def __rtruediv__(self, other): return other * self**-1 | |
| def __repr__(self): return f"Value(data={self.data}, grad={self.grad})" | |
| # Model parameter initialization | |
| matrix = lambda nout, nin, std=0.02: [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)] | |
| state_dict = {'wte': matrix(vocab_size, n_embd), 'wpe': matrix(block_size, n_embd), 'lm_head': matrix(vocab_size, n_embd)} | |
| for i in range(n_layer): | |
| state_dict[f'layer{i}.attn_wq'] = matrix(n_embd, n_embd) | |
| state_dict[f'layer{i}.attn_wk'] = matrix(n_embd, n_embd) | |
| state_dict[f'layer{i}.attn_wv'] = matrix(n_embd, n_embd) | |
| state_dict[f'layer{i}.attn_wo'] = matrix(n_embd, n_embd, std=0) | |
| state_dict[f'layer{i}.mlp_fc1'] = matrix(4 * n_embd, n_embd) | |
| state_dict[f'layer{i}.mlp_fc2'] = matrix(n_embd, 4 * n_embd, std=0) | |
| params = [p for mat in state_dict.values() for row in mat for p in row] # flatten params into a single list[Value] | |
| print(f"num params: {len(params)}") | |
| # Model architecture | |
| def linear(x, w): | |
| return [sum(wi * xi for wi, xi in zip(wo, x)) for wo in w] | |
| def softmax(logits): | |
| max_val = max(val.data for val in logits) | |
| exps = [(val - max_val).exp() for val in logits] | |
| total = sum(exps) | |
| return [e / total for e in exps] | |
| def rmsnorm(x): | |
| ms = sum(xi * xi for xi in x) / len(x) | |
| scale = (ms + 1e-5) ** -0.5 | |
| return [xi * scale for xi in x] | |
| def gpt(token_id, pos_id, keys, values): | |
| tok_emb = state_dict['wte'][token_id] # token embedding | |
| pos_emb = state_dict['wpe'][pos_id] # position embedding | |
| x = [t + p for t, p in zip(tok_emb, pos_emb)] # joint token and position embedding | |
| x = rmsnorm(x) | |
| for li in range(n_layer): | |
| # 1) Multi-head attention block | |
| x_residual = x | |
| x = rmsnorm(x) | |
| q = linear(x, state_dict[f'layer{li}.attn_wq']) | |
| k = linear(x, state_dict[f'layer{li}.attn_wk']) | |
| v = linear(x, state_dict[f'layer{li}.attn_wv']) | |
| keys[li].append(k) | |
| values[li].append(v) | |
| x_attn = [] | |
| for h in range(n_head): | |
| hs = h * head_dim | |
| q_h = q[hs:hs+head_dim] | |
| k_h = [ki[hs:hs+head_dim] for ki in keys[li]] | |
| v_h = [vi[hs:hs+head_dim] for vi in values[li]] | |
| attn_logits = [sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))] | |
| attn_weights = softmax(attn_logits) | |
| head_out = [sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)] | |
| x_attn.extend(head_out) | |
| x = linear(x_attn, state_dict[f'layer{li}.attn_wo']) | |
| x = [a + b for a, b in zip(x, x_residual)] | |
| # 2) MLP block | |
| x_residual = x | |
| x = rmsnorm(x) | |
| x = linear(x, state_dict[f'layer{li}.mlp_fc1']) | |
| x = [xi.relu() ** 2 for xi in x] | |
| x = linear(x, state_dict[f'layer{li}.mlp_fc2']) | |
| x = [a + b for a, b in zip(x, x_residual)] | |
| logits = linear(x, state_dict['lm_head']) | |
| return logits | |
| # Adam optimizer | |
| learning_rate = args.learning_rate | |
| beta1, beta2, eps_adam = 0.9, 0.95, 1e-8 | |
| m = [0.0] * len(params) # first moment | |
| v = [0.0] * len(params) # second moment | |
| # Training loop | |
| lossf_history = [] | |
| t_start = time.perf_counter() | |
| for step in range(args.num_steps): | |
| # Take a single training document, tokenize it, surround it with BOS special token on both sides | |
| doc = docs[step % len(docs)] | |
| tokens = [BOS] + [stoi[ch] for ch in doc] + [BOS] | |
| n = min(block_size, len(tokens) - 1) | |
| # Forward/backward through the document over time dimension | |
| keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)] | |
| losses = [] | |
| for pos_id in range(n): | |
| token_id, target_id = tokens[pos_id], tokens[pos_id + 1] | |
| logits = gpt(token_id, pos_id, keys, values) | |
| probs = softmax(logits) | |
| loss_t = -probs[target_id].log() | |
| losses.append(loss_t) | |
| loss = (1 / n) * sum(losses) # average loss over the sequence | |
| loss.backward() | |
| # Adam update (optimizer) | |
| lr_t = learning_rate * (1 - step / args.num_steps) | |
| for i, p in enumerate(params): | |
| m[i] = beta1 * m[i] + (1 - beta1) * p.grad | |
| v[i] = beta2 * v[i] + (1 - beta2) * p.grad ** 2 | |
| m_hat = m[i] / (1 - beta1 ** (step + 1)) | |
| v_hat = v[i] / (1 - beta2 ** (step + 1)) | |
| p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam) | |
| p.grad = 0 | |
| lossf_history.append(loss.data) | |
| print(f"step {step+1:4d} / {args.num_steps:4d} | loss {loss.data:.4f}") | |
| print(f"mean loss last 50 steps: {sum(lossf_history[-50:]) / len(lossf_history[-50:]):.4f}") # ~usable for basic kwarg tuning | |
| print(f"training time: {time.perf_counter() - t_start:.2f}s") # ~usable for basic performance benchmarking | |
| # Inference: generate 5 samples | |
| temperature = 0.5 # number in (0, 1] that controls the "creativity" of generated text, low to high | |
| print("\n--- inference ---") | |
| for sample_idx in range(20): | |
| keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)] | |
| token_id = BOS | |
| print(f"sample {sample_idx+1}: ", end="") | |
| for pos_id in range(block_size): | |
| logits = gpt(token_id, pos_id, keys, values) | |
| probs = softmax([l / temperature for l in logits]) | |
| token_id = random.choices(range(vocab_size), weights=[p.data for p in probs])[0] | |
| if token_id == BOS: | |
| break | |
| print(itos[token_id], end="") | |
| print() |
Incredible!
labeled as an art project. reads like one too.
Wow... it's looks like an art 🎨
Thank you Andrej. This is beautifully clean and elegant.
Really really cool. Worlds most viewed gist incoming
Beautiful piece of art!
For anyone interested in eval design work - Mercor is hiring ML engineers to build evaluation suites for frontier models. $100-120/hr remote contract.
They're specifically looking for people who can translate ML research workflows into structured benchmarks. This minGPT implementation is actually perfect reference material for that kind of work - atomic operations, explicit gradients, no framework abstractions https://t.mercor.com/Zoxmy
This code reminds me of the Pascal quote (often attributed to Mark Twain):
"I apologize for such a long letter - I didn't have time to write a short one"
goat
This is awesome. Can we get a non-minified version of this so that it is easier or us (humans) to read. :)
It's simpler w/ dual vectors
import os, math, random, argparse
parser = argparse.ArgumentParser()
parser.add_argument('--n_embd', type=int, default=16)
parser.add_argument('--n_layer', type=int, default=1)
parser.add_argument('--block_size', type=int, default=8)
parser.add_argument('--num_steps', type=int, default=1000)
parser.add_argument('--n_head', type=int, default=4)
parser.add_argument('--learning_rate', type=float, default=1e-2)
parser.add_argument('--seed', type=int, default=42)
args = parser.parse_args()
random.seed(args.seed)
n_embd, block_size, n_layer, n_head = args.n_embd, args.block_size, args.n_layer, args.n_head
head_dim = n_embd // n_head
if not os.path.exists('input.txt'):
import urllib.request
urllib.request.urlretrieve('https://raw.githubusercontent.com/karpathy/makemore/refs/heads/master/names.txt', 'input.txt')
with open('input.txt', 'r') as f: text = f.read()
docs = [l.strip() for l in text.strip().split('\n') if l.strip()]
random.shuffle(docs)
chars = ['<BOS>', '<EOS>'] + sorted(list(set(''.join(docs))))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
BOS, EOS = stoi['<BOS>'], stoi['<EOS>']
print(f"vocab size: {vocab_size}, num docs: {len(docs)}")
num_params = 0
def _vadd(a, b):
if a is None: return b
if b is None: return a
return [ai + bi for ai, bi in zip(a, b)]
def _vscale(v, s):
if v is None or s == 0.0: return None
return [vi * s for vi in v]
class Dual:
__slots__ = ('real', 'grad')
def __init__(self, real, grad=None):
self.real = real
self.grad = grad
def __add__(self, o):
if not isinstance(o, Dual): o = Dual(o)
return Dual(self.real + o.real, _vadd(self.grad, o.grad))
def __mul__(self, o):
if not isinstance(o, Dual): o = Dual(o)
return Dual(self.real * o.real, _vadd(_vscale(self.grad, o.real), _vscale(o.grad, self.real)))
def __pow__(self, n):
return Dual(self.real**n, _vscale(self.grad, n * self.real**(n-1)))
def log(self): return Dual(math.log(self.real), _vscale(self.grad, 1.0 / self.real))
def exp(self):
e = math.exp(self.real)
return Dual(e, _vscale(self.grad, e))
def relu(self): return Dual(self.real, self.grad) if self.real > 0 else Dual(0.0)
def __neg__(self): return self * -1
def __radd__(self, o): return self + o
def __sub__(self, o): return self + (-o)
def __rsub__(self, o): return (-self) + o
def __rmul__(self, o): return self * o
def __truediv__(self, o): return self * o**-1
def __rtruediv__(self, o): return o * self**-1
def __repr__(self): return f"Dual({self.real})"
_pi = 0
num_params = vocab_size*n_embd + block_size*n_embd + n_layer*(n_embd*n_embd*4 + 4*n_embd*n_embd + n_embd*4*n_embd)
def mkp(val):
global _pi
g = [0.0]*num_params; g[_pi] = 1.0; _pi += 1
return Dual(val, g)
mat = lambda r, c, s=0.02: [[mkp(random.gauss(0, s)) for _ in range(c)] for _ in range(r)]
sd = {'wte': mat(vocab_size, n_embd), 'wpe': mat(block_size, n_embd)}
for i in range(n_layer):
sd[f'layer{i}.attn_wq'] = mat(n_embd, n_embd)
sd[f'layer{i}.attn_wk'] = mat(n_embd, n_embd)
sd[f'layer{i}.attn_wv'] = mat(n_embd, n_embd)
sd[f'layer{i}.attn_wo'] = mat(n_embd, n_embd, s=0)
sd[f'layer{i}.mlp_fc1'] = mat(4*n_embd, n_embd)
sd[f'layer{i}.mlp_fc2'] = mat(n_embd, 4*n_embd, s=0)
params = [p for m in sd.values() for r in m for p in r]
assert len(params) == num_params
print(f"num params: {num_params}")
def linear(x, w): return [sum(w[o][i]*x[i] for i in range(len(x))) for o in range(len(w))]
def softmax(lg):
mx = max(v.real for v in lg); ex = [(v-mx).exp() for v in lg]; t = sum(ex)
return [e/t for e in ex]
def rmsnorm(x):
ms = sum(xi*xi for xi in x)/len(x)
return [xi*(ms+1e-5)**-0.5 for xi in x]
def gpt(tid, pid, keys, vals):
x = [t+p for t, p in zip(sd['wte'][tid], sd['wpe'][pid % block_size])]
for li in range(n_layer):
xr = x; x = rmsnorm(x)
q = linear(x, sd[f'layer{li}.attn_wq'])
k = linear(x, sd[f'layer{li}.attn_wk'])
v = linear(x, sd[f'layer{li}.attn_wv'])
keys[li].append(k); vals[li].append(v)
xa = []
for h in range(n_head):
hs = h*head_dim
qh = q[hs:hs+head_dim]
kh = [ki[hs:hs+head_dim] for ki in keys[li]]
vh = [vi[hs:hs+head_dim] for vi in vals[li]]
al = [sum(qh[j]*kh[t][j] for j in range(head_dim))/head_dim**0.5 for t in range(len(kh))]
aw = softmax(al)
xa.extend([sum(aw[t]*vh[t][j] for t in range(len(vh))) for j in range(head_dim)])
x = [a+b for a, b in zip(linear(xa, sd[f'layer{li}.attn_wo']), xr)]
xr = x; x = rmsnorm(x)
x = [xi.relu()**2 for xi in linear(x, sd[f'layer{li}.mlp_fc1'])]
x = [a+b for a, b in zip(linear(x, sd[f'layer{li}.mlp_fc2']), xr)]
return linear(x, sd['wte'])
lr = args.learning_rate
b1, b2, ea = 0.9, 0.95, 1e-8
mo = [0.0]*num_params
ve = [0.0]*num_params
for step in range(args.num_steps):
doc = docs[step % len(docs)]
toks = ([BOS] + [stoi[c] for c in doc] + [EOS])[:block_size]
keys, vals = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
tl = Dual(0.0)
for pid in range(len(toks)-1):
pr = softmax(gpt(toks[pid], pid, keys, vals))
tl = tl + (1/(len(toks)-1)) * -pr[toks[pid+1]].log()
lf = tl.real
gr = tl.grad if tl.grad is not None else [0.0]*num_params
lt = lr * (1 - step/args.num_steps)
for i, p in enumerate(params):
mo[i] = b1*mo[i] + (1-b1)*gr[i]
ve[i] = b2*ve[i] + (1-b2)*gr[i]**2
p.real -= lt * (mo[i]/(1-b1**(step+1))) / ((ve[i]/(1-b2**(step+1)))**0.5 + ea)
p.grad = [0.0]*num_params; p.grad[i] = 1.0
print(f"step {step+1} / {args.num_steps} | loss {lf:.4f}")
print("\n--- generation ---")
for si in range(5):
keys, vals = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
tid = BOS; gen = []
for pid in range(block_size):
pr = softmax(gpt(tid, pid, keys, vals))
tid = random.choices(range(vocab_size), weights=[p.real for p in pr])[0]
if tid == EOS: break
gen.append(itos[tid])
print(f"sample {si}: {''.join(gen)}")🔥
Absolute banger
absolutel cinema
It's beautiful! Atomic GPT, built from the atom up!
very interested in the implications of this
Is there gonna be a redstone port?
you did all of this in 243 lines, that is insane
🐐
In Julia is insane
GOAT