# Snapshot pasted from an online viewer: Apr 08, 2021, 10:56 PM (approx. 4 years before capture).
1#!/usr/bin/env python3
2import argparse
3import datetime
4import os
5import re
6os.environ.setdefault("TF_CPP_MIN_LOG_LEVEL", "2") # Report only TF errors by default
7
8import numpy as np
9import tensorflow as tf
10
11from mnist import MNIST
12
# Command-line interface. ReCodEx (the course grader) overrides these values,
# so defaults only matter for local runs.
parser = argparse.ArgumentParser()
# These arguments will be set appropriately by ReCodEx, even if you change them.
parser.add_argument("--batch_size", default=50, type=int, help="Batch size.")
parser.add_argument("--cnn", default='F,H-100', type=str, help="CNN architecture.")
parser.add_argument("--epochs", default=5, type=int, help="Number of epochs.")
parser.add_argument("--recodex", default=False, action="store_true", help="Evaluation in ReCodEx.")
parser.add_argument("--seed", default=42, type=int, help="Random seed.")
parser.add_argument("--threads", default=1, type=int, help="Maximum number of threads to use.")
# If you add more arguments, ReCodEx will keep them with your default values.
# Fix: with nargs="*" the parsed value is a list, so the default must be a
# list too (previously a bare int 128, inconsistent with parsed values).
parser.add_argument("--hidden_layers", default=[128], nargs="*", type=int, help="Hidden layer sizes.")
23
# The neural network model
class Network(tf.keras.Model):
    """Functional-API CNN for MNIST built from the `args.cnn` description string.

    The architecture string is a comma-separated list of layer tokens:
      - `C-filters-kernel_size-stride-padding`: Conv2D with ReLU activation.
      - `CB-filters-kernel_size-stride-padding`: Conv2D without bias/activation,
        followed by BatchNormalization, followed by ReLU.
      - `M-pool_size-stride`: MaxPool2D with the default "valid" padding.
      - `R-[layers]`: residual connection; `layers` (C/CB tokens only, no nested
        R) are applied sequentially and the result is added to the R input.
      - `F`: Flatten (must appear exactly once).
      - `H-hidden_layer_size`: Dense with ReLU activation.
      - `D-dropout_rate`: Dropout with the given rate.
    The input is assumed valid; malformed strings may crash.
    """

    def add_convolution_layer(self, my_hidden, argumenty):
        """Apply a `C-...` token: Conv2D + ReLU. `argumenty` is the split token."""
        my_hidden = tf.keras.layers.Conv2D(
            filters=int(argumenty[1]), kernel_size=int(argumenty[2]),
            strides=int(argumenty[3]), padding=argumenty[4],
            activation=tf.nn.relu)(my_hidden)
        return my_hidden

    def add_convolution_layer_BN(self, my_hidden, argumenty):
        """Apply a `CB-...` token: Conv2D (no bias, no activation) -> BatchNorm -> ReLU."""
        my_hidden = tf.keras.layers.Conv2D(
            filters=int(argumenty[1]), kernel_size=int(argumenty[2]),
            strides=int(argumenty[3]), padding=argumenty[4],
            activation=None, use_bias=False)(my_hidden)
        my_hidden = tf.keras.layers.BatchNormalization()(my_hidden)
        my_hidden = tf.keras.activations.relu(my_hidden)
        return my_hidden

    def add_max_pooling(self, my_hidden, argumenty):
        """Apply an `M-pool_size-stride` token: MaxPool2D with "valid" padding."""
        return tf.keras.layers.MaxPool2D(
            pool_size=int(argumenty[1]), strides=int(argumenty[2]))(my_hidden)

    def __init__(self, args):
        """Build, compile, and attach a TensorBoard callback for the model."""
        inputs = tf.keras.layers.Input(shape=[MNIST.H, MNIST.W, MNIST.C])

        # Split off the (single, non-nested) residual block `R-[...]`:
        # `residual_spec` keeps the inner layer list, and `flat_spec` is the
        # architecture string with the bracketed part removed so that a plain
        # comma-split yields one token per layer (the `R` token stays as marker).
        # Example: 'CB-8-3-5-valid,R-[CB-8-3-1-same,CB-8-3-1-same],F,H-50'
        flat_spec = args.cnn
        residual_spec = ""  # Fix: always bound, so the `R` branch cannot NameError.
        if 'R' in args.cnn:
            residual_spec = args.cnn[args.cnn.index('R') + 3:args.cnn.index(']')]
            flat_spec = args.cnn[:args.cnn.index('R') + 1] + args.cnn[args.cnn.index(']') + 1:]

        hidden = inputs
        for token in flat_spec.split(','):
            spec = token.split('-')
            if spec[0] == 'C':
                hidden = self.add_convolution_layer(hidden, spec)
            elif spec[0] == 'CB':
                hidden = self.add_convolution_layer_BN(hidden, spec)
            elif spec[0] == 'M':
                hidden = self.add_max_pooling(hidden, spec)
            elif spec[0] == 'R':
                # Residual connection: apply the inner layers sequentially,
                # then add the block's input to the produced output.
                residual_input = hidden
                for inner_token in residual_spec.split(','):
                    inner_spec = inner_token.split('-')
                    if inner_spec[0] == 'C':
                        hidden = self.add_convolution_layer(hidden, inner_spec)
                    elif inner_spec[0] == 'CB':
                        hidden = self.add_convolution_layer_BN(hidden, inner_spec)
                hidden += residual_input
            elif spec[0] == 'F':
                hidden = tf.keras.layers.Flatten()(hidden)
            elif spec[0] == 'H':
                hidden = tf.keras.layers.Dense(int(spec[1]), activation=tf.nn.relu)(hidden)
            elif spec[0] == 'D':
                # Fix: honor the requested `D-dropout_rate` instead of a
                # hard-coded 0.5.
                hidden = tf.keras.layers.Dropout(float(spec[1]))(hidden)

        # Final classification head (one unit per MNIST label).
        outputs = tf.keras.layers.Dense(MNIST.LABELS, activation=tf.nn.softmax)(hidden)

        super().__init__(inputs=inputs, outputs=outputs)
        self.compile(
            optimizer=tf.optimizers.Adam(),
            loss=tf.losses.SparseCategoricalCrossentropy(),
            metrics=[tf.metrics.SparseCategoricalAccuracy(name="accuracy")],
        )
        self.tb_callback = tf.keras.callbacks.TensorBoard(args.logdir, histogram_freq=1, update_freq=100, profile_batch=0)
        self.tb_callback._close_writers = lambda: None  # A hack allowing to keep the writers open.
