import torch
import torch.nn as nn
from scipy import integrate
from tqdm.auto import trange


def append_zero(x):
    return torch.cat([x, x.new_zeros([1])])


def append_dims(x, target_dims):
    """Appends dimensions to the end of a tensor until it has target_dims dimensions."""
    dims_to_append = target_dims - x.ndim
    if dims_to_append < 0:
        raise ValueError(f'input has {x.ndim} dims but target_dims is {target_dims}, which is less')
    return x[(...,) + (None,) * dims_to_append]


def get_ancestral_step(sigma_from, sigma_to):
    """Calculates the noise level (sigma_down) to step down to and the amount
    of noise to add (sigma_up) when doing an ancestral sampling step."""
    sigma_up = (sigma_to ** 2 * (sigma_from ** 2 - sigma_to ** 2) / sigma_from ** 2) ** 0.5
    sigma_down = (sigma_to ** 2 - sigma_up ** 2) ** 0.5
    return sigma_down, sigma_up

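# By construction sigma_down ** 2 + sigma_up ** 2 == sigma_to ** 2, so stepping
# the ODE deterministically down to sigma_down and then adding fresh Gaussian
# noise with standard deviation sigma_up lands exactly at noise level sigma_to.
# A quick sanity check:
#
#     down, up = get_ancestral_step(torch.tensor(1.), torch.tensor(0.5))
#     assert torch.isclose(down ** 2 + up ** 2, torch.tensor(0.25))

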
class DiscreteSchedule(nn.Module):
    """A mapping between continuous noise levels (sigmas) and a list of discrete noise
    levels."""

    def __init__(self, sigmas, quantize):
        super().__init__()
        self.register_buffer('sigmas', sigmas)
        self.quantize = quantize

    def get_sigmas(self, n=None):
        if n is None:
            return append_zero(self.sigmas.flip(0))
        t_max = len(self.sigmas) - 1
        t = torch.linspace(t_max, 0, n, device=self.sigmas.device)
        return append_zero(self.t_to_sigma(t))

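    # Note: get_sigmas(n) returns n + 1 noise levels because append_zero()
    # appends a final sigma of 0, so the sampling loops below run for n steps.
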
    def sigma_to_t(self, sigma, quantize=None):
        quantize = self.quantize if quantize is None else quantize
        dists = torch.abs(sigma - self.sigmas[:, None])
        if quantize:
            return torch.argmin(dists, dim=0).view(sigma.shape)
        low_idx, high_idx = torch.sort(torch.topk(dists, dim=0, k=2, largest=False).indices, dim=0)[0]
        low, high = self.sigmas[low_idx], self.sigmas[high_idx]
        w = (low - sigma) / (low - high)
        w = w.clamp(0, 1)
        t = (1 - w) * low_idx + w * high_idx
        return t.view(sigma.shape)

    def t_to_sigma(self, t):
        t = t.float()
        low_idx, high_idx, w = t.floor().long(), t.ceil().long(), t.frac()
        return (1 - w) * self.sigmas[low_idx] + w * self.sigmas[high_idx]


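# With quantize=False, sigma_to_t() and t_to_sigma() are piecewise-linear
# inverses of each other over the range of the sigma table: the former maps a
# continuous sigma to a fractional index into self.sigmas, the latter maps a
# fractional index back to a sigma by linear interpolation.

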
class DiscreteEpsDDPMDenoiser(DiscreteSchedule):
    """A wrapper for discrete schedule DDPM models that output eps (the predicted
    noise)."""

    def __init__(self, model, alphas_cumprod, quantize):
        super().__init__(((1 - alphas_cumprod) / alphas_cumprod) ** 0.5, quantize)
        self.inner_model = model
        self.sigma_data = 1.

    def get_scalings(self, sigma):
        c_out = -sigma
        c_in = 1 / (sigma ** 2 + self.sigma_data ** 2) ** 0.5
        return c_out, c_in

    def get_eps(self, *args, **kwargs):
        return self.inner_model(*args, **kwargs)

    def forward(self, input, sigma, **kwargs):
        c_out, c_in = [append_dims(x, input.ndim) for x in self.get_scalings(sigma)]
        eps = self.get_eps(input * c_in, self.sigma_to_t(sigma), **kwargs)
        return input + eps * c_out


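# For an eps-predicting model, forward() above is the usual conversion from a
# noise estimate to a denoised estimate: with sigma_data = 1, c_in equals
# 1 / sqrt(sigma ** 2 + 1) and c_out equals -sigma, so the returned value is
# x - sigma * eps(x * c_in, t).

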
class CompVisDenoiser(DiscreteEpsDDPMDenoiser):
    """A wrapper for CompVis diffusion models."""

    def __init__(self, model, quantize=False, device='cpu'):
        super().__init__(model, model.alphas_cumprod, quantize=quantize)

    def get_eps(self, *args, **kwargs):
        return self.inner_model.apply_model(*args, **kwargs)


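# Hypothetical usage sketch (illustrative, not part of this module): given a
# loaded CompVis LatentDiffusion model `sd_model` exposing an `alphas_cumprod`
# buffer and an `apply_model(x, t, cond)` method, the wrapper turns it into a
# continuous-noise denoiser that the samplers below can drive:
#
#     model_wrap = CompVisDenoiser(sd_model)
#     sigmas = model_wrap.get_sigmas(50)
#     x = torch.randn([1, 4, 64, 64], device=sigmas.device) * sigmas[0]
#     samples = sample_lms(model_wrap, x, sigmas, extra_args={'cond': cond})

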
def to_d(x, sigma, denoised):
    """Converts a denoiser output to a Karras ODE derivative."""
    return (x - denoised) / append_dims(sigma, x.ndim)


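# to_d() is the right-hand side of the probability flow ODE in the Karras et
# al. (2022) parameterization, dx/dsigma = (x - D(x, sigma)) / sigma; every
# sampler below advances x by integrating this derivative with a different
# scheme.

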
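# The Karras samplers below share the s_churn / s_tmin / s_tmax / s_noise
# parameters: whenever sigmas[i] lies in [s_tmin, s_tmax], the step first
# "churns" by raising the noise level to sigma_hat = sigmas[i] * (gamma + 1)
# and adding matching Gaussian noise before taking the ODE step (Algorithm 2
# of Karras et al., 2022).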
@torch.no_grad()
def sample_euler(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.):
    """Implements Algorithm 2 (Euler steps) from Karras et al. (2022)."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    for i in trange(len(sigmas) - 1, disable=disable):
        gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0.
        eps = torch.randn_like(x) * s_noise
        sigma_hat = sigmas[i] * (gamma + 1)
        if gamma > 0:
            x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5
        denoised = model(x, sigma_hat * s_in, **extra_args)
        d = to_d(x, sigma_hat, denoised)
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised})
        dt = sigmas[i + 1] - sigma_hat
        # Euler method
        x = x + d * dt
    return x


@torch.no_grad()
def sample_euler_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None):
    """Ancestral sampling with Euler method steps."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    for i in trange(len(sigmas) - 1, disable=disable):
        denoised = model(x, sigmas[i] * s_in, **extra_args)
        sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1])
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised})
        d = to_d(x, sigmas[i], denoised)
        # Euler method
        dt = sigma_down - sigmas[i]
        x = x + d * dt
        x = x + torch.randn_like(x) * sigma_up
    return x


@torch.no_grad()
def sample_heun(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.):
    """Implements Algorithm 2 (Heun steps) from Karras et al. (2022)."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    for i in trange(len(sigmas) - 1, disable=disable):
        gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0.
        eps = torch.randn_like(x) * s_noise
        sigma_hat = sigmas[i] * (gamma + 1)
        if gamma > 0:
            x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5
        denoised = model(x, sigma_hat * s_in, **extra_args)
        d = to_d(x, sigma_hat, denoised)
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised})
        dt = sigmas[i + 1] - sigma_hat
        if sigmas[i + 1] == 0:
            # Euler method
            x = x + d * dt
        else:
            # Heun's method
            x_2 = x + d * dt
            denoised_2 = model(x_2, sigmas[i + 1] * s_in, **extra_args)
            d_2 = to_d(x_2, sigmas[i + 1], denoised_2)
            d_prime = (d + d_2) / 2
            x = x + d_prime * dt
    return x


@torch.no_grad()
def sample_dpm_2(model, x, sigmas, extra_args=None, callback=None, disable=None, s_churn=0., s_tmin=0., s_tmax=float('inf'), s_noise=1.):
    """A sampler inspired by DPM-Solver-2 and Algorithm 2 from Karras et al. (2022)."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    for i in trange(len(sigmas) - 1, disable=disable):
        gamma = min(s_churn / (len(sigmas) - 1), 2 ** 0.5 - 1) if s_tmin <= sigmas[i] <= s_tmax else 0.
        eps = torch.randn_like(x) * s_noise
        sigma_hat = sigmas[i] * (gamma + 1)
        if gamma > 0:
            x = x + eps * (sigma_hat ** 2 - sigmas[i] ** 2) ** 0.5
        denoised = model(x, sigma_hat * s_in, **extra_args)
        d = to_d(x, sigma_hat, denoised)
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigma_hat, 'denoised': denoised})
        # Midpoint method, where the midpoint is chosen according to a rho=3 Karras schedule
        sigma_mid = ((sigma_hat ** (1 / 3) + sigmas[i + 1] ** (1 / 3)) / 2) ** 3
        dt_1 = sigma_mid - sigma_hat
        dt_2 = sigmas[i + 1] - sigma_hat
        x_2 = x + d * dt_1
        denoised_2 = model(x_2, sigma_mid * s_in, **extra_args)
        d_2 = to_d(x_2, sigma_mid, denoised_2)
        x = x + d_2 * dt_2
    return x


@torch.no_grad()
def sample_dpm_2_ancestral(model, x, sigmas, extra_args=None, callback=None, disable=None):
    """Ancestral sampling with DPM-Solver inspired second-order steps."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    for i in trange(len(sigmas) - 1, disable=disable):
        denoised = model(x, sigmas[i] * s_in, **extra_args)
        sigma_down, sigma_up = get_ancestral_step(sigmas[i], sigmas[i + 1])
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised})
        d = to_d(x, sigmas[i], denoised)
        # Midpoint method, where the midpoint is chosen according to a rho=3 Karras schedule
        sigma_mid = ((sigmas[i] ** (1 / 3) + sigma_down ** (1 / 3)) / 2) ** 3
        dt_1 = sigma_mid - sigmas[i]
        dt_2 = sigma_down - sigmas[i]
        x_2 = x + d * dt_1
        denoised_2 = model(x_2, sigma_mid * s_in, **extra_args)
        d_2 = to_d(x_2, sigma_mid, denoised_2)
        x = x + d_2 * dt_2
        x = x + torch.randn_like(x) * sigma_up
    return x


def linear_multistep_coeff(order, t, i, j):
    """Integrates the j-th Lagrange basis polynomial over [t[i], t[i + 1]] to get
    the weight of the j-th stored derivative in a linear multistep update."""
    if order - 1 > i:
        raise ValueError(f'Order {order} too high for step {i}')

    def fn(tau):
        prod = 1.
        for k in range(order):
            if j == k:
                continue
            prod *= (tau - t[i - k]) / (t[i - j] - t[i - k])
        return prod

    return integrate.quad(fn, t[i], t[i + 1], epsrel=1e-4)[0]


@torch.no_grad()
def sample_lms(model, x, sigmas, extra_args=None, callback=None, disable=None, order=4):
    """Linear multistep method (order 4 by default): reuses the last `order`
    derivatives so each step costs only one model evaluation."""
    extra_args = {} if extra_args is None else extra_args
    s_in = x.new_ones([x.shape[0]])
    ds = []
    for i in trange(len(sigmas) - 1, disable=disable):
        denoised = model(x, sigmas[i] * s_in, **extra_args)
        d = to_d(x, sigmas[i], denoised)
        ds.append(d)
        if len(ds) > order:
            ds.pop(0)
        if callback is not None:
            callback({'x': x, 'i': i, 'sigma': sigmas[i], 'sigma_hat': sigmas[i], 'denoised': denoised})
        cur_order = min(i + 1, order)
        coeffs = [linear_multistep_coeff(cur_order, sigmas.cpu(), i, j) for j in range(cur_order)]
        x = x + sum(coeff * d for coeff, d in zip(coeffs, reversed(ds)))
    return x
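

# Minimal smoke test (an illustrative addition, not part of the upstream
# module): drives sample_euler with a dummy eps-model on a made-up 10-step
# discrete schedule to show how the wrapper and samplers fit together.
if __name__ == '__main__':
    class _DummyEps(nn.Module):
        # Stands in for a real diffusion UNet; always predicts zero noise.
        def forward(self, x, t):
            return torch.zeros_like(x)

    alphas_cumprod = torch.linspace(0.9999, 0.01, 10)
    model_wrap = DiscreteEpsDDPMDenoiser(_DummyEps(), alphas_cumprod, quantize=False)
    sigmas = model_wrap.get_sigmas(10)
    x = torch.randn([1, 3, 8, 8]) * sigmas[0]
    out = sample_euler(model_wrap, x, sigmas, disable=True)
    print(out.shape)  # torch.Size([1, 3, 8, 8])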