Commit
first commit of the FeatureCP code
AlvinWen428 committed Mar 30, 2023
1 parent 12e28bb commit 3fff13a
Showing 31 changed files with 296,480 additions and 0 deletions.
Binary file added ckpt/.DS_Store
Binary file added ckpt/cityscapes/.DS_Store
Binary file added conformal/.DS_Store
Empty file added conformal/__init__.py
140 changes: 140 additions & 0 deletions conformal/fcn_model.py
@@ -0,0 +1,140 @@
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# note: this import path assumes an older torchvision layout (roughly < 0.13),
# where model_urls and load_state_dict_from_url are exposed by this module
from torchvision.models.segmentation.segmentation import model_urls, load_state_dict_from_url
from torchvision.models import resnet
from torchvision.models._utils import IntermediateLayerGetter


class FCNHead(nn.Sequential):
def __init__(self, img_shape, in_channels, channels):
inter_channels = in_channels // 4
layers = [
nn.Conv2d(in_channels, inter_channels, 3, padding=1, bias=False),
nn.BatchNorm2d(inter_channels),
nn.ReLU(),
nn.Dropout(0.1),
nn.Conv2d(inter_channels, channels, 1)
]
super(FCNHead, self).__init__(*layers)
self.img_shape = img_shape
self.in_channels, self.channels = in_channels, channels
self.num_classes = channels
        # the backbone replaces stride with dilation in its last two stages
        # (see _segm_resnet below), so features are downsampled by 8 overall
        self.feature_shape = (2048, self.img_shape[0] // 8, self.img_shape[1] // 8)

def forward(self, inputs):
inputs = inputs.view(inputs.shape[0], *self.feature_shape)
output = super(FCNHead, self).forward(inputs)
return output


class FCN(nn.Module):
def __init__(self, img_shape, backbone, classifier):
super(FCN, self).__init__()
self.img_shape = img_shape
# remove the first dimension "unlabeled"
self.out_shape = (classifier.num_classes - 1) * img_shape[0] * img_shape[1]
self.encoder = backbone
self.g = classifier

@staticmethod
def output_post_process(output, img_shape):
output = F.interpolate(output, size=img_shape, mode='bilinear', align_corners=False)
# remove the first dimension "unlabeled"
output = output[:, 1:, ...]
output = output.view(output.shape[0], -1)
return output

def forward(self, x):
features = self.encoder(x)
output = self.g(features)
output = self.output_post_process(output, img_shape=self.img_shape)
return output
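
# Shape sketch for FCN.forward, a hedged walkthrough assuming the defaults
# img_shape=(512, 1024) and num_classes=21 used by make_fcn below:
#   x:       (B, 3, 512, 1024)
#   encoder: layer4 features flattened to (B, 2048 * 64 * 128)
#   g:       reshaped to (B, 2048, 64, 128), then logits (B, 21, 64, 128)
#   post:    upsampled to (B, 21, 512, 1024), the "unlabeled" channel dropped,
#            flattened to (B, 20 * 512 * 1024) to match self.out_shape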


class MyIntermediateLayerGetter(IntermediateLayerGetter):
def __init__(self, model, return_layers: str):
super(MyIntermediateLayerGetter, self).__init__(model, {return_layers: 'out'})
self.return_layers = return_layers

def forward(self, x):
for name, module in self.items():
x = module(x)
if name == self.return_layers:
out = x
out = out.view(out.shape[0], -1)
return out
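
# Note: unlike torchvision's IntermediateLayerGetter, whose forward returns an
# OrderedDict of named outputs, this variant returns the 'layer4' activation
# flattened to (B, -1); FCNHead.forward reshapes it back to (C, H, W) above.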


def enet_weighing(dataloader, num_classes, c=1.02):
    """Computes class weights as described in the ENet paper:
        w_class = 1 / ln(c + p_class),
    where c is usually 1.02 and p_class is the propensity score of that
    class:
        propensity_score = freq_class / total_pixels.
    Reference: https://arxiv.org/abs/1606.02147
    Keyword arguments:
    - dataloader (``data.Dataloader``): A data loader to iterate over the
    dataset.
    - num_classes (``int``): The number of classes.
    - c (``float``, optional): An additional hyper-parameter which restricts
    the interval of values for the weights. Default: 1.02.
    """
class_count = 0
total = 0
for _, label, _ in dataloader:
label = label.cpu().numpy()

# Flatten label
flat_label = label.flatten()

# Sum up the number of pixels of each class and the total pixel
# counts for each label
class_count += np.bincount(flat_label, minlength=num_classes)
total += flat_label.size

# Compute propensity score and then the weights for each class
propensity_score = class_count / total
class_weights = 1 / (np.log(c + propensity_score))

return class_weights
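
# A hedged numeric sketch of the ENet weighting formula (synthetic counts,
# not taken from any dataloader): rarer classes get larger weights.
#   >>> counts = np.array([900., 90., 10.])   # pixels per class
#   >>> p = counts / counts.sum()             # [0.9, 0.09, 0.01]
#   >>> 1 / np.log(1.02 + p)                  # approx. [1.53, 9.58, 33.8]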


def _segm_resnet(backbone_name, img_shape, num_classes, pretrained_backbone=True):
backbone = resnet.__dict__[backbone_name](
pretrained=pretrained_backbone,
replace_stride_with_dilation=[False, True, True])

return_layers = 'layer4'
backbone = MyIntermediateLayerGetter(backbone, return_layers=return_layers)

inplanes = 2048
classifier = FCNHead(img_shape, inplanes, num_classes)

model = FCN(img_shape, backbone, classifier)
return model


def make_fcn(pretrained=False, progress=True, img_shape=(512, 1024), num_classes=21, **kwargs):
model = _segm_resnet('resnet50', img_shape, num_classes, **kwargs)
    if pretrained:
        arch = 'fcn_resnet50_coco'
        model_url = model_urls[arch]
        if model_url is None:
            raise NotImplementedError('pretrained {} is not supported as of now'.format(arch))
        else:
            state_dict = load_state_dict_from_url(model_url, progress=progress)
            # drop the final classifier conv so the head can be re-initialized
            # when num_classes differs from the 21 COCO/VOC classes
            del state_dict['classifier.4.weight']
            del state_dict['classifier.4.bias']
            model.load_state_dict(state_dict, strict=False)
return model
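
# A hedged usage sketch (this call is illustrative, not a call site from this
# commit); with pretrained_backbone=True by default, ResNet-50 weights may be
# downloaded on first use.
#   >>> model = make_fcn(pretrained=False, img_shape=(512, 1024), num_classes=21)
#   >>> x = torch.randn(1, 3, 512, 1024)
#   >>> model(x).shape   # (1, 20 * 512 * 1024): the "unlabeled" channel is removed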
255 changes: 255 additions & 0 deletions conformal/helper.py
@@ -0,0 +1,255 @@
import sys
import copy
import abc
from functools import partial

import torch
import torch.nn as nn
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.model_selection import train_test_split

from conformal.icp import IcpRegressor


def run_icp(nc, X_train, y_train, X_test, idx_train, idx_cal, significance, condition=None):
""" Run split conformal method
Parameters
----------
nc : class of nonconformist object
X_train : numpy array, training features (n1Xp)
y_train : numpy array, training labels (n1)
X_test : numpy array, testing features (n2Xp)
idx_train : numpy array, indices of proper training set examples
idx_cal : numpy array, indices of calibration set examples
significance : float, significance level (e.g. 0.1)
condition : function, mapping feature vector to group id
Returns
-------
y_lower : numpy array, estimated lower bound for the labels (n2)
y_upper : numpy array, estimated upper bound for the labels (n2)
"""
icp = IcpRegressor(nc, condition=condition)
icp.fit(X_train[idx_train, :], y_train[idx_train])
icp.calibrate(X_train[idx_cal, :], y_train[idx_cal])
predictions = icp.predict(X_test, significance=significance)

y_lower = predictions[:, 0]
y_upper = predictions[:, 1]
return y_lower, y_upper
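
# A hedged usage sketch with synthetic data; `nc` must be a nonconformist-style
# nonconformity score (constructed via conformal.icp, which is assumed here
# rather than shown in this file):
#   >>> n = 1000
#   >>> X = np.random.randn(n, 5)
#   >>> y = X[:, 0] + 0.1 * np.random.randn(n)
#   >>> idx = np.random.permutation(n)
#   >>> idx_train, idx_cal = idx[:n // 2], idx[n // 2:]
#   >>> lo, hi = run_icp(nc, X, y, X[:10], idx_train, idx_cal, significance=0.1)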


class BaseModelAdapter(BaseEstimator):
__metaclass__ = abc.ABCMeta

def __init__(self, model, fit_params=None):
super(BaseModelAdapter, self).__init__()

self.model = model
self.last_x, self.last_y = None, None
self.clean = False
self.fit_params = {} if fit_params is None else fit_params

def fit(self, x, y):
self.model.fit(x, y, **self.fit_params)
self.clean = False

    def predict(self, x):
        # cache the last prediction: recompute only when the model has been
        # refit or the query differs from the previous one
        if (not self.clean or
                self.last_x is None or
                self.last_y is None or
                not np.array_equal(self.last_x, x)):
            self.last_x = x
            self.last_y = self._underlying_predict(x)
            self.clean = True

        return self.last_y.copy()

@abc.abstractmethod
def _underlying_predict(self, x):
pass


class RegressorAdapter(BaseModelAdapter):
def __init__(self, model, fit_params=None):
super(RegressorAdapter, self).__init__(model, fit_params)

def _underlying_predict(self, x):
return self.model.predict(x)


class MSENet_RegressorAdapter(RegressorAdapter):
def __init__(self, model, device, fit_params=None, in_shape=1, out_shape=1, hidden_size=1,
learn_func=torch.optim.Adam, epochs=1000, batch_size=10,
dropout=0.1, lr=0.01, wd=1e-6, test_ratio=0.2, random_state=0):
super(MSENet_RegressorAdapter, self).__init__(model, fit_params)
self.epochs = epochs
self.batch_size = batch_size
self.dropout = dropout
self.lr = lr
self.wd = wd
self.test_ratio = test_ratio
self.random_state = random_state
self.device = device
if model is None:
self.model = mse_model(in_shape=in_shape, out_shape=out_shape, hidden_size=hidden_size, dropout=dropout)
self.loss_func = torch.nn.MSELoss()
self.learner = LearnerOptimized(self.model, partial(learn_func, lr=lr, weight_decay=wd),
self.loss_func, device=device, test_ratio=self.test_ratio, random_state=self.random_state)

self.hidden_size = hidden_size
self.in_shape = in_shape
self.learn_func = learn_func

def fit(self, x, y):
self.learner.fit(x, y, self.epochs, batch_size=self.batch_size, verbose=True)

def predict(self, x):
return self.learner.predict(x)
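
# A hedged usage sketch: with model=None the adapter builds the default
# mse_model MLP; X and y are numpy arrays (e.g. from the run_icp sketch
# above), and the hyper-parameters below are illustrative, not tuned.
#   >>> adapter = MSENet_RegressorAdapter(model=None, device='cpu', in_shape=5,
#   ...                                   out_shape=1, hidden_size=64,
#   ...                                   epochs=200, batch_size=64, lr=1e-3)
#   >>> adapter.fit(X[idx_train], y[idx_train])   # trains via LearnerOptimized
#   >>> preds = adapter.predict(X[:10])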


def epoch_internal_train(model, loss_func, x_train, y_train, batch_size, optimizer, cnt=0, best_cnt=np.inf):
model.train()
shuffle_idx = np.arange(x_train.shape[0])
np.random.shuffle(shuffle_idx)
x_train = x_train[shuffle_idx]
y_train = y_train[shuffle_idx]
epoch_losses = []
for idx in range(0, x_train.shape[0], batch_size):
cnt = cnt + 1
optimizer.zero_grad()
batch_x = x_train[idx: min(idx + batch_size, x_train.shape[0]), :]
batch_y = y_train[idx: min(idx + batch_size, y_train.shape[0])]
preds = model(batch_x)
loss = loss_func(preds, batch_y)
loss.backward()
optimizer.step()
epoch_losses.append(loss.cpu().detach().numpy())

if cnt >= best_cnt:
break

epoch_loss = np.mean(epoch_losses)

return epoch_loss, cnt


class mse_model(nn.Module):
def __init__(self, in_shape=1, out_shape=1, hidden_size=64, dropout=0.5):

super().__init__()
self.in_shape = in_shape
self.out_shape = out_shape
self.hidden_size = hidden_size
self.dropout = dropout
self.build_model()
self.init_weights()

def build_model(self):
self.encoder = nn.Sequential(
nn.Linear(self.in_shape, self.hidden_size),
nn.ReLU(), nn.Dropout(self.dropout),
nn.Linear(self.hidden_size, self.hidden_size), nn.ReLU()
)
self.g = nn.Sequential(
nn.Linear(self.hidden_size, self.hidden_size), nn.ReLU(),
nn.Dropout(self.dropout),
nn.Linear(self.hidden_size, self.out_shape))

self.base_model = nn.Sequential(self.encoder, self.g)

def init_weights(self):
for m in self.base_model:
if isinstance(m, nn.Linear):
nn.init.orthogonal_(m.weight)
nn.init.constant_(m.bias, 0)

def forward(self, x):
return torch.squeeze(self.base_model(x))


class LearnerOptimized:
def __init__(self, model, optimizer_class, loss_func, device='cpu', test_ratio=0.2, random_state=0):
self.model = model.to(device)
self.optimizer_class = optimizer_class
self.optimizer = optimizer_class(self.model.parameters())
self.loss_func = loss_func.to(device)
self.device = device
self.test_ratio = test_ratio
self.random_state = random_state
self.loss_history = []
self.test_loss_history = []
self.full_loss_history = []

    def fit(self, x, y, epochs, batch_size, verbose=False):
        # phase 1: train a throwaway copy on a train/validation split to find
        # the best epoch and gradient-step budget (best_cnt); phase 2 below
        # retrains self.model on all the data for that budget
        sys.stdout.flush()
        model = copy.deepcopy(self.model)
        model = model.to(self.device)
        optimizer = self.optimizer_class(model.parameters())
        best_epoch = epochs

x_train, xx, y_train, yy = train_test_split(x, y, test_size=self.test_ratio, random_state=self.random_state)

x_train = torch.from_numpy(x_train).float().to(self.device).requires_grad_(False)
xx = torch.from_numpy(xx).float().to(self.device).requires_grad_(False)
y_train = torch.from_numpy(y_train).float().to(self.device).requires_grad_(False)
yy = torch.from_numpy(yy).float().to(self.device).requires_grad_(False)

best_cnt = 1e10
best_test_epoch_loss = 1e10

cnt = 0
for e in range(epochs):
epoch_loss, cnt = epoch_internal_train(model, self.loss_func, x_train, y_train, batch_size, optimizer, cnt)
self.loss_history.append(epoch_loss)

            # validation pass on the held-out split
            model.eval()
            with torch.no_grad():
                preds = model(xx)
                test_epoch_loss = self.loss_func(preds, yy).cpu().numpy()
            self.test_loss_history.append(test_epoch_loss)

if (test_epoch_loss <= best_test_epoch_loss):
best_test_epoch_loss = test_epoch_loss
best_epoch = e
best_cnt = cnt

if (e + 1) % 100 == 0 and verbose:
print("CV: Epoch {}: Train {}, Test {}, Best epoch {}, Best loss {}".format(e + 1, epoch_loss,
test_epoch_loss, best_epoch, best_test_epoch_loss))
sys.stdout.flush()

# use all the data to train the model, for best_cnt steps
x = torch.from_numpy(x).float().to(self.device).requires_grad_(False)
y = torch.from_numpy(y).float().to(self.device).requires_grad_(False)

cnt = 0
for e in range(best_epoch + 1):
if cnt > best_cnt:
break

epoch_loss, cnt = epoch_internal_train(self.model, self.loss_func, x, y, batch_size,
self.optimizer, cnt, best_cnt)
self.full_loss_history.append(epoch_loss)

if (e + 1) % 100 == 0 and verbose:
print("Full: Epoch {}: {}, cnt {}".format(e + 1, epoch_loss, cnt))
sys.stdout.flush()

    def predict(self, x):
        self.model.eval()
        if isinstance(x, torch.Tensor):
            x = x.to(self.device).requires_grad_(False)
        else:
            # cast to float32 so the input matches the model parameters
            x = torch.from_numpy(x).float().to(self.device).requires_grad_(False)
        ret_val = self.model(x).cpu().detach().numpy()
        return ret_val
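
# A hedged usage sketch of the two-phase loop above (X, y as synthetic numpy
# arrays): phase 1 picks the best validation epoch on a split, phase 2
# retrains on all the data for the corresponding gradient-step budget.
#   >>> net = mse_model(in_shape=5, out_shape=1, hidden_size=64, dropout=0.1)
#   >>> learner = LearnerOptimized(net, partial(torch.optim.Adam, lr=1e-3),
#   ...                            torch.nn.MSELoss(), device='cpu')
#   >>> learner.fit(X, y, epochs=200, batch_size=64, verbose=False)
#   >>> preds = learner.predict(X[:10])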

