【Pytorch】MNISTで簡単なアンサンブル学習を実装
はじめに
Kaggleなどを勉強していて、
不均衡データが発生した時にどうすりゃいいんだとなる事があったので、
アンサンブル学習的な要素を含めて実装していきます。
データの不均衡
片方のデータの取得が困難な場合に発生します。
異常検知では、そもそも異常事象の発生頻度の少なさが本質的課題となっているわけです。
ラベルをあらかじめ付与した教師あり学習の場合、
学習時に多数派のデータに偏ってしまう為、精度がイマイチ向上しません。
Under Samplingの問題点
改善策として挙げられるのは・・・
- 少数派のデータの拡張を図るOverSampling
- 多数派のデータの縮小を図るUnderSampling
- 多数派データの構造・特徴抽出から学習を行う教師なし学習
- 少数派データの比重を大きくしながら学習を行うコスト考慮型学習
と4つありますが、今回はUnderSamplingに焦点を当てます。
UnderSamplingでは多数派データの数を少数派データに合わせて学習していきますが、
問題点として「多数派データの一部を捨てるなんで勿体無いなあ」と思ってしまうわけです。
そこで多数派データ全てを活用しながら、UnderSamplingをしていこうと思いました。
ここでアンサンブル学習の考え方を活用して、
多数派データ(=N)からランダムで少数データ分(=n)だけ選択して、
(N/n)パターンのデータセットが完成します。
各データセットを学習し、(N/n)パターンのモデルを作成します。
推論時に(N/n)パターンのモデル全てで推論して、
これらの推論結果を多数決で判断してやろう!といった考え方です。
実装(前処理)
実装はPytorchを使って書きます。
MNISTファイルをダウンロードしてきます。
transform = transforms.Compose([ transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))]) trainset = datasets.MNIST('./data_mnist', train=True, transform=transform, download=True)
1と7の画像に分別して、「1」を2000枚、「7」を50枚として学習していきます。
train_1, train_7 = [], [] for i, t in enumerate(trainset): if t[1] == 1: train_1.append(t) elif t[1] == 7: l = list(t) l[1] = 0 t = tuple(l) train_7.append(t) else: pass import random train_1 = random.sample(train_1, 2000) train_7 = train_7[:50]
2000枚の「1」の画像から50枚をランダム選択して、「7」の画像50枚とセットにします。
この場合40パターンの学習セットが完成します。
n = int(math.floor(len(train_1)/len(train_7))) train_dataset_list = [] validation_dataset_list = [] for i in range(n): train_dataset = random.sample(train_1, len(train_7)) train_dataset.extend(train_7) train_dataset_list.append(train_dataset)
DataLoaderに変換します。
train_loader_list = [] batch_size = 50 for i in range(n): trainloader = DataLoader(train_dataset_list[i], batch_size=batch_size, shuffle=True) train_loader_list.append(trainloader)
実装(訓練)
ネットワークモデルは軽めのモデルを活用します。
class MyNet(nn.Module): def __init__(self): super(MyNet,self).__init__() self.conv1 = nn.Conv2d(1,32,3,1) self.conv2 = nn.Conv2d(32,64,3,1) self.pool = nn.MaxPool2d(2,2) self.dropout1 = nn.Dropout2d(0.25) self.dropout2 = nn.Dropout2d(0.5) self.fc1 = nn.Linear(12*12*64,128) self.fc2 = nn.Linear(128,10) def forward(self,x): x = self.conv1(x) x = f.relu(x) x = self.conv2(x) x = f.relu(x) x = self.pool(x) x = self.dropout1(x) x = x.view(-1,12*12*64) x = self.fc1(x) x = f.relu(x) x = self.dropout2(x) x = self.fc2(x) return f.log_softmax(x, dim=1)
オプティマイザはAdam、損失関数はCrossEntoropyとします。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") criterion = nn.CrossEntropyLoss() net = MyNet().to(device) optimizer = torch.optim.Adam(params=net.parameters(), lr=0.001)
では学習を進めていきます。
40パターンのデータセットごとに学習を進めて、モデルファイルを保存していきますので、
以下のようなプログラムとなりました。
for i in range(int(n1/n7)): net = MyNet().to(device) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") criterion = nn.CrossEntropyLoss() net.to(device) optimizer = torch.optim.Adam(params=net.parameters(), lr=0.001) train_dataloader = train_loader_list[i] for epoch in range(num_epochs): train_loss, train_acc = 0, 0 center_loss_alpha = 1 net.train() for i , (images, labels) in enumerate(trainloader): images, labels = images.to(device), labels.to(device) optimizer.zero_grad() output = net(images) loss = criterion(output,labels) train_loss += loss.item() train_acc += (output.max(1)[1] == labels).sum().item() loss.backward() optimizer.step() if i % 100 == 99: print("Training: {} epoch. {} iteration. Loss:{}".format(e+1,i+1,loss.item())) train_loss /= len(trainloader.dataset) torch.save(net.state_dict(), './unsemble_result/mynetwork_%d.ckpt'%(i))
実装(推論)
テストデータも訓練データのように作成します。
40パターンのモデルにそれぞれで推論を行い、
1つのテストデータに対し、40個のモデルが答えを出します。
40個の答えから多数決で判断していきます。
for inputs, labels in testloader: inputs = inputs.to(device) labels = labels.to(device) #print(device) result = {'label': labels, 'output': []} lis = [] for j in range(int(n1/n7)): net.load_state_dict(torch.load('./unsemble_result/mynetwork_%d.ckpt'%(i))) net.eval() outputs = net(inputs) lis.append(outputs.max(1)[1]) result['output'] = lis result_list7.append(result)