Denoising Diffusion Models Part 2 - Toy Example

Intro

Stable Diffusion landed not long ago, and has already taken the generative AI world by storm. This post follows Part 1, which developed the theory.

Almost all of this post is a direct implementation of the ideas contained in Denoising Diffusion Probabilistic Models.

In [1]:
import torch
import numpy as np
from IPython.display import Image

Problem

Instead of a complicated image generation task, let's learn a simple generative model for a structured 2D vector.

In this case, let's use a 2D vector where the second entry is 3 larger than the first, and we'll draw the first entry from a standard (mean 0, variance 1) Gaussian.

In [2]:
DATA_VECTOR_SIZE = 2
# Training set

data = np.zeros((1000, DATA_VECTOR_SIZE))
data[:,0] = np.random.normal(0,1,size=1000)
data[:,1] = data[:,0]+3

# A "validation/test" set

data_validation = np.zeros((1000, DATA_VECTOR_SIZE))
data_validation[:,0] = np.random.normal(0,1,size=1000)
data_validation[:,1] = data_validation[:,0]+3
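
Just to confirm the construction (a throwaway check, not used later):

assert np.allclose(data[:,1] - data[:,0], 3.0)  # second entry = first entry + 3
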
In [3]:
data
Out[3]:
array([[ 1.36190115,  4.36190115],
       [ 1.36516221,  4.36516221],
       [ 0.68204398,  3.68204398],
       ...,
       [ 0.36040593,  3.36040593],
       [ 1.50456992,  4.50456992],
       [-0.96962201,  2.03037799]])

Next, we need a noise schedule - I've just copied directly from the paper here, linearly interpolating between a start and end value.

$\beta_{t}$ is a variance, and to simplify some of the math, the paper also introduces $\alpha_{t} = 1- \beta_{t}$ and $\bar{\alpha}_{t} = \prod_{s=1}^{t} \alpha_{s}$.

As we fix the $\beta$ schedule ahead of time, we can precompute the $\bar{\alpha}_{t}$, as I've done below.

In [4]:
class LinearNoiseScheduler:
    def __init__(self, start, end, T):
        self.start = start
        self.end = end
        self.T = T
        self.alpha_bars = None
        self.precompute_alpha_bar()
        
    def get_variance(self, t):
        return self.start + (self.end - self.start)*(t-1)/(self.T-1)
        
    def get_alpha(self, t):
        return 1-self.get_variance(t)
        
    def get_alpha_bar(self, t):
        assert self.alpha_bars is not None
        
        return self.alpha_bars[t-1]
        
    def precompute_alpha_bar(self):
        tmp = np.zeros(self.T)
        
        # t runs from 1 to T; alpha_t is stored at index t-1.
        for t in range(1, self.T + 1):
            tmp[t-1] = self.get_alpha(t)
            
        self.alpha_bars = np.cumprod(tmp)
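
As a quick sanity check (a throwaway sketch, not part of the pipeline below): $\bar{\alpha}_{t}$ should start near 1 (almost no noise at $t=1$) and decay towards 0 (almost pure noise at $t=T$).

check = LinearNoiseScheduler(1e-4, 0.02, 500)
print(check.get_alpha_bar(1))    # ~0.9999: x_1 is barely perturbed
print(check.get_alpha_bar(500))  # close to 0: x_T is essentially pure noise
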
        

Model

As discussed in the previous post, we also need a noise prediction model.

We're going to use a really small neural net here.

One small detail is that the model we want to learn is given by:

$\epsilon_{\theta}(x_{t}, t)$

In a larger problem, we would use transformers and positional encodings for the t parameter. Here, given the problem size, I've gone with a simple embedding t -> (cos(t/T), sin(t/T)).

Also, in the full Stable Diffusion models, the diffusion runs over latent variables from an encoder/decoder stage. We could do the same here, but it's overkill for this problem, so we'll just learn everything in one go.
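
To make the embedding concrete, here it is as a standalone helper (a minimal sketch; in the code below it's inlined into the data-augmentation step):

def time_embedding(t, T):
    # Squash the integer timestep into [0, 1], then map it to two
    # smooth, bounded features for the network.
    return np.cos(t / T), np.sin(t / T)

time_embedding(250, 500)  # roughly (0.878, 0.479)
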

In [5]:
class SkipLayer(torch.nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.l = torch.nn.Linear(input_dim, input_dim, bias=False)
        
    def forward(self, x):
        return self.l(x) + x

class Model(torch.nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        
        self.input_dim = input_dim

        
        self.net = torch.nn.Sequential(
            torch.nn.Linear(input_dim+2, 5), # add 2 because input_dim is the vector size, and need two for the time embedding.
            torch.nn.ReLU(),
            torch.nn.Linear(5, 3),
            torch.nn.ReLU(),
            SkipLayer(3),
            torch.nn.ReLU(),
            SkipLayer(3),
            torch.nn.ReLU(),
            torch.nn.Linear(3, input_dim)
        )
        
    def forward(self, x):
        return self.net(x)
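
A quick shape check (again, a throwaway sketch): the model consumes the 2-entry vector plus the 2-entry time embedding, and returns a 2-entry noise prediction.

m = Model(DATA_VECTOR_SIZE)
m(torch.randn(8, DATA_VECTOR_SIZE + 2)).shape  # torch.Size([8, 2])
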
            
            
In [6]:
# Hyperparams - beta values from the paper; T is half the paper's 1000 due to problem simplicity, and could probably be even smaller.
T = 500
beta_1 = 1e-4
beta_T = 0.02

Using $x_{t} = \sqrt{\bar{\alpha}_{t}}\,x_{0} + \sqrt{1-\bar{\alpha}_{t}}\,\epsilon$, let's create a function which takes our batch of $x_0$ (well, because this problem is small, it's the whole dataset), computes $x_t$, and appends our time/position embedding.
Let's also make a utility to create a batch of training data.

In [7]:
Image('images/sd_algo1.png')
Out[7]: [Algorithm 1 from the DDPM paper: the training loop]
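
In symbols, Algorithm 1 says: sample $x_0$ from the data, $t \sim \mathrm{Uniform}(\{1,\dots,T\})$ and $\epsilon \sim \mathcal{N}(0, I)$, then take a gradient step on

$\left\| \epsilon - \epsilon_{\theta}\left(\sqrt{\bar{\alpha}_{t}}\,x_{0} + \sqrt{1-\bar{\alpha}_{t}}\,\epsilon,\; t\right) \right\|^{2}$
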
In [8]:
def augment_data(x_0, alpha_bar_t, noise, t):
    assert x_0.shape == noise.shape
    x_t = np.sqrt(alpha_bar_t)*x_0 + np.sqrt(1-alpha_bar_t)*noise
    return np.hstack([x_t, np.cos(t.reshape(-1,1)/T), np.sin(t.reshape(-1,1)/T)])
In [9]:
lns = LinearNoiseScheduler(beta_1, beta_T, T)
In [10]:
def get_batch(x0):
    n_times_per_batch = 5 # Replicate our full dataset this many times in each batch.
    input_data = []
    targets = []
    for i in range(n_times_per_batch):
        # Sample t uniformly from {1, ..., T} (randint's upper bound is exclusive).
        t = np.random.randint(1, T+1, size=(x0.shape[0],))
        alpha_bar_ts = np.array([lns.get_alpha_bar(s) for s in t]).reshape(-1,1)
        eps = np.random.normal(0, 1, size=x0.shape)
        new_data = augment_data(x0, alpha_bar_ts, eps, t)
        input_data.append(new_data)
        targets.append(eps)
        
    X = np.concatenate(input_data)
    y = np.concatenate(targets)
    return X,y
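
The shapes give a quick sanity check (a sketch; the row count just reflects 5 copies of the 1000-row dataset, and each input row is the noised 2-vector plus the two time-embedding features):

X, y = get_batch(data)
X.shape, y.shape  # ((5000, 4), (5000, 2))
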

Now we are ready for our training loop. Nothing fancy here: just squared-error loss, as per Algorithm 1 above.

In [11]:
model = Model(2).to("cuda")
optim = torch.optim.Adam(model.parameters(), lr=0.01)
loss_f = torch.nn.MSELoss()
losses = []
val_losses = []
for i in range(80000):
    X,y = get_batch(data)
    X = torch.tensor(X, dtype=torch.float32).to("cuda")
    y = torch.tensor(y, dtype=torch.float32).to("cuda")
    y_pred = model(X)
    loss = loss_f(y_pred, y)
    optim.zero_grad()
    loss.backward()
    optim.step()
    
    if i % 10 == 0 :
        with torch.no_grad():
            X_val,y_val = get_batch(data_validation)
            X_val = torch.tensor(X_val, dtype=torch.float32).to("cuda")
            y_val = torch.tensor(y_val, dtype=torch.float32).to("cuda")
            y_pred_val = model(X_val)
            val_loss = loss_f(y_pred_val, y_val)
            val_losses.append(val_loss.cpu().detach().numpy())
    losses.append(loss.cpu().detach().numpy())
In [12]:
import matplotlib.pyplot as plt
In [13]:
import pandas as pd
In [14]:
loss_ts = pd.Series(losses)
loss_val_ts = pd.Series(val_losses)
In [15]:
loss_ts.rolling(1000).mean().plot(ylim=[0.24, 0.26], title='Training loss'); # clipping to ignore the first large values skewing the picture.
In [16]:
loss_val_ts.rolling(1000).mean().plot(ylim=[0.24, 0.26], title="Validation Loss");

OK, great. We're happy this has converged nicely and doesn't seem particularly overfit. Let's generate some samples!

In [17]:
Image('images/sd_algo2.png')
Out[17]: [Algorithm 2 from the DDPM paper: the sampling loop]
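
In symbols, the update implemented below is, for $t = T, \dots, 1$:

$x_{t-1} = \frac{1}{\sqrt{\alpha_{t}}}\left(x_{t} - \frac{1-\alpha_{t}}{\sqrt{1-\bar{\alpha}_{t}}}\,\epsilon_{\theta}(x_{t}, t)\right) + \sigma_{t} z$

where $z \sim \mathcal{N}(0, I)$ for $t > 1$, $z = 0$ at the final step, and $\sigma_{t}^{2} = \beta_{t}$.
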

As we can see from the above, the algorithm is again quite simple. Here is a really basic implementation in NumPy. We could probably vectorise this further, but it's not a huge problem here: the samples are pretty small.

In [40]:
samples = []
seeds = []

# Start from pure Gaussian noise and run the reverse process from t=T down to t=1.
x_t = np.random.normal(size=(1000,DATA_VECTOR_SIZE))
seeds.append(x_t)
for t in np.arange(T, 0, -1, dtype=int):
    t_tmp = np.array([t]*1000, dtype=np.float32).reshape(-1,1)
    x_inp = torch.tensor(np.concatenate([x_t, np.cos(t_tmp/T), np.sin(t_tmp/T)], axis=1), dtype=torch.float32).to("cuda")
    n_pred = model(x_inp)
    n_pred = n_pred.cpu().detach().numpy()
    # Remove the predicted noise, scaled as in Algorithm 2.
    x_t_1 = (x_t - n_pred*(1-lns.get_alpha(t))/np.sqrt(1-lns.get_alpha_bar(t)))/np.sqrt(lns.get_alpha(t))
    if t>1:
        # Add fresh noise with variance sigma_t^2 = beta_t, skipped at the final step.
        x_t_1 += np.sqrt(lns.get_variance(t))*np.random.normal(size=(1000,DATA_VECTOR_SIZE))
    x_t = x_t_1

samples.append(x_t_1)
In [41]:
samples = np.concatenate(samples)
seeds = np.concatenate(seeds)

Generated Samples

Let's just eyeball a few samples.

In [42]:
np.set_printoptions(suppress=True)
samples[:5]
Out[42]:
array([[-1.13064079,  1.84324011],
       [ 1.93921035,  4.75053466],
       [-0.75609775,  2.50448519],
       [ 1.78132403,  4.86578814],
       [-0.17684033,  2.85941363]])
In [47]:
seeds[:5]
Out[47]:
array([[ 0.56978774,  0.43987803],
       [ 0.08676604, -0.72642374],
       [ 0.85984303, -1.86024345],
       [-0.03611226, -0.75957632],
       [ 0.56361097, -0.01211315]])

This looks pretty close to the structure we wanted to learn - the second entry is 3 larger than the first.

The real test: how does the difference between the two entries look, and what does the marginal distribution of the first entry look like?

In [44]:
diffs = samples[:,1]-samples[:,0]
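
Alongside the density plots, a couple of summary statistics give a quick check (a throwaway sketch; exact values vary run to run). The diff should be close to 3 with a small spread, and the first entry should look roughly standard normal:

print(diffs.mean(), diffs.std())
print(samples[:, 0].mean(), samples[:, 0].std())
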
In [45]:
pd.Series(diffs.ravel()).plot(kind='kde')
plt.xlim([2,4])
plt.axvline(3, c='r', linestyle='--');
In [46]:
pd.Series(samples[:,0].ravel()).plot(kind='kde');

Next, let's trace the reverse-diffusion trajectory of a single sample, from its Gaussian seed all the way down to $t=1$.

In [48]:
trajectory = []
x_t = np.random.normal(size=(1,DATA_VECTOR_SIZE))

for t in np.arange(T, 0, -1, dtype=int):
    trajectory.append(x_t)
    t_tmp = np.array([t], dtype=np.float32).reshape(-1,1)
    x_inp = torch.tensor(np.concatenate([x_t, np.cos(t_tmp/T), np.sin(t_tmp/T)], axis=1), dtype=torch.float32).to("cuda")
    n_pred = model(x_inp)
    n_pred = n_pred.cpu().detach().numpy()
    x_t_1 = (x_t - n_pred*(1-lns.get_alpha(t))/np.sqrt(1-lns.get_alpha_bar(t)))/np.sqrt(lns.get_alpha(t))
    if t>1:
        x_t_1 += np.sqrt(lns.get_variance(t))*np.random.normal(size=(1,DATA_VECTOR_SIZE))
    x_t = x_t_1

trajectory.append(x_t_1)
In [50]:
trajectory = np.concatenate(trajectory)
In [54]:
trajectory[0]
Out[54]:
array([-2.08350182,  0.58508687])
In [55]:
trajectory[-1]
Out[55]:
array([-0.02944257,  2.91621589])

Plotting the trajectory, the path wanders in from the seed (black cross) and settles on the data manifold (red cross):

In [127]:
plt.plot(trajectory[:,0], trajectory[:,1], zorder=1, linewidth=0.7, c='green')
plt.scatter(trajectory[0, 0], trajectory[0,1], marker='x', c='black',zorder=2)
plt.scatter(trajectory[-1, 0], trajectory[-1,1], marker='x', c='r', zorder=2);

Zooming in on the end of the trajectory:

In [133]:
plt.plot(trajectory[:,0], trajectory[:,1], zorder=1, linewidth=0.7, c='green')
plt.scatter(trajectory[0, 0], trajectory[0,1], marker='x', c='black',zorder=2)
plt.scatter(trajectory[-1, 0], trajectory[-1,1], marker='x', c='r', zorder=2);
plt.xlim([-0.2,0.2])
plt.ylim([2.8,3.2]);

Finally, let's push a regular grid of seeds through the reverse process, to visualise how the learned model maps the latent space onto the data.

In [108]:
X,Y = np.arange(-1,1, 0.1).reshape(-1,1), np.arange(-1,1, 0.1).reshape(1,-1)
In [109]:
X,Y = np.meshgrid(X,Y)
In [110]:
x,y = X.reshape(-1,1), Y.reshape(-1,1)
In [111]:
seeds = np.hstack([x,y])
In [112]:
x_t = seeds
for t in np.arange(T, 0, -1, dtype=int):
    t_tmp = np.array([t]*len(seeds), dtype=np.float32).reshape(-1,1)
    x_inp = torch.tensor(np.concatenate([x_t, np.cos(t_tmp/T), np.sin(t_tmp/T)], axis=1), dtype=torch.float32).to("cuda")
    n_pred = model(x_inp)
    n_pred = n_pred.cpu().detach().numpy()
    x_t_1 = (x_t - n_pred*(1-lns.get_alpha(t))/np.sqrt(1-lns.get_alpha_bar(t)))/np.sqrt(lns.get_alpha(t))
    if t>1:
        x_t_1 += np.sqrt(lns.get_variance(t))*np.random.normal(size=(len(seeds),DATA_VECTOR_SIZE))
    x_t = x_t_1
In [124]:
z = x_t[:,0].reshape(20,20)
In [125]:
from matplotlib import cm
In [128]:
plt.contourf(X,Y, z, levels=np.arange(-8,8,0.01))
plt.colorbar();
In [129]:
fig, ax = plt.subplots(subplot_kw={"projection":"3d"})
ax.plot_surface(X,Y, z, cmap=cm.coolwarm,
                       linewidth=0, antialiased=False);

What I find interesting here is that the learned representations don't seem particularly "close" - compare with a previous toy example on adversarial variational Bayes, where the unit circle was basically divided into quadrants.

I'll need to think on why this might be!

Summary

Although the theory of diffusion models is quite dense, what comes out at the end is, in essence, quite simple.

We can implement the core logic in a hundred or so lines of code, and play around easily on toy problems.

In this toy example, we were able to visualise the evolution of the latent space and the structure of the learned representations. Comparing with the VAE and AVB examples from a few years ago, it's clear there are some substantial differences here. Maybe one to explore more deeply in another post.