Noisy-Data/making_noise.py

import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
torch.pi = torch.acos(torch.zeros(1)).item() * 2 # which is 3.1415927410125732
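
# The transforms below inject different classical noise models (Gaussian, Rayleigh,
# Erlang/gamma, exponential, uniform, impulse) into image tensors so the MNIST/CIFAR
# loaders can serve noisy training data. Apart from the impulse case, each class
# perturbs the tensor with Gaussian samples whose mean/std are taken from the named
# distribution's moments, rather than sampling that distribution directly.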
# Additive Gaussian noise with the given mean and standard deviation.
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
# Approximates Rayleigh noise (parameters a, b) with Gaussian samples whose mean and
# standard deviation match the Rayleigh moments: mean = a + sqrt(pi*b/4),
# variance = b*(4 - pi)/4.
class AddRaleighNoise(object):
    def __init__(self, a=0.0, b=0.0):
        self.std = np.sqrt(b * (4 - np.pi) / 4)
        self.mean = a + np.sqrt((np.pi * b) / 4)

    def __call__(self, tensor):
        # print('(mean={0}, std={1})'.format(self.mean, self.std))  # debug output
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
# Approximates Erlang (gamma) noise with Gaussian samples whose mean and standard
# deviation match the Erlang moments: mean = b/a, variance = b/a**2.
# a == 0 is treated as "no noise".
class AddErlangNoise(object):
    def __init__(self, a=0.0, b=0.0):
        if a == 0.0:
            self.std = 0.0
            self.mean = 0.0
        else:
            self.std = np.sqrt(b) / a
            self.mean = b / a

    def __call__(self, tensor):
        if self.mean == 0.0:
            return tensor
        else:
            return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
# Approximates exponential noise (rate a) with Gaussian samples whose mean and
# standard deviation match the exponential moments: mean = 1/a, std = 1/a.
# a == 0 is treated as "no noise".
class AddExponentialNoise(object):
    def __init__(self, a=0.0):
        if a == 0.0:
            self.std = 0.0
            self.mean = 0.0
        else:
            self.std = 1 / a
            self.mean = 1 / a

    def __call__(self, tensor):
        if self.mean == 0.0:
            return tensor
        else:
            return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
# Approximates uniform noise on [a, b] with Gaussian samples whose mean and standard
# deviation match the uniform moments: mean = (a + b)/2, variance = (b - a)**2/12.
# a == 0 is treated as "no noise".
class AddUniformNoise(object):
    def __init__(self, a=0.0, b=0.0):
        if a == 0.0:
            self.std = 0.0
            self.mean = 0.0
        else:
            self.std = np.sqrt((b - a)**2 / 12)
            self.mean = (b + a) / 2

    def __call__(self, tensor):
        if self.mean == 0.0:
            return tensor
        else:
            return tensor + (torch.randn(tensor.size()) * self.std + self.mean)

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)
# Impulse-style noise: scales the whole tensor by +a, -a, or 0, with the sign
# decided by a single standard-normal draw.
class AddImpulseNoise(object):
    def __init__(self, a=0.0):
        self.value = a

    def __call__(self, tensor):
        sign = random.gauss(0, 1)  # draw once so the branches see the same sample
        if sign > 0:
            return tensor * self.value
        elif sign < 0:
            return tensor * (-1 * self.value)
        else:
            return tensor * 0.0

    def __repr__(self):
        return self.__class__.__name__ + '(a={0})'.format(self.value)
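
# Illustrative helper (a sketch, not part of the original file): applies one of the
# noise transforms above to a single image tensor and clamps the result to [0, 1]
# so it can be displayed directly. The default transform and the clamp range are
# assumptions made for quick visual checks only.
def preview_noise(img_tensor, noise_transform=AddGaussianNoise(0., 0.25)):
    noisy = noise_transform(img_tensor)
    return torch.clamp(noisy, 0.0, 1.0)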
def get_mnist_loaders(batch_size=128, test_batch_size=1000, perc=1.0):
    transform_train = transforms.Compose([
        transforms.RandomCrop(28, padding=4),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
        # AddGaussianNoise(0., 0.0),
        # AddRaleighNoise(1, 1),
        # AddErlangNoise(0.0001, 0.0001),
        # AddExponentialNoise(2),
        # AddUniformNoise(2, 1),
        AddImpulseNoise(0.5),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    train_loader = DataLoader(
        datasets.MNIST(root='.data/mnist', train=True, download=True, transform=transform_train),
        batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True
    )
    train_eval_loader = DataLoader(
        datasets.MNIST(root='.data/mnist', train=True, download=True, transform=transform_test),
        batch_size=test_batch_size, shuffle=True, num_workers=2, drop_last=True
    )
    test_loader = DataLoader(
        datasets.MNIST(root='.data/mnist', train=False, download=True, transform=transform_test),
        batch_size=test_batch_size, shuffle=False, num_workers=2, drop_last=True
    )
    return train_loader, test_loader, train_eval_loader
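
# Sketch (not in the original): a quick sanity check that compares the statistics of
# a noisy training batch against a clean evaluation batch. The function name and the
# batch size are illustrative; the comparison is only approximate because the training
# transform also applies a random crop.
def _noise_sanity_check_mnist():
    train_loader, test_loader, train_eval_loader = get_mnist_loaders(batch_size=256)
    noisy, _ = next(iter(train_loader))       # transform_train: normalize + impulse noise
    clean, _ = next(iter(train_eval_loader))  # transform_test: normalize only
    print('noisy batch mean/std: {:.3f} / {:.3f}'.format(noisy.mean().item(), noisy.std().item()))
    print('clean batch mean/std: {:.3f} / {:.3f}'.format(clean.mean().item(), clean.std().item()))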
def get_cifar_loaders(batch_size=128, test_batch_size=1000, perc=1.0):
    transform_train = transforms.Compose([
        transforms.ToTensor(),
        transforms.RandomCrop(32, padding=0),
        # transforms.Normalize((0.5,), (0.5,)),
        # AddGaussianNoise(0., 0.25),
        AddRaleighNoise(1, 200),  # CIFAR requires big b value
        # AddErlangNoise(0.0001, 0.0001),
        # AddExponentialNoise(2),
        # AddUniformNoise(100, 1),  # CIFAR requires big a value
        # AddImpulseNoise(0.5),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    train_loader = DataLoader(
        datasets.CIFAR10(root='.data/cifar', train=True, download=True, transform=transform_train),
        batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True
    )
    train_eval_loader = DataLoader(
        datasets.CIFAR10(root='.data/cifar', train=True, download=True, transform=transform_test),
        batch_size=test_batch_size, shuffle=True, num_workers=2, drop_last=True
    )
    test_loader = DataLoader(
        datasets.CIFAR10(root='.data/cifar', train=False, download=True, transform=transform_test),
        batch_size=test_batch_size, shuffle=False, num_workers=2, drop_last=True
    )
    return train_loader, test_loader, train_eval_loader
if __name__ == '__main__':
    train_loader, test_loader, train_eval_loader = get_cifar_loaders()

    # for batch_idx, (data, target) in tqdm(enumerate(train_loader), total=len(train_loader)):
    #     sample_idx = torch.randint(len(data), size=(1,)).item()
    #     img = data[sample_idx]
    #     # print(data)

    images, labels = next(iter(train_loader))
    plt.imshow(images[0].permute(1, 2, 0))
    # plt.imshow(images[0].reshape(28, 28), cmap='gray')
    plt.show()
    # print(images[0].shape)
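    # Note (assumption, not in the original): with strong noise such as
    # AddRaleighNoise(1, 200), pixel values fall far outside [0, 1] and matplotlib
    # clips them; clamping first gives a more faithful preview, e.g.:
    # plt.imshow(torch.clamp(images[0], 0, 1).permute(1, 2, 0))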