본문 바로가기
ML & DL/tensorflow

[tensorflow] UNet (Oxford-IIIT Pet segmentation)

by 별준 2021. 1. 17.

(tensorflow v2.4.0)

 

2021/01/16 - [ML & DL/tensorflow] - [tensorflow] Fully Convolutional Networks(FCNs)

 

[tensorflow] Fully Convolutional Networks(FCNs)

(tensorflow v2.4.0) Fully Convolutional Network(FCN)를 사용해서 Image Segmentation을 수행하는 모델을 구현해보도록 하겠습니다. 먼저 FCN에 대해서 아주 간단하게 살펴보겠습니다. FCN의 자세한 내용은 논문..

junstar92.tistory.com

이번에는 Fully Convolutional network에 이어서 Image Segmentation을 목적으로 제안된 FCN을 기반으로한 End-to-end 방식의 UNet에 대해서 간단히 살펴보고, IIIT Pet segmentation을 직접 구현해보도록 하겠습니다.

(paper : 논문 링크)

 

UNet의 기본구조는 아래와 같습니다. 이름에서 추측할 수 있듯이 U 모양의 형태의 대칭으로 네트워크가 구성되어 있으며, FCN과 마찬가지로 EncoderDecoder로 구성됩니다.

 

Encoder는 FCN과 거의 동일하게 CNN 구조의 Block이 여러개로 구성되어 있으며, Feature Extractor의 역할을 담당합니다. 그리고 Decoder를 통해서 segmentation을 수행하게 됩니다. 마찬가지로 Skip Connection이 존재하는데, Encoder의 conv layer의 결과를 가져와서 Decoder에서 사용됩니다.

Skip Connection으로 전달되는 값은 Decoder에서 Upsampling path에서 추가되는데, FCN에서는 더해주었지만, UNet에서는 concatenate됩니다.(값을 이어주게 됩니다!)

 

FCN과 큰 차이는 없기 때문에, 바로 직접 코드를 통해서 구현해보고 학습해보도록 하겠습니다.

이번에 사용되는 데이터는 tensorflow dataset에서 제공되는 Oxford-IIIT Pet dataset이며, 각 픽셀을 Pet, Outline, Background 3가지로 분류한 annotation을 가지고 학습에 사용해보도록 하겠습니다.

 

Import

사용되는 package들을 import합니다.

import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
import numpy as np

Download the Oxford-IIIT Pet Dataset

dataset, info = tfds.load('oxford_iiit_pet', with_info=True)
print(dataset.keys())
print(info)

dataset은 test와 train으로 나누어져서 제공되며, file_name, image, label, segmentation_mask, species가 제공되지만 우리는 image와 segmentation mask만 사용할 예정입니다.

 

Prepare the Dataset

dataset을 통해서 training set과 test set으로 나누는 작업을 진행하겠습니다. 그리고 전처리 과정으로 normalization를 적용하고, 학습 data에는 random flip을 적용하겠습니다.

이 과정은 Dataset의 map 메소드를 통해서 수행될 예정이므로, mapping 함수를 우선 정의하겠습니다.

def random_flip(input_image, input_mask):
    ''' do a random flip of the image and mask'''
    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)
    
    return input_image, input_mask

def normalize(input_image, input_mask):
    '''
    normalize the input image pixel values to be from [0,1]
    subtract 1 from the mask labels to have a range from [0,2]
    '''
    input_image = tf.cast(input_image, tf.float32) / 255.0
    input_mask -= 1
    return input_image, input_mask

def load_image_train(datapoint):
    ''' resize, normalize and flips the training data '''
    input_image = tf.image.resize(datapoint['image'], (128, 128), method='nearest')
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128), method='nearest')
    input_image, input_mask = random_flip(input_image, input_mask)
    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

def load_image_test(datapoint):
    ''' resize and normalize the test data'''
    input_image = tf.image.resize(datapoint['image'], (128, 128), method='nearest')
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128), method='nearest')
    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

전처리를 진행합니다.

