####################### model_definition.py ###########################

'''
This Python script contains all of the function and class definitions for
building the transformer model.
REFERENCES: https://colab.research.google.com/github/tensorflow/examples/blob/master/community/en/transformer_chatbot.ipynb#scrollTo=NPSKnjS-gE_q
'''

import numpy as np
import tensorflow as tf


# ATTENTION:
def scaled_dot_product_attention(query, key, value, mask):
    """Calculate the attention weights."""
    matmul_qk = tf.matmul(query, key, transpose_b=True)  # each query-key dot product gives a scalar "score"

    # scale matmul_qk
    depth = tf.cast(tf.shape(key)[-1], tf.float32)  # depth is the dimension of the key vectors
    logits = matmul_qk / tf.math.sqrt(depth)

    # add the mask to zero out padding tokens
    if mask is not None:
        logits += (mask * -1e9)

    # softmax is normalized on the last axis (seq_len_k)
    attention_weights = tf.nn.softmax(logits, axis=-1)  # these softmax scores determine how much each word is represented at each position

    output = tf.matmul(attention_weights, value)  # the weighted value vectors (weighted by the softmax scores)

    return output  # inside the encoder this output is passed on to a feed-forward network, but first the outputs of all the parallel attention heads are concatenated, since this function is used inside a multi-head attention layer.
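
# The function above implements Attention(Q, K, V) = softmax(Q·K^T / sqrt(d_k))·V
# from "Attention Is All You Need". A minimal shape sketch (the values below are
# illustrative, not part of the original script): with query/key/value of shape
# (batch, num_heads, seq_len, depth), the output keeps the query's sequence length.
#
#   q = tf.random.uniform((1, 2, 4, 8))   # (batch, heads, seq_len_q, depth)
#   k = tf.random.uniform((1, 2, 6, 8))   # (batch, heads, seq_len_k, depth)
#   v = tf.random.uniform((1, 2, 6, 8))   # (batch, heads, seq_len_k, depth)
#   out = scaled_dot_product_attention(q, k, v, mask=None)
#   # out.shape == TensorShape([1, 2, 4, 8])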


class MultiHeadAttention(tf.keras.layers.Layer):

    def __init__(self, d_model, num_heads, name="multi_head_attention"):
        super(MultiHeadAttention, self).__init__(name=name)
        self.num_heads = num_heads  # how many parallel copies of self-attention to run
        self.d_model = d_model  # dimension of the model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.query_dense = tf.keras.layers.Dense(units=d_model)
        self.key_dense = tf.keras.layers.Dense(units=d_model)
        self.value_dense = tf.keras.layers.Dense(units=d_model)

        self.dense = tf.keras.layers.Dense(units=d_model)

    def split_heads(self, inputs, batch_size):
        inputs = tf.reshape(
            inputs, shape=(batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(inputs, perm=[0, 2, 1, 3])

    def call(self, inputs):
        query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = tf.shape(query)[0]

        # linear layers
        query = self.query_dense(query)
        key = self.key_dense(key)
        value = self.value_dense(value)

        # split heads
        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        # scaled dot-product attention
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)

        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])

        # concatenation of heads
        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))

        # final linear layer
        outputs = self.dense(concat_attention)

        return outputs
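
# A minimal usage sketch for the layer above (shapes and hyperparameters are
# illustrative, not taken from the original training setup):
#
#   mha = MultiHeadAttention(d_model=128, num_heads=4)
#   x = tf.random.uniform((1, 10, 128))              # (batch, seq_len, d_model)
#   out = mha({'query': x, 'key': x, 'value': x, 'mask': None})
#   # out.shape == TensorShape([1, 10, 128])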


# Mask all the pad tokens (value 0) in the batch to ensure the model does not treat padding as input.
def create_padding_mask(x):
    mask = tf.cast(tf.math.equal(x, 0), tf.float32)
    # (batch_size, 1, 1, sequence_length)
    return mask[:, tf.newaxis, tf.newaxis, :]

# ^ this returns a mask tensor with the value 1 wherever the input tensor is 0, i.e. at the padded positions.
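#
# A small worked example (the values are hypothetical): for a padded batch
#   x = tf.constant([[5, 71, 0, 0]])
# create_padding_mask(x) evaluates to
#   [[[[0., 0., 1., 1.]]]]          # shape (1, 1, 1, 4)
# and those 1s are what push the corresponding attention logits towards -1e9.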


def create_look_ahead_mask(x):
    seq_len = tf.shape(x)[1]  # length of the current sentence
    look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
    padding_mask = create_padding_mask(x)
    return tf.maximum(look_ahead_mask, padding_mask)


### ^ this returns, for each sentence, a (seq_len, seq_len) matrix in which row i masks out (value 1) every position after position i, so each token can only attend to itself and to earlier tokens; it is combined with the padding mask so pad tokens stay masked as well.
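#
# A small worked example (the values are hypothetical): for
#   x = tf.constant([[7, 8, 0]])
# the look-ahead component alone is
#   [[0., 1., 1.],
#    [0., 0., 1.],
#    [0., 0., 0.]]
# and after tf.maximum with the padding mask the padded last column stays masked:
#   [[[[0., 1., 1.],
#      [0., 0., 1.],
#      [0., 0., 1.]]]]             # shape (1, 1, 3, 3)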


### NOW WRITING DOWN POSITIONAL ENCODINGS:
class PositionalEncoding(tf.keras.layers.Layer):

    def __init__(self, position, d_model):
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return position * angles

    def positional_encoding(self, position, d_model):
        angle_rads = self.get_angles(
            position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model)
        # apply sin to the even indices of the array
        sines = tf.math.sin(angle_rads[:, 0::2])
        # apply cos to the odd indices of the array
        cosines = tf.math.cos(angle_rads[:, 1::2])

        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]


'''
Reason for the positional encoding formula (quoting the original paper):
pos is the position and i is the dimension. That is, each dimension of the
positional encoding corresponds to a sinusoid. The wavelengths form a geometric
progression from 2π to 10000·2π. "We chose this function because we hypothesized
it would allow the model to easily learn to attend by relative positions, since
for any fixed offset k, PE(pos+k) can be represented as a linear function of PE(pos)."
'''
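
# The formula being referenced above ("Attention Is All You Need"):
#   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
# Note that the implementation above concatenates the sine half and the cosine
# half along the last axis instead of interleaving them; the same set of
# frequencies is used either way.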


#### ENCODER LAYER
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    attention = MultiHeadAttention(
        d_model, num_heads, name="attention")({
            'query': inputs,
            'key': inputs,
            'value': inputs,
            'mask': padding_mask
        })
    # NOTE: this is the functional API. MultiHeadAttention is a tf.keras.layers.Layer
    # subclass, so when the layer instance is called here its call() method is invoked
    # automatically and this dictionary is passed in as its argument. In the functional
    # API a layer's call() always receives its input from the previous layer's output.
    attention = tf.keras.layers.Dropout(rate=dropout)(attention)
    attention = tf.keras.layers.LayerNormalization(
        epsilon=1e-6)(inputs + attention)

    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(
        epsilon=1e-6)(attention + outputs)

    return tf.keras.Model(
        inputs=[inputs, padding_mask], outputs=outputs, name=name)


#### ENCODER
def encoder(vocab_size, num_layers, units, d_model, num_heads, dropout, name="encoder"):
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

    for i in range(num_layers):
        outputs = encoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name="encoder_layer_{}".format(i),
        )([outputs, padding_mask])

    return tf.keras.Model(
        inputs=[inputs, padding_mask], outputs=outputs, name=name)


#### DECODER LAYER
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
    inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
    enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name="look_ahead_mask")
    padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

    attention1 = MultiHeadAttention(
        d_model, num_heads, name="attention_1")(inputs={
            'query': inputs,
            'key': inputs,
            'value': inputs,
            'mask': look_ahead_mask
        })
    attention1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention1 + inputs)

    attention2 = MultiHeadAttention(
        d_model, num_heads, name="attention_2")(inputs={
            'query': attention1,
            'key': enc_outputs,
            'value': enc_outputs,
            'mask': padding_mask
        })
    attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
    attention2 = tf.keras.layers.LayerNormalization(
        epsilon=1e-6)(attention2 + attention1)

    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
    outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention2)

    return tf.keras.Model(
        inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
        outputs=outputs,
        name=name)


#### DECODER
def decoder(vocab_size,
            num_layers,
            units,
            d_model,
            num_heads,
            dropout,
            name='decoder'):
    inputs = tf.keras.Input(shape=(None,), name='inputs')
    enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
    look_ahead_mask = tf.keras.Input(shape=(1, None, None), name='look_ahead_mask')
    padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

    embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
    embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)

    outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)

    for i in range(num_layers):
        outputs = decoder_layer(
            units=units,
            d_model=d_model,
            num_heads=num_heads,
            dropout=dropout,
            name='decoder_layer_{}'.format(i),
        )(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])

    return tf.keras.Model(
        inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
        outputs=outputs,
        name=name)


#### TRANSFORMER
def transformer(vocab_size,
                num_layers,
                units,
                d_model,
                num_heads,
                dropout,
                name="transformer"):
    inputs = tf.keras.Input(shape=(None,), name="inputs")
    dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

    enc_padding_mask = tf.keras.layers.Lambda(
        create_padding_mask,
        output_shape=(1, 1, None),
        name='enc_padding_mask')(inputs)
    # mask the future tokens for decoder inputs at the 1st attention block
    look_ahead_mask = tf.keras.layers.Lambda(
        create_look_ahead_mask,
        output_shape=(1, None, None),
        name='look_ahead_mask')(dec_inputs)
    # mask the encoder outputs for the 2nd attention block
    dec_padding_mask = tf.keras.layers.Lambda(
        create_padding_mask,
        output_shape=(1, 1, None),
        name='dec_padding_mask')(inputs)

    enc_outputs = encoder(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout)(inputs=[inputs, enc_padding_mask])

    dec_outputs = decoder(
        vocab_size=vocab_size,
        num_layers=num_layers,
        units=units,
        d_model=d_model,
        num_heads=num_heads,
        dropout=dropout)(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

    outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)

    return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
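

# A minimal usage sketch (the hyperparameters below are illustrative, not taken
# from the original training setup):
#
#   sample_model = transformer(
#       vocab_size=8192,
#       num_layers=2,
#       units=512,
#       d_model=128,
#       num_heads=4,
#       dropout=0.1)
#   # sample_model([inputs, dec_inputs]) maps token-id tensors of shape
#   # (batch, enc_seq_len) and (batch, dec_seq_len) to logits of shape
#   # (batch, dec_seq_len, vocab_size).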