117
def main(args):
    """Train a `Network` on MNIST per `args` and return the test-set accuracy.

    Side effects: seeds NumPy/TF RNGs, limits TF thread pools, sets
    `args.logdir`, and writes TensorBoard logs under `logs/`.
    """
    # Fix random seeds and threads
    np.random.seed(args.seed)
    tf.random.set_seed(args.seed)
    tf.config.threading.set_inter_op_parallelism_threads(args.threads)
    tf.config.threading.set_intra_op_parallelism_threads(args.threads)
    if args.recodex:
        # Deterministic initializers so the grader reproduces identical weights.
        tf.keras.utils.get_custom_objects()["glorot_uniform"] = tf.initializers.GlorotUniform(seed=args.seed)
        tf.keras.utils.get_custom_objects()["orthogonal"] = tf.initializers.Orthogonal(seed=args.seed)
        tf.keras.utils.get_custom_objects()["uniform"] = tf.initializers.RandomUniform(seed=args.seed)

    # Create logdir name: "<script>-<timestamp>-<abbreviated args>", where each
    # arg name is shortened to the first letter of each underscore-separated word.
    args.logdir = os.path.join("logs", "{}-{}-{}".format(
        os.path.basename(globals().get("__file__", "notebook")),
        datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S"),
        ",".join(("{}={}".format(re.sub("(.)[^_]*_?", r"\1", key), value) for key, value in sorted(vars(args).items())))
    ))

    # Load the data
    mnist = MNIST()

    # Create the network and train
    network = Network(args)
    network.fit(
        mnist.train.data["images"], mnist.train.data["labels"],
        batch_size=args.batch_size, epochs=args.epochs,
        validation_data=(mnist.dev.data["images"], mnist.dev.data["labels"]),
        callbacks=[network.tb_callback],
    )

    # Compute test set accuracy and return it
    test_logs = network.evaluate(
        mnist.test.data["images"], mnist.test.data["labels"], batch_size=args.batch_size, return_dict=True,
    )
    # Log test metrics into TensorBoard as an extra "epoch" (prefixed val_test_).
    network.tb_callback.on_epoch_end(args.epochs, {"val_test_" + metric: value for metric, value in test_logs.items()})

    return test_logs["accuracy"]
155
if __name__ == "__main__":
    # In a notebook (`__file__` missing) parse no CLI args, i.e. use defaults.
    args = parser.parse_args([] if "__file__" not in globals() else None)
    main(args)
159