
Commit

new conditioning
arimousa committed Apr 11, 2023
1 parent 0cbf8ea commit 37820a3
Showing 15 changed files with 1,402 additions and 720 deletions.
53 changes: 33 additions & 20 deletions anomaly_map.py
@@ -13,7 +13,7 @@
import numpy as np


def heat_map(output, target, SFE, TFE, constants_dict, config):
def heat_map(output, target, SFE, TFE, bn, constants_dict, config):
sigma = 4
kernel_size = 2 * int(4 * sigma + 0.5) + 1
anomaly_map = 0
@@ -26,23 +26,24 @@ def heat_map(output, target, SFE, TFE, constants_dict, config):


i_d = color_distance(output, target, config) #torch.sqrt(torch.sum(((output)-(target))**2,dim=1).unsqueeze(1)) # 1 - F.cosine_similarity(patchify(output) , patchify(target), dim=1).to(config.model.device).unsqueeze(1) # color_distance(output, target, config) #torch.sqrt(torch.mean(((output)-(target))**2,dim=1).unsqueeze(1)) #torch.sqrt(torch.sum(((output)-(target))**2,dim=1).unsqueeze(1)) #color_distance(output, target, config) ((output)-(target))**2 #torch.mean(torch.abs((output)-(target)),dim=1).unsqueeze(1)
f_d = feature_distance((output), (target), SFE, TFE, constants_dict, config)
f_d = feature_distance((output), (target), SFE, TFE, bn, constants_dict, config)
f_d = torch.Tensor(f_d).to(config.model.device)
# print('image_distance mean : ',torch.mean(i_d))
# print('feature_distance mean : ',torch.mean(f_d))
# print('image_distance max : ',torch.max(i_d))
# print('feature_distance max : ',torch.max(f_d))
# i_d = torch.clamp(i_d, max=f_d.max().item())
print('image_distance max : ',torch.max(i_d))
print('feature_distance max : ',torch.max(f_d))


# visualalize_distance(output, target, i_d, f_d)

anomaly_map += f_d #0.1 * i_d + 2 * f_d # 2 for W5, 4 for W101
anomaly_map += 1 * i_d + f_d #0.1 * i_d + f_d # 2 for W5, 4 for W101

anomaly_map = gaussian_blur2d(
anomaly_map , kernel_size=(kernel_size,kernel_size), sigma=(sigma,sigma)
)

anomaly_map = torch.sum(anomaly_map, dim=1).unsqueeze(1)

# print( 'anomaly_map : ',torch.mean(anomaly_map))

return anomaly_map
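
After this change, heat_map weights the image (color) distance and the feature distance equally, blurs with a Gaussian whose kernel size follows from sigma, and collapses channels. A minimal, self-contained sketch of that fusion, with random stand-in tensors in place of the real distances (shapes and inputs are illustrative assumptions, not repo data):

import torch
from kornia.filters import gaussian_blur2d

i_d = torch.rand(4, 1, 256, 256)  # stand-in for color_distance(output, target, config)
f_d = torch.rand(4, 1, 256, 256)  # stand-in for feature_distance(...)
sigma = 4
kernel_size = 2 * int(4 * sigma + 0.5) + 1  # = 33
anomaly_map = 1 * i_d + f_d  # equal-weight fusion introduced in this commit
anomaly_map = gaussian_blur2d(anomaly_map, kernel_size=(kernel_size, kernel_size), sigma=(sigma, sigma))
anomaly_map = torch.sum(anomaly_map, dim=1).unsqueeze(1)  # (B, 1, H, W)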
@@ -80,8 +81,8 @@ def color_distance(image1, image2, config):
])
else:
transform = transforms.Compose([
transforms.CenterCrop(224),
# transforms.Lambda(lambda t: (t + 1) / (2)),
# transforms.CenterCrop(224),
transforms.Lambda(lambda t: (t + 1) / (2)),

transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
@@ -91,8 +92,8 @@ def color_distance(image1, image2, config):
image2 = transform(image2)
# visualalize_distance(image1_1, image2_2, image1, image2)

cmyk_image_1 = rgb_to_cmyk(image1)
cmyk_image_2 = rgb_to_cmyk(image2)
# cmyk_image_1 = rgb_to_cmyk(image1)
# cmyk_image_2 = rgb_to_cmyk(image2)

# cmyk_image_1 = ((cmyk_image_1 - cmyk_image_1.min())/ (cmyk_image_1.max() - cmyk_image_1.min())).to(config.model.device)
# cmyk_image_2 = ((cmyk_image_2 - cmyk_image_2.min())/ (cmyk_image_2.max() - cmyk_image_2.min())).to(config.model.device)
@@ -102,8 +103,7 @@ def color_distance(image1, image2, config):
# distance_map = (cmyk_image_1 - cmyk_image_2)**2
# distance_map = 1 - F.cosine_similarity((image1) , (image2), dim=1).to(config.model.device).unsqueeze(1)
# distance_map = torch.mean(distance_map, dim=1).unsqueeze(1)
distance_map = torch.mean(((image1) - (image2))**2, dim=1).unsqueeze(1)
distance_map = F.interpolate(distance_map , size = int(config.data.image_size), mode="bilinear")
distance_map = torch.mean(torch.abs(image1 - image2), dim=1).unsqueeze(1)
return distance_map
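
With the CMYK branch and the final interpolation commented out, the color distance reduces to a per-pixel mean absolute difference after re-normalization. A self-contained sketch under that reading (the random inputs are placeholders):

import torch
from torchvision import transforms

transform = transforms.Compose([
    transforms.Lambda(lambda t: (t + 1) / 2),  # map [-1, 1] model output to [0, 1]
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),  # ImageNet stats
])
image1 = transform(torch.rand(2, 3, 256, 256) * 2 - 1)
image2 = transform(torch.rand(2, 3, 256, 256) * 2 - 1)
distance_map = torch.mean(torch.abs(image1 - image2), dim=1).unsqueeze(1)  # (B, 1, H, W)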


@@ -113,14 +113,15 @@ def cal_anomaly_map(fs_list, ft_list, config, out_size=256, amap_mode='mul'):
else:
anomaly_map = torch.zeros([fs_list[0].shape[0] ,1 ,out_size, out_size]).to(config.model.device)
a_map_list = []
# l1 = torch.nn.L1Loss()
for i in range(len(ft_list)):
if i == 0:
if i == 0: # or i == 1:
continue
fs = fs_list[i]
ft = ft_list[i]
#fs_norm = F.normalize(fs, p=2)
#ft_norm = F.normalize(ft, p=2)

a_map = 1 - F.cosine_similarity(patchify(fs), patchify(ft))
# a_map = torch.mean(torch.abs((fs) - (ft)),dim = 1) #l1 (fs , ft)
a_map = torch.unsqueeze(a_map, dim=1)
a_map = F.interpolate(a_map, size=out_size, mode='bilinear', align_corners=True)
if amap_mode == 'mul':
@@ -130,7 +131,10 @@ def cal_anomaly_map(fs_list, ft_list, config, out_size=256, amap_mode='mul'):
return anomaly_map, a_map_list
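
Per layer, the map is one minus the cosine similarity between patchified student and teacher features, upsampled to out_size and accumulated ('a' mode adds, 'mul' multiplies). A sketch of the core term, with raw feature maps standing in for the repo's patchify() helper (shapes are assumptions):

import torch
import torch.nn.functional as F

fs = torch.rand(2, 256, 32, 32)  # stand-in student features
ft = torch.rand(2, 256, 32, 32)  # stand-in teacher features
a_map = 1 - F.cosine_similarity(fs, ft, dim=1)  # (B, H, W), large where features disagree
a_map = F.interpolate(a_map.unsqueeze(1), size=256, mode='bilinear', align_corners=True)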


def feature_distance(output, target, SFE, TFE, constants_dict, config):
def feature_distance(output, target, SFE, TFE, bn, constants_dict, config):
SFE.eval()
TFE.eval()
bn.eval()

# output = ((output - output.min())/ (output.max() - output.min()))
# target = ((target - target.min())/ (target.max() - target.min()))
@@ -157,11 +161,20 @@ def feature_distance(output, target, SFE, TFE, constants_dict, config):
# transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
])

output = transform(output)
target = transform(target)
outpu_features = SFE(target)
target_features = TFE(target)
distance_map = cal_anomaly_map(outpu_features, target_features, config, out_size=256, amap_mode='a')[0]
output = transform(output)


inputs_features = TFE(target)
# target_features = SFE(bn(inputs_features))


output_features = TFE(output)
# output_output_features = SFE(bn(output_features))

# output_features = SFE(target)
# target_features = TFE(target)
distance_map = cal_anomaly_map(inputs_features, output_features, config, out_size=256, amap_mode='a')[0]
return distance_map

# outputs_features = extract_features(feature_extractor=feature_extractor, x=output, config=config, out_indices=['layer1'])
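After this commit, feature_distance runs both the input and the reconstruction through the same extractor (TFE) in eval mode and scores them with cal_anomaly_map; the SFE/bn path is left commented out. A sketch of that pattern, using torchvision's resnet18 as a stand-in multi-scale extractor (the real TFE and its layer choice are repo-specific, so this is an assumption):

import torch
import torchvision

backbone = torchvision.models.resnet18(weights="IMAGENET1K_V1").eval()

def extract(x):
    # collect layer1..layer3 feature maps, mimicking a multi-scale extractor
    x = backbone.maxpool(backbone.relu(backbone.bn1(backbone.conv1(x))))
    feats = []
    for layer in (backbone.layer1, backbone.layer2, backbone.layer3):
        x = layer(x)
        feats.append(x)
    return feats

with torch.no_grad():
    input_features = extract(torch.rand(1, 3, 256, 256))   # original image (placeholder)
    output_features = extract(torch.rand(1, 3, 256, 256))  # reconstruction (placeholder)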
32 changes: 18 additions & 14 deletions config.yaml
@@ -1,34 +1,36 @@
data :
name: mvtec #mtd # mvtec #mvtec if the dataset is MVTec AD, otherwise, it is the name of your desired dataset
data_dir: datasets/MVTec #MTD #MVTec
category: metal_nut #['hazelnut', 'bottle', 'cable', 'carpet', 'leather', 'capsule', 'grid', 'pill','transistor', 'metal_nut', 'screw','toothbrush', 'zipper', 'tile', 'wood']
image_size: 256
name: MVTec #cifar10 #MVTec # membrane #mtd # mvtec #mvtec #CIFAR10 if the dataset is MVTec AD, otherwise, it is the name of your desired dataset
data_dir: datasets/MVTec #cifar10 #MVTec #membrane #MTD #MVTec
category: carpet #zero #['hazelnut', 'bottle', 'cable', 'carpet', 'leather', 'capsule', 'grid', 'pill','transistor', 'metal_nut', 'screw','toothbrush', 'zipper', 'tile', 'wood']
image_size: 256 #256 #32
batch_size: 32
mask : True
mask : True #False True
imput_channel : 3
manualseed: -1



model:
checkpoint_dir: checkpoints/MVTec #MTD
checkpoint_dir: checkpoints/MVTec #CIFAR10 #membrane #MTD #MVTec
checkpoint_name: weights
exp_name: default
backbone: wide_resnet101_2 #wide_resnet101_2 #resnet18 # wide_resnet50_2 #resnet34 #deit_base_distilled_patch16_384
backbone: resnet18 #wide_resnet101_2 #resnet18 # wide_resnet50_2 #resnet34 #deit_base_distilled_patch16_384
pre_trained: True
noise : Gaussian # options : [Gaussian, Perlin]
schedule : linear # options: [linear, quad, const, jsd, sigmoid]
fine_tune : True #True #False
learning_rate: 1e-4 #0.0002
weight_decay: 0
epochs: 1000
epochs: 30000
trajectory_steps: 1000
test_trajectoy_steps: 160 #160
test_trajectoy_steps2: 160 #160
generate_time_steps: 900
test_trajectoy_steps: 200 #90 #160
test_trajectoy_steps2: 200 #160
trajectory_steps_condition: 500
skip : 20 #10
skip2 : 40 #20 #60
skip2 : 20 #20 #60
skip_generation : 100 #225
sigma : 0.5
eta : 1
eta : 1 #1
beta_start : 0.0001 # 0.0001
beta_end : 0.02 # 0.006 for 300
ema : True
@@ -39,10 +41,12 @@ model:
seed : 42



metrics:
image_level_F1Score: True
image_level_AUROC: True
pixel_level_AUROC: True
image_level_F1Score: True
pixel_level_F1Score: True
threshold:
method: adaptive #options: [adaptive, manual]
manual_image: null
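Throughout the diff the config is read with attribute access (config.data.image_size, config.model.device), which matches an OmegaConf-style loader; that library choice is an assumption here, not something the diff confirms:

from omegaconf import OmegaConf

config = OmegaConf.load("config.yaml")
print(config.data.category)               # carpet
print(config.model.test_trajectoy_steps)  # 200 (key name kept as spelled in the file)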
115 changes: 38 additions & 77 deletions dataset.py
@@ -83,7 +83,7 @@ def __init__(self, root, category, config, is_train=True):
self.mask_transform = transforms.Compose(
[
transforms.Resize((config.data.image_size, config.data.image_size)),
transforms.CenterCrop(224),
# transforms.CenterCrop(224),
transforms.ToTensor(), # Scales data into [0,1]
]
)
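
With CenterCrop removed, masks stay at the full configured resolution and remain aligned with the images, which is why the interpolate call in __getitem__ below can also be dropped. A minimal sketch of the resulting mask pipeline (256 is hard-coded here for illustration; the repo uses config.data.image_size):

from torchvision import transforms

mask_transform = transforms.Compose([
    transforms.Resize((256, 256)),  # config.data.image_size in the repo
    transforms.ToTensor(),          # scales data into [0, 1]
])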
@@ -128,7 +128,7 @@ def __getitem__(self, index):
target = Image.open(
image_file.replace("/test/", "/ground_truth/"))
target = self.mask_transform(target)
target = F.interpolate(target.unsqueeze(1) , size = int(self.config.data.image_size), mode="bilinear").squeeze(1)
# target = F.interpolate(target.unsqueeze(1) , size = int(self.config.data.image_size), mode="bilinear").squeeze(1)
label = 'defective'
else:
if os.path.dirname(image_file).endswith("good"):
@@ -146,78 +146,39 @@ def __len__(self):



def get_cifar_anomaly_dataset(trn_img, trn_lbl, tst_img, tst_lbl, abn_cls_idx=0, manualseed=-1):
"""[summary]
Arguments:
trn_img {np.array} -- Training images
trn_lbl {np.array} -- Training labels
tst_img {np.array} -- Test images
tst_lbl {np.array} -- Test labels
Keyword Arguments:
abn_cls_idx {int} -- Anomalous class index (default: {0})
Returns:
[np.array] -- New training-test images and labels.
"""
# Convert train-test labels into numpy array.
trn_lbl = np.array(trn_lbl)
tst_lbl = np.array(tst_lbl)

# --
# Find idx, img, lbl for abnormal and normal on org dataset.
nrm_trn_idx = np.where(trn_lbl != abn_cls_idx)[0]
abn_trn_idx = np.where(trn_lbl == abn_cls_idx)[0]
nrm_trn_img = trn_img[nrm_trn_idx] # Normal training images
abn_trn_img = trn_img[abn_trn_idx] # Abnormal training images
nrm_trn_lbl = trn_lbl[nrm_trn_idx] # Normal training labels
abn_trn_lbl = trn_lbl[abn_trn_idx] # Abnormal training labels.

nrm_tst_idx = np.where(tst_lbl != abn_cls_idx)[0]
abn_tst_idx = np.where(tst_lbl == abn_cls_idx)[0]
nrm_tst_img = tst_img[nrm_tst_idx] # Normal test images
abn_tst_img = tst_img[abn_tst_idx] # Abnormal test images.
nrm_tst_lbl = tst_lbl[nrm_tst_idx] # Normal test labels
abn_tst_lbl = tst_lbl[abn_tst_idx] # Abnormal test labels.

# --
# Assign labels to normal (0) and abnormals (1)
nrm_trn_lbl[:] = 0
nrm_tst_lbl[:] = 0
abn_trn_lbl[:] = 1
abn_tst_lbl[:] = 1

# --
if manualseed != -1:
# Random seed.
# Concatenate the original train and test sets.
nrm_img = np.concatenate((nrm_trn_img, nrm_tst_img), axis=0)
nrm_lbl = np.concatenate((nrm_trn_lbl, nrm_tst_lbl), axis=0)
abn_img = np.concatenate((abn_trn_img, abn_tst_img), axis=0)
abn_lbl = np.concatenate((abn_trn_lbl, abn_tst_lbl), axis=0)

# Split the normal data into the new train and tests.
idx = np.arange(len(nrm_lbl))
np.random.seed(manualseed)
np.random.shuffle(idx)

nrm_trn_len = int(len(idx) * 0.80)
nrm_trn_idx = idx[:nrm_trn_len]
nrm_tst_idx = idx[nrm_trn_len:]

nrm_trn_img = nrm_img[nrm_trn_idx]
nrm_trn_lbl = nrm_lbl[nrm_trn_idx]
nrm_tst_img = nrm_img[nrm_tst_idx]
nrm_tst_lbl = nrm_lbl[nrm_tst_idx]

# Create new anomaly dataset based on the following data structure:
# - anomaly dataset
# . -> train
# . -> normal
# . -> test
# . -> normal
# . -> abnormal
new_trn_img = np.copy(nrm_trn_img)
new_trn_lbl = np.copy(nrm_trn_lbl)
new_tst_img = np.concatenate((nrm_tst_img, abn_trn_img, abn_tst_img), axis=0)
new_tst_lbl = np.concatenate((nrm_tst_lbl, abn_trn_lbl, abn_tst_lbl), axis=0)

return new_trn_img, new_trn_lbl, new_tst_img, new_tst_lbl
def load_data(dataset_name='cifar10', normal_class=0, batch_size=32):


img_transform = transforms.Compose([
transforms.Resize((32, 32)),
#transforms.CenterCrop(28),
transforms.ToTensor(),
transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

os.makedirs("./Dataset/CIFAR10/train", exist_ok=True)
dataset = CIFAR10('./Dataset/CIFAR10/train', train=True, download=True, transform=img_transform)
print("Cifar10 DataLoader Called...")
print("All Train Data: ", dataset.data.shape)
dataset.data = dataset.data[np.array(dataset.targets) == normal_class]
dataset.targets = [normal_class] * dataset.data.shape[0]
print("Normal Train Data: ", dataset.data.shape)

os.makedirs("./Dataset/CIFAR10/test", exist_ok=True)
test_set = CIFAR10("./Dataset/CIFAR10/test", train=False, download=True, transform=img_transform)
print("Test Train Data:", test_set.data.shape)

train_dataloader = torch.utils.data.DataLoader(
dataset,
batch_size=batch_size,
shuffle=True,
)
test_dataloader = torch.utils.data.DataLoader(
test_set,
batch_size=32,
shuffle=False,
)

return train_dataloader, test_dataloader
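
Usage sketch for the new loader: keep only one CIFAR-10 class as "normal" for training and test on the full test set; the batch shape below is what the 32x32 transform above produces:

train_loader, test_loader = load_data(dataset_name='cifar10', normal_class=0, batch_size=32)
images, labels = next(iter(train_loader))
print(images.shape)  # torch.Size([32, 3, 32, 32]); train labels are all normal_class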