# preprocess the train and test sets
train = dataset['train'].map(load_image_train, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test = dataset['test'].map(load_image_test)

 

그리고, batch size는 64로 설정해서 training dataset과 test dataset을 만듭니다.

BATCH_SIZE = 64
BUFFER_SIZE = 1000

# shuffle and group the train set into batches
train_dataset = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()

# do a prefetch to optimize processing
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# group the test set into batches
test_dataset = test.batch(BATCH_SIZE)

 

dataset의 sample을 살펴보도록 하겠습니다.

# class list of the mask pixels
class_names = ['pet', 'background', 'outline']

def display_with_metrics(display_list, iou_list, dice_score_list):
    ''' displays a list of images/masks and overlays a list of IoU and Dice Score '''
    metrics_by_id = [(idx, iou, dice_score) for idx, (iou, dice_score) in enumerate(zip(iou_list, dice_score_list))]
    metrics_by_id.sort(key=lambda tup: tup[1], reverse=True) # sort in place

    display_string_list = [f"{class_names[idx]}: IoU: {iou} Dice Score: {dice_score}" for idx, iou, dice_score in metrics_by_id]
    display_string = "\n\n".join(display_string_list)

    display(display_list, ["Image", "Predicted Mask", "True Mask"], display_string=display_string)

def display(display_list, titles=[], display_string=None):
    ''' displays list of images/masks'''
    plt.figure(figsize=(15,15))

    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(titles[i])
        plt.xticks([])
        plt.yticks([])
        if display_string and i == 1:
            plt.xlabel(display_string, fontsize=12)
        img_arr = tf.keras.preprocessing.image.array_to_img(display_list[i])
        plt.imshow(img_arr)
    
    plt.show()

def show_image_from_dataset(dataset):
    for image, mask in dataset.take(1):
        sample_image, sample_mask = image, mask
    display([sample_image, sample_mask], titles=['Image', 'True Maks'])
# display an image from the train set
show_image_from_dataset(train)

# display an image from the test set
show_image_from_dataset(test)

 

Define the Model

Encoder를 구현해보도록 하겠습니다. 각 Block에서 Convolution은 총 두 번 이루어지므로, 중복되는 부분을 conv2d_block 함수로 정의하고, pooling layer를 포함한 encoder_block을 정의합니다. pooling layer 이후에는 regularization 목적으로 dropout layer도 추가해줍니다. 논문에서는 conv layer의 padding이 없지만, 편의를 위해서 padding을 'same'으로 설정하였습니다.

# Encoder

def conv2d_block(input_tensor, n_filters, kernel_size=3):
    '''
    Add 2 convolutional layers with the parameters
    '''

    x = input_tensor
    for i in range(2):
        x = tf.keras.layers.Conv2D(filters=n_filters, kernel_size=kernel_size,
                                   kernel_initializer='he_normal', activation='relu', padding='same')(x)
    return x

def encoder_block(inputs, n_filters=64, pool_size=(2,2), dropout=0.3):
    '''
    Add 2 convolutional blocks and then perform down sampling on output of convolutions
    '''

    f = conv2d_block(inputs, n_filters)
    p = tf.keras.layers.MaxPooling2D(pool_size=pool_size)(f)
    p = tf.keras.layers.Dropout(dropout)(p)

    return f, p

def encoder(inputs):
    '''
    defines the encoder or downsampling path.
    '''
    
    f1, p1 = encoder_block(inputs, n_filters=64)
    f2, p2 = encoder_block(p1, n_filters=128)
    f3, p3 = encoder_block(p2, n_filters=256)
    f4, p4 = encoder_block(p3, n_filters=512)

    return p4, (f1, f2, f3, f4)

encoder_block은 총 2개의 결과값을 반환하며, skip connection을 위한 2번의 conv layer의 output과 다음 encoder block에서 사용될 pooling layer와 dropout을 통과한 output을 반환합니다.

최종 encoder에서는 각 block에서의 conv layer output과 마지막 pooling layer/dropout의 output을 반환합니다.

 

다음은 Encoder의 마지막 결과값을 decoder로 전달하기 전에 진행되는 bottleneck을 정의합니다.

# Bottlenect
def bottleneck(inputs):
    bottle_neck = conv2d_block(inputs, n_filters=1024)
    return bottle_neck

마지막으로 Decoder를 정의합니다. decoder_block에서 inputs을 Conv2DTranspose를 통해서 Upsampling을 진행하고, Encoder의 output과 concatenate를 진행한 후에 Convolution을 두 번 수행합니다.

# Decoder
def decoder_block(inputs, conv_output, n_filters=64, kernel_size=3, strides=3, dropout=0.3):
    '''
    defines the one decoder block of the UNet
    '''

    u = tf.keras.layers.Conv2DTranspose(n_filters, kernel_size, strides, padding='same')(inputs)
    c = tf.keras.layers.concatenate([u, conv_output])
    c = tf.keras.layers.Dropout(dropout)(c)
    c = conv2d_block(c, n_filters)

    return c

def decoder(inputs, convs, output_channels):
    '''
    Defines the decoder of the UNet chaining together 4 decoder blocks. 
    '''

    f1, f2, f3, f4 = convs

    c6 = decoder_block(inputs, f4, n_filters=512, kernel_size=3, strides=2)
    c7 = decoder_block(c6, f3, n_filters=256, kernel_size=3, strides=2)
    c8 = decoder_block(c7, f2, n_filters=128, kernel_size=3, strides=2)
    c9 = decoder_block(c8, f1, n_filters=64, kernel_size=3, strides=2)

    outputs = tf.keras.layers.Conv2D(output_channels, 1, activation='softmax')(c9)

    return outputs

 

최종적으로 모든 것을 하나로 합쳐서 UNet을 구성하는 함수를 정의하고, 모델을 생성하겠습니다.

분류해야하는 Pixel의 라벨은 Pet/Outline/Background 총 3개이므로, 최종 output은 128x128x3의 모양을 갖습니다.

# putting it all together
OUTPUT_CHANNELS = 3

def UNet():
    '''
    Defines the UNet by connecting the encoder, bottleneck and decoder
    '''

    inputs = tf.keras.layers.Input(shape=(128,128,3,))

    encoder_output, convs = encoder(inputs)
    bottle_neck = bottleneck(encoder_output)
    outputs = decoder(bottle_neck, convs, OUTPUT_CHANNELS)

    model = tf.keras.Model(inputs, outputs)

    return model

model = UNet()
model.summary()

Compile and Train the Model

Optimizer는 Adam, Loss function은 'sparse_categorical_crossentropy'로 설정했으며, 정확도를 평가합니다.

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy',
              metrics=['acc'])

