Noisy-Data/making_noise.py

197 lines
6.2 KiB
Python

import torch
import random
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from torchvision import datasets
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # which is 3.1415927410125732
class AddGaussianNoise(object):
def __init__(self, mean=0., std=1.):
self.std = std
self.mean = mean
def __call__(self, tensor):
return tensor + torch.randn(tensor.size()) * self.std + self.mean
def __repr__(self):
return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
class AddRaleighNoise(object):
def __init__(self, a=0.0, b=0.0):
self.std = (b * (4 - np.pi)) / 4
self.mean = a + np.sqrt((np.pi * b) / 4)
def __call__(self, tensor):
return tensor + torch.randn(tensor.size()) * self.std + self.mean
def __repr__(self):
return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
class AddErlangNoise(object):
def __init__(self, a=0.0, b=0.0):
if a == 0.0:
self.std = 0.0
self.mean = 0.0
else:
self.std = b / a
self.mean = b / (2*a)
def __call__(self, tensor):
if self.mean == 0.0:
return tensor * self.mean
else:
return tensor + torch.randn(tensor.size()) * self.std + self.mean
def __repr__(self):
return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
class AddExponentialNoise(object):
def __init__(self, a=0.0):
if a == 0.0:
self.mean = 0.0
else:
self.std = 1 / (2*a)
self.mean = 1 / a
def __call__(self, tensor):
if self.mean == 0.0:
return tensor * self.mean
else:
return tensor + torch.randn(tensor.size()) * self.std + self.mean
def __repr__(self):
return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
class AddUniformNoise(object):
def __init__(self, a=0.0, b=0.0):
if a == 0.0:
self.std = 0.0
self.mean = 0.0
else:
self.std = (b - a)**2 / 12
self.mean = (b + a) / 2
def __call__(self, tensor):
if self.mean == 0.0:
return tensor * self.mean
else:
print('(mean={0}, std={1})'.format(self.mean, self.std))
return tensor + (torch.randn(tensor.size()) * self.std + self.mean)
def __repr__(self):
return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
class AddInpulseNoise(object):
def __init__(self, a=0.0):
self.value = a
def __call__(self, tensor):
if random.gauss(0, 1) > 0:
return tensor * self.value
elif random.gauss(0, 1) < 0:
return tensor * (-1 * self.value)
else:
return tensor * 0.0
def __repr__(self):
return self.__class__.__name__ + '(a={0})'.format(self.value)
def get_mnist_loaders(batch_size=128, test_batch_size=1000, perc=1.0):
transform_train = transforms.Compose([
transforms.RandomCrop(28, padding=4),
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,)),
# AddGaussianNoise(0., 0.0),
# AddRaleighNoise(1, 1),
# AddErlangNoise(0.0001, 0.0001),
# AddExponentialNoise(2),
# AddUniformNoise(2, 1),
AddInpulseNoise(0.5),
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,)),
])
train_loader = DataLoader(
datasets.MNIST(root='.data/mnist', train=True, download=True, transform=transform_train),
batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True
)
train_eval_loader = DataLoader(
datasets.MNIST(root='.data/mnist', train=True, download=True, transform=transform_test),
batch_size=test_batch_size, shuffle=True, num_workers=2, drop_last=True
)
test_loader = DataLoader(
datasets.MNIST(root='.data/mnist', train=False, download=True, transform=transform_test),
batch_size=test_batch_size, shuffle=False, num_workers=2, drop_last=True
)
return train_loader, test_loader, train_eval_loader
def get_cifar_loaders(batch_size=128, test_batch_size=1000, perc=1.0):
transform_train = transforms.Compose([
transforms.ToTensor(),
transforms.RandomCrop(32, padding=0),
# transforms.Normalize((0.5,), (0.5,)),
# AddGaussianNoise(0., 0.25),
# AddRaleighNoise(1, 2), # Not worinkg for CIFAR
# AddErlangNoise(0.0001, 0.0001),
# AddExponentialNoise(2),
AddUniformNoise(2, 1), # Not working for CIFAR
# AddInpulseNoise(0.5),
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,)),
])
train_loader = DataLoader(
datasets.CIFAR10(root='.data/cifar', train=True, download=True, transform=transform_train),
batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True
)
train_eval_loader = DataLoader(
datasets.CIFAR10(root='.data/cifar', train=True, download=True, transform=transform_test),
batch_size=test_batch_size, shuffle=True, num_workers=2, drop_last=True
)
test_loader = DataLoader(
datasets.CIFAR10(root='.data/cifar', train=False, download=True, transform=transform_test),
batch_size=test_batch_size, shuffle=False, num_workers=2, drop_last=True
)
return train_loader, test_loader, train_eval_loader
if __name__ == '__main__':
train_loader, test_loader, train_eval_loader\
= get_cifar_loaders()
#for batch_idx, (data, target) in tqdm(enumerate(train_loader), total=len(train_loader)):
# sample_idx = torch.randint(len(data), size=(1,)).item()
# img = data[sample_idx]
# # print(data)
images, labels = next(iter(train_loader))
plt.imshow(images[0].permute(1, 2, 0))
# plt.imshow(images[0].reshape(28, 28), cmap='gray')
plt.show()
#print(images[0].shape)