import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
torch.manual_seed(0)
# Parameters
n = 1000
true_beta = torch.tensor([2.0]) # coefficient on x
true_alpha = torch.tensor([1.0]) # constant for option 1
# Simulate regressor
x = torch.randn(n, 1)
# Utilities with Gumbel noise
def sample_gumbel(shape):
u = torch.rand(shape)
return -torch.log( - torch.log(u + 1e-8) + 1e-8)
v0 = x * torch.zeros(1) + sample_gumbel((n, 1)) # base option, no constant
v1 = x * true_beta + true_alpha + sample_gumbel((n, 1)) # option 1
# Choice: 1 if option 1 is better
y = torch.where(v1 > v0, 1, 0)
print(true_beta)
tensor([2.])
class LogitModel(nn.Module):
def __init__(self):
super().__init__()
self.beta = nn.Parameter(torch.randn(1) ) # weight on x
self.alpha = nn.Parameter(torch.randn(1)) # constant for option 1
    def forward(self, x):
        utility1 = self.alpha + x * self.beta
        utility0 = torch.zeros_like(utility1)
        # log P(choose option 1) = log sigmoid(utility1 - utility0)
        return -torch.log(1 + torch.exp(utility0 - utility1))
model = LogitModel()
optimizer = optim.Adam(model.parameters(), lr=0.05)
for epoch in range(300):
log_prob = model(x)
loss = -torch.mean(y * log_prob + (1 - y) * torch.log(1 - torch.exp(log_prob) + 1e-8))
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch % 50 == 0:
print(f"Epoch {epoch} | Loss: {loss.item():.4f}")
print("\nEstimated beta and alpha:")
print("beta:", model.beta.item(), "| true:", true_beta.item())
print("alpha:", model.alpha.item(), "| true:", true_alpha.item())
Epoch 0 | Loss: 0.5152
Epoch 50 | Loss: 0.4254
Epoch 100 | Loss: 0.4250
Epoch 150 | Loss: 0.4250
Epoch 200 | Loss: 0.4250
Epoch 250 | Loss: 0.4250

Estimated beta and alpha:
beta: 2.024514675140381 | true: 2.0
alpha: 1.0754637718200684 | true: 1.0
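Incidentally, the hand-rolled loss above is just binary cross-entropy on the utility difference, since the model's choice probability is $\sigma(\alpha + \beta x)$. A minimal sketch of the same fit using PyTorch's built-in F.binary_cross_entropy_with_logits (the names model_bce and optimizer_bce are ours, for illustration only):
# Equivalent estimation with the built-in binary cross-entropy loss
model_bce = LogitModel()
optimizer_bce = optim.Adam(model_bce.parameters(), lr=0.05)
y_float = y.float()                                   # BCE expects float targets
for epoch in range(300):
    logits = model_bce.alpha + x * model_bce.beta     # utility difference u1 - u0
    loss = F.binary_cross_entropy_with_logits(logits, y_float)
    optimizer_bce.zero_grad()
    loss.backward()
    optimizer_bce.step()
print("beta:", model_bce.beta.item(), "| alpha:", model_bce.alpha.item())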
We explore how to use the power of neural nets and backpropagation to estimate distributional models. We start simple and consider directly fitting a density.
We simulate data from a simple (bimodal) density and try to fit it with a normalizing flow.
Consider draws $X$ from some density $f_X(x)$. Let's say we want to learn $f_X(x)$ from the data.
A Naive proposal
We have seen how to estimate conditional means using neural nets, and we have seen that MLE simply amounts to changing the loss to a KL divergence. Why not do the following:
- construct a flexible function $g(x)$ using a neural net
- minimize the mean of $-\log g(X)$ over the data
- we should get $g(x) = f_X(x)$, why or why not?
The answer is that we can make the log likelihood arbitrarily large simply by letting $g$ blow up at the observed points. What is the catch? We need to ensure that $g$ remains a density, i.e., non-negative and integrating to one.
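To make the catch concrete, here is a tiny numerical sketch (the arrays x_obs and g_vals are purely illustrative): scaling any positive candidate $g$ by a constant $c > 1$ raises the naive objective without bound, so it has no maximizer unless $g$ is constrained to integrate to one.
# Without the density constraint, the naive "log likelihood" can be made arbitrarily large
x_obs = torch.randn(500)                     # pretend these are the observed data
g_vals = torch.exp(-0.5 * x_obs**2)          # some positive candidate g evaluated at the data
for c in [1.0, 10.0, 100.0]:
    # mean log of c * g grows like log(c): no maximum unless g must integrate to 1
    print(c, torch.mean(torch.log(c * g_vals)).item())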
What we would actually need
We know the formula for the density of a function of a random variable. Consider a base density, say an independent standard normal, and then consider a transformation of this random variable.
Normalizing flows
Normalizing Flows are a technique in probabilistic modeling that transforms a simple base distribution (like a standard normal) into a more complex distribution using a sequence of invertible and differentiable transformations.
Core Idea:
Let:
$z \sim p_Z(z)$: a sample from a simple base distribution
$x = f(z)$: a transformation (possibly a sequence) that maps $z$ to the target space
Using the change of variables formula, we can write the transformed density: $$ p_X(x) = p_Z(f^{-1}(x)) \left| \det \left( \frac{\partial f^{-1}(x)}{\partial x} \right) \right| $$
Or equivalently, writing $z = f^{-1}(x)$:
$$ \log p_X(x) = \log p_Z(z) - \log \left| \det \left( \frac{\partial f(z)}{\partial z} \right) \right| $$
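As a quick sanity check on the formula, take the affine map $x = f(z) = 2z + 1$ with $z \sim N(0,1)$, so that $x \sim N(1, 4)$; the change-of-variables log density matches the closed form (a small sketch using torch.distributions; the names x_test and z_test are just for illustration):
import torch
from torch.distributions import Normal

base = Normal(0.0, 1.0)                          # p_Z: standard normal
a, b = 2.0, 1.0                                  # f(z) = a * z + b, so X ~ N(b, a^2)
x_test = torch.tensor([0.0, 1.0, 2.5])
z_test = (x_test - b) / a                        # f^{-1}(x)
# log p_X(x) = log p_Z(f^{-1}(x)) - log |a|  (the Jacobian of f is the constant a)
log_px = base.log_prob(z_test) - torch.log(torch.tensor(a))
print(log_px)
print(Normal(b, a).log_prob(x_test))             # closed form N(1, 2^2), should match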
Why it's useful:
- Flexible densities: can approximate complicated distributions by stacking simple transformations.
- Inference & generation: easy to sample (just apply $f$) and compute likelihoods (using the Jacobian determinant).
- Applications: variational inference, density estimation, generative modeling (e.g., Glow, RealNVP), and more.
Types of flows:
- Affine flows (simple scaling + shifting)
- Autoregressive flows (e.g., MAF)
- Spline flows (piecewise-polynomial transformations)
Planar flows:
Planar flow is one of the simplest types of normalizing flows, introduced in Rezende & Mohamed (2015). It adds a single non-linear transformation to a base latent variable $z$, making the overall distribution more flexible while keeping the transformation invertible and the Jacobian tractable.
The Transformation
Given a latent variable $z \in \mathbb{R}^D$, the planar flow transforms it as: $$ f(z) = z + u \cdot h(w^\top z + b) $$
Where:
$u \in \mathbb{R}^D$, $w \in \mathbb{R}^D$, $b \in \mathbb{R}$: learnable parameters
$h(\cdot)$: an element-wise nonlinearity (e.g., $\tanh$)
So you're "bending" the space slightly in a direction controlled by $u$, depending on a projection $w^\top z$.
Log-Det Jacobian Term
To compute the change of variables, we need the Jacobian determinant:
$$ \log \left| \det \left( \frac{\partial f}{\partial z} \right) \right| = \log \left| 1 + u^\top \psi(z) \right| $$
Where:
$$ \psi(z) = h'(w^\top z + b) \cdot w $$
This keeps the flow tractable because:
The Jacobian is a rank-1 update of the identity matrix: $I + u \psi^\top$
The determinant can be computed efficiently using the matrix determinant lemma
Intuition
Planar flows bend the space around a hyperplane.
They are universal approximators when stacked — many small deformations build up a rich, flexible distribution.
But one planar flow only changes the density in a "thin slice" — not very expressive alone.
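To make the formulas concrete, here is a minimal PyTorch sketch of a single planar flow step (our own illustrative class, assuming $h = \tanh$; for simplicity it does not enforce the $w^\top u \ge -1$ condition that guarantees invertibility):
import torch
import torch.nn as nn

class PlanarFlow(nn.Module):
    """One planar flow step: f(z) = z + u * tanh(w^T z + b)."""
    def __init__(self, dim):
        super().__init__()
        self.u = nn.Parameter(torch.randn(dim) * 0.1)
        self.w = nn.Parameter(torch.randn(dim) * 0.1)
        self.b = nn.Parameter(torch.zeros(1))
    def forward(self, z):                                         # z: (batch, dim)
        lin = z @ self.w + self.b                                 # w^T z + b, shape (batch,)
        f_z = z + self.u * torch.tanh(lin).unsqueeze(1)           # f(z)
        psi = (1 - torch.tanh(lin) ** 2).unsqueeze(1) * self.w    # psi(z) = h'(w^T z + b) * w
        log_det = torch.log(torch.abs(1 + psi @ self.u) + 1e-8)   # matrix determinant lemma
        return f_z, log_det

flow_step = PlanarFlow(dim=2)
z_demo = torch.randn(5, 2)
f_z, log_det = flow_step(z_demo)
print(f_z.shape, log_det.shape)                                   # (5, 2) and (5,)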
import torch
import matplotlib.pyplot as plt
from nflows.distributions.normal import StandardNormal
from nflows.flows.base import Flow
from nflows.transforms import MaskedAffineAutoregressiveTransform
from nflows.transforms.autoregressive import MaskedPiecewiseRationalQuadraticAutoregressiveTransform,MaskedPiecewiseCubicAutoregressiveTransform
from nflows.transforms.permutations import ReversePermutation
from nflows.transforms.base import CompositeTransform
from nflows.transforms import AffineTransform
from torch.distributions import Uniform
from nflows.distributions.base import Distribution
import random
import numpy as np
def set_seed(seed: int):
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # for multi-GPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(43)
# Generate synthetic 1D data from a bimodal distribution
# (a mixture of N(-1, 0.5^2) and N(1, 0.75^2); note it returns 2 * n_samples draws)
def sample_data(n_samples=1000):
    z = torch.randn(n_samples)
    return torch.cat([(z - 2), (1.5 * z + 2.0)]) / 2.0
data = sample_data().unsqueeze(1) # Shape (N, 1)
features = 1
n_layers = 1
hidden_features = 8
# Define a flow: a single autoregressive rational-quadratic spline transform
transforms = []
for _ in range(n_layers):
# transforms.append(ReversePermutation(features=1))
# transforms.append( MaskedAffineAutoregressiveTransform(features=1, hidden_features=8))
transforms.append( MaskedPiecewiseRationalQuadraticAutoregressiveTransform(
features=features,
hidden_features=hidden_features,
num_bins=8,
tails='linear',
tail_bound=5.0
))
flow_transform = CompositeTransform(transforms)
base_dist = StandardNormal([1])
flow = Flow(flow_transform, base_dist)
# Optimizer
optimizer = torch.optim.Adam(flow.parameters(), lr=1e-3)
# Training loop
for epoch in range(1001):
optimizer.zero_grad()
loss = -flow.log_prob(data).mean() # Maximize log likelihood
loss.backward()
optimizer.step()
if epoch % 200 == 0:
print(f"Epoch {epoch}, NLL: {loss.item():.4f}")
# Plot learned density
x = torch.linspace(-5, 5, 1000).unsqueeze(1)
with torch.no_grad():
log_prob = flow.log_prob(x)
prob = torch.exp(log_prob)
plt.hist(data.numpy(), bins=50, density=True, alpha=0.5, label="Data histogram")
plt.plot(x.numpy(), prob.numpy(), label="Learned density", color='red')
plt.legend()
plt.title("1D Density Learned by Normalizing Flow")
plt.show()
Epoch 0, NLL: 2.0672
Epoch 200, NLL: 1.4957
Epoch 400, NLL: 1.4850
Epoch 600, NLL: 1.4810
Epoch 800, NLL: 1.4805
Epoch 1000, NLL: 1.4802
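Since the trained Flow object is also a generative model, we can go the other way and sample from it (using nflows' standard sample method) and compare the draws with the data; a quick sketch:
# Draw samples from the trained flow and compare with the training data
with torch.no_grad():
    flow_samples = flow.sample(2000).squeeze(1)
plt.hist(data.squeeze(1).numpy(), bins=50, density=True, alpha=0.5, label="Data")
plt.hist(flow_samples.numpy(), bins=50, density=True, alpha=0.5, label="Flow samples")
plt.legend()
plt.title("Samples from the Trained Flow vs. Data")
plt.show()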
Wasserstein
We now look at a separate method that uses a different distance between densities.
# Compare f ~ N(0,1) and g ~ N(2,1): plot the densities and the KL integrand f(x) log(f(x)/g(x))
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
x = np.linspace(-4, 8, 1000)
f_pdf = norm(loc=0, scale=1).pdf(x)
g_pdf = norm(loc=2, scale=1).pdf(x)
kl_integrand = f_pdf * np.log(f_pdf / g_pdf)
# Plotting with colored area under the KL integrand
fig, axs = plt.subplots(2, 1, figsize=(10, 6), sharex=True)
# Plot the densities
axs[0].plot(x, f_pdf, label='f(x): N(0,1)', lw=2)
axs[0].plot(x, g_pdf, label='g(x): N(2,1)', lw=2)
axs[0].set_title('Densities f(x) and g(x)')
axs[0].legend()
# Plot the KL integrand with colored fill
axs[1].plot(x, kl_integrand, color='purple', lw=2, label='KL integrand')
# Fill above and below 0 differently
axs[1].fill_between(x, 0, kl_integrand, where=kl_integrand >= 0, interpolate=True, color='purple', alpha=0.3, label='Positive Contribution')
axs[1].fill_between(x, 0, kl_integrand, where=kl_integrand < 0, interpolate=True, color='red', alpha=0.3, label='Negative contribution')
axs[1].axhline(0, color='black', lw=1, linestyle='--')
axs[1].set_title(r'KL Divergence Integrand: $f(x) \log \frac{f(x)}{g(x)}$')
axs[1].set_xlabel('x')
axs[1].legend()
plt.tight_layout()
plt.show()
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
# Define the x range
x = np.linspace(-4, 8, 1000)
# Define the two distributions: f ~ N(0,1), g ~ N(2,1)
f_cdf = norm(loc=0, scale=1).cdf(x)
g_cdf = norm(loc=2, scale=1).cdf(x)
# Compute the absolute difference between the CDFs (transport distance)
transport_distance = np.abs(f_cdf - g_cdf)
# Plot the CDFs and the area between them
fig, ax = plt.subplots(figsize=(10, 5))
# Plot CDFs
ax.plot(x, f_cdf, label='F(x): CDF of N(0,1)', lw=2)
ax.plot(x, g_cdf, label='G(x): CDF of N(2,1)', lw=2)
# Fill area between the CDFs
ax.fill_between(x, f_cdf, g_cdf, color='orange', alpha=0.4, label='Transport Distance')
# Labels and legend
ax.set_title('Earth Mover’s Distance (Wasserstein-1) between CDFs of N(0,1) and N(2,1)')
ax.set_ylabel('Cumulative Probability')
ax.set_xlabel('x')
ax.legend()
plt.tight_layout()
plt.show()
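For two Gaussians with equal variance, the Wasserstein-1 distance is just the difference in means, here $|0 - 2| = 2$; we can confirm this by integrating the shaded area between the CDFs (a small sketch using the arrays defined above):
# W1 = integral of |F(x) - G(x)| dx; should be close to |0 - 2| = 2
w1 = np.trapz(transport_distance, x)
print(f"Approximate Wasserstein-1 distance: {w1:.4f}")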
Estimating a density using such a distance and ML
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from geomloss import SamplesLoss
# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Target distribution: mixture of Gaussians
def sample_target(n):
choice = torch.rand(n) < 0.5
x1 = torch.randn(n) * 0.2 - 2
x2 = torch.randn(n) * 0.2 + 2
return torch.where(choice, x1, x2).unsqueeze(1).to(device)
# Base distribution: standard normal
def sample_base(n):
return torch.randn(n, 1).to(device)
# Neural net: simple MLP
class TransportNet(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(1, 32),
nn.ReLU(),
nn.Linear(32, 32),
nn.ReLU(),
nn.Linear(32, 1)
)
def forward(self, x):
return self.net(x)
# Initialize model and loss
model = TransportNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
sinkhorn_loss = SamplesLoss(loss="sinkhorn", p=2, blur=0.05)
# Training loop
n_iter = 500
batch_size = 512
for it in range(n_iter):
x_base = sample_base(batch_size).detach()
    # x_target = sample_target(batch_size).detach()
    x_target = sample_data(batch_size).unsqueeze(1).detach().to(device)  # 2 * batch_size draws, moved to device
x_push = model(x_base)
loss = sinkhorn_loss(x_push, x_target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if it % 100 == 0:
print(f"Iter {it:04d} | Sinkhorn Loss: {loss.item():.4f}")
Iter 0000 | Sinkhorn Loss: 0.6372
Iter 0100 | Sinkhorn Loss: 0.0222
Iter 0200 | Sinkhorn Loss: 0.0013
Iter 0300 | Sinkhorn Loss: 0.0037
Iter 0400 | Sinkhorn Loss: 0.0048
# Plot result
x_base = sample_base(1000)
x_push = model(x_base).detach().cpu().numpy()
x_target = sample_data(1000).unsqueeze(1).cpu().numpy()
plt.hist(x_push, bins=50, alpha=0.5, label="Learned",density=True)
plt.hist(x_target, bins=50, alpha=0.5, label="Target",density=True)
plt.legend()
plt.title("Sinkhorn Density Fitting")
plt.show()
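As a quick held-out check (a sketch reusing the objects defined above), we can recompute the Sinkhorn loss between fresh pushforward samples and fresh target samples that were not used during training:
# Held-out Sinkhorn loss between fresh pushforward and target samples
with torch.no_grad():
    x_push_new = model(sample_base(2000))
    x_target_new = sample_data(1000).unsqueeze(1).to(device)
    print(f"Held-out Sinkhorn loss: {sinkhorn_loss(x_push_new, x_target_new).item():.4f}")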