총 20 Epoch로 학습을 진행합니다.

# configure the training parameters and train the model
TRAIN_LENGTH = info.splits['train'].num_examples
EPOCHS = 20
VAL_SUBSPLITS = 5
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS

# this will take around 20 minutes to run
model_history = model.fit(train_dataset, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_dataset)

18 Epoch 부터 약간 과적합의 영향이 나타나고 있는 것 같습니다.

 

아래는 train loss와 validation loss의 그래프입니다.

# Plot the training and validation loss
def plot_metrics(model_history, metric_name, title, ylim=5):
    plt.title(title)
    plt.ylim(0, ylim)
    plt.plot(model_history.history[metric_name], 'b', label=metric_name)
    plt.plot(model_history.history['val_' + metric_name], 'g', label='val_'+metric_name)
    plt.legend()
    
plot_metrics(model_history, "loss", title="Training vs Validation Loss", ylim=1)

 

Make Predictions

이제 Test data를 가지고, 얼마나 잘 예측하는지 알아보겠습니다.

아래 함수들은 예측을 위한 Utility 함수와 IoU와 Dice Score를 계산하기 위한 함수입니다.

# Prediction Utilities
def get_test_image_and_annotation_arrays():
    '''
    Unpacks the test dataset and returns the input images and segmentation masks
    '''

    ds = test_dataset.unbatch()
    ds = ds.batch(info.splits['test'].num_examples)

    images = []
    y_true_segments = []

    for image, annotation in ds.take(1):
        y_true_segments = annotation.numpy()
        images = image.numpy()
    
    y_true_segments = y_true_segments[:(info.splits['test'].num_examples - (info.splits['test'].num_examples % BATCH_SIZE))]

    return images[:(info.splits['test'].num_examples - (info.splits['test'].num_examples % BATCH_SIZE))], y_true_segments

