vq.py
import gc
import time
from typing import Optional, Tuple

import numpy as np
import torch
from torch import nn
from torch_scatter import scatter
from tqdm import trange

from weighted_distance._C import weightedDistance

class VectorQuantize(nn.Module):
    def __init__(
        self,
        channels: int,
        codebook_size: int = 2**12,
        decay: float = 0.5,
    ) -> None:
        super().__init__()
        self.decay = decay
        # codebook entries and their accumulated importance are updated via EMA,
        # not by gradient descent, so both buffers are frozen parameters
        self.codebook = nn.Parameter(
            torch.empty(codebook_size, channels), requires_grad=False
        )
        nn.init.kaiming_uniform_(self.codebook)
        self.entry_importance = nn.Parameter(
            torch.zeros(codebook_size), requires_grad=False
        )
        self.eps = 1e-5

    def uniform_init(self, x: torch.Tensor):
        # re-initialize the codebook uniformly over the value range of x
        amin, amax = x.aminmax()
        self.codebook.data = torch.rand_like(self.codebook) * (amax - amin) + amin
    def update(self, x: torch.Tensor, importance: torch.Tensor) -> torch.Tensor:
        with torch.no_grad():
            # assign each input vector to its nearest codebook entry
            min_dists, idx = weightedDistance(x.detach(), self.codebook.detach())
            # total importance routed to each codebook entry in this batch
            acc_importance = scatter(
                importance, idx, 0, reduce="sum", dim_size=self.codebook.shape[0]
            )
            ema_inplace(self.entry_importance, acc_importance, self.decay)
            # importance-weighted sum of the vectors assigned to each entry
            codebook = scatter(
                x * importance[:, None],
                idx,
                0,
                reduce="sum",
                dim_size=self.codebook.shape[0],
            )
            # move each entry towards the importance-weighted mean of its assigned vectors
            ema_inplace(
                self.codebook,
                codebook / (acc_importance[:, None] + self.eps),
                self.decay,
            )
        return min_dists
    def forward(
        self,
        x: torch.Tensor,
        return_dists: bool = False,
    ) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]:
        # quantize x by nearest-neighbour lookup in the codebook
        min_dists, idx = weightedDistance(x.detach(), self.codebook.detach())
        if return_dists:
            return self.codebook[idx], idx, min_dists
        else:
            return self.codebook[idx], idx

def ema_inplace(moving_avg: torch.Tensor, new: torch.Tensor, decay: float):
    # in-place exponential moving average: avg = decay * avg + (1 - decay) * new
    moving_avg.data.mul_(decay).add_(new, alpha=(1 - decay))

def vq_features(
    features: torch.Tensor,
    importance: torch.Tensor,
    codebook_size: int,
    vq_chunk: int = 2**16,
    steps: int = 1000,
    decay: float = 0.8,
    scale_normalize: bool = False,
) -> Tuple[torch.Tensor, torch.Tensor]:
    '''
    Borrowed from c3dgs, see: https://arxiv.org/abs/2401.02436
    '''
    # normalize importance to [0, 1] so the EMA updates are well scaled
    importance_n = importance / importance.max()
    vq_model = VectorQuantize(
        channels=features.shape[-1],
        codebook_size=codebook_size,
        decay=decay,
    ).to(device=features.device)
    vq_model.uniform_init(features)
    errors = []
    for i in trange(steps):
        # update the codebook on a random chunk of features per step
        batch = torch.randint(low=0, high=features.shape[0], size=[vq_chunk])
        vq_feature = features[batch]
        error = vq_model.update(vq_feature, importance=importance_n[batch]).mean().item()
        errors.append(error)

    if scale_normalize:
        # this computes the trace of the codebook covariance matrices
        # we divide by the trace to ensure that the matrices have normalized eigenvalues / scales
        tr = vq_model.codebook[:, [0, 3, 5]].sum(-1)
        vq_model.codebook /= tr[:, None]

    gc.collect()
    torch.cuda.empty_cache()
    start = time.time()
    _, vq_indices = vq_model(features)
    torch.cuda.synchronize(device=vq_indices.device)
    end = time.time()
    print(f"calculating indices took {end - start} seconds")
    return vq_model.codebook.data.detach(), vq_indices.detach()
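

# --- usage sketch (not part of the original file) ---
# A minimal, hedged example of how vq_features might be called. The tensor
# shapes, the 6-dimensional features (matching the [0, 3, 5] diagonal indices
# used when scale_normalize=True), and the random importance weights are
# illustrative assumptions, not values prescribed by this module. Running it
# requires a CUDA device and the compiled weighted_distance extension.
if __name__ == "__main__":
    device = torch.device("cuda")
    feats = torch.randn(100_000, 6, device=device)   # e.g. flattened covariance parameters
    imp = torch.rand(100_000, device=device)         # per-vector importance weights
    codebook, indices = vq_features(feats, imp, codebook_size=2**12, steps=100)
    # de-quantize by looking up each vector's codebook entry
    quantized = codebook[indices]
    print(codebook.shape, indices.shape, quantized.shape)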