def create_mask(pred_mask):
    '''
    Creates the segmentation mask by getting the channel with the highest probability. Remember that we
    have 3 channels in the output of the UNet. For each pixel, the predicition will be the channel with the
    highest probability.
    '''

    pred_mask = tf.argmax(pred_mask, axis=-1)
    pred_mask = pred_mask[..., tf.newaxis]
    return pred_mask[0].numpy()

def make_predictions(image, mask, num=1):
    '''
    Feeds an image to a model and returns the predicted mask
    '''

    image = np.reshape(image, (1, image.shape[0], image.shape[1], image.shape[2]))
    pred_mask = model.predict(image)
    pred_mask = create_mask(pred_mask)

    return pred_mask
    
def class_wise_metrics(y_true, y_pred):
    class_wise_iou = []
    class_wise_dice_score = []

    smoothening_factor = 0.00001
    
    for i in range(3):
        intersection = np.sum((y_pred==i) * (y_true==i))
        y_true_area = np.sum((y_true==i))
        y_pred_area = np.sum((y_pred==i))
        combined_area = y_true_area + y_pred_area

        iou = (intersection + smoothening_factor) / (combined_area - intersection + smoothening_factor)
        class_wise_iou.append(iou)

        dice_score = 2 * ((intersection + smoothening_factor) / (combined_area + smoothening_factor))
        class_wise_dice_score.append(dice_score)
    
    return class_wise_iou, class_wise_dice_score

이제 test data의 ground truth 값과 predictions을 구하고, class별 metric(IoU, Dice Score)을 계산합니다.

# Setup the ground truth and predictions.
# get the ground truth from the test set
y_true_images, y_true_segments = get_test_image_and_annotation_arrays()

# feed the test set to th emodel to get the predicted masks
results = model.predict(test_dataset, steps=info.splits['test'].num_examples//BATCH_SIZE)
results = np.argmax(results, axis=3)
results = results[..., tf.newaxis]

# compute the class wise metrics
cls_wise_iou, cls_wise_dice_score = class_wise_metrics(y_true_segments, results)

class별 IoU 결과입니다.

# show the IOU for each class
for idx, iou in enumerate(cls_wise_iou):
    spaces = ' ' * (10-len(class_names[idx]) + 2)
    print("{}{}{} ".format(class_names[idx], spaces, iou)) 

class별 Dice Score 결과입니다.

# show the Dice Score for each class
for idx, dice_score in enumerate(cls_wise_dice_score):
    spaces = ' ' * (10-len(class_names[idx]) + 2)
    print("{}{}{} ".format(class_names[idx], spaces, dice_score))

예측 결과를 시각화해서 살펴보겠습니다. integer_slider에 보고싶은 index를 넣어서 다른 이미지도 살펴볼 수 있습니다.

# Please input a number between 0 to 3647 to pick an image from the dataset
integer_slider = 2800

# Get the prediction mask
y_pred_mask = make_predictions(y_true_images[integer_slider], y_true_segments[integer_slider])

# Compute the class wise metrics
iou, dice_score = class_wise_metrics(y_true_segments[integer_slider], y_pred_mask)  

# Overlay the metrics with the images
display_with_metrics([y_true_images[integer_slider], y_pred_mask, y_true_segments[integer_slider]], iou, dice_score)

나름 꽤 괜찮은 결과를 보여주고 있는 것 같습니다.

 

지금까지 UNet으로 구현한 Image Segmentation이었습니다!!

'ML & DL > tensorflow' 카테고리의 다른 글

Saliency Map  (0) 2021.01.18
Class Activation Map(CAM)  (6) 2021.01.18
[tensorflow] Fully Convolutional Networks(FCNs)  (7) 2021.01.16
Breast Cancer Prediction  (0) 2021.01.13
[tensorflow] GradientTape  (0) 2021.01.12

댓글