I am following the TensorFlow Transformer tutorial ( https://www.tensorflow.org/text/tutorials/transformer ), but with my own data. My data has nothing to do with text; it consists of token sequences with a start token and an end token. All tokens run from 0 to 30 (the start token is 31, the end token is 32). The sequences have length 64 (66 in total with the start and end tokens). A sequence looks like this:
tf.Tensor(
[31 10 10 10 10 18 10 19 27 22 5 19 10 10 10 10 10 19 10 19 10 1 1 20
22 15 12 26 14 22 17 3 10 14 22 9 25 25 20 7 19 28 4 7 15 14 13 25
21 15 15 17 14 18 14 14 14 27 14 19 25 19 5 3 17 32], shape=(66,), dtype=int32)
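For concreteness, here is a minimal sketch of how a sequence in this format can be assembled (the wrap_sequence helper is illustrative only, not part of my actual pipeline):

import tensorflow as tf

START, END, SEQ_LEN = 31, 32, 64

def wrap_sequence(tokens):
    # Illustrative helper: payload tokens are 0..30, start/end are 31/32.
    return tf.concat([[START], tokens, [END]], axis=0)

seq = wrap_sequence(tf.random.uniform((SEQ_LEN,), maxval=31, dtype=tf.int32))
print(seq.shape)  # (66,)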
My code is very similar to the one in the tutorial, with only minor changes:
import pickle, os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers
from src.utils.callbacks import VQTransCallback
from src.models.layers import GlobalSelfAttention, CrossAttention, CausalSelfAttention
from src.models.layers import TransformerFeedForward as FeedForward
class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = self.positional_encoding(length=66, depth=d_model)

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def positional_encoding(self, length, depth):
        depth = depth / 2
        positions = np.arange(length)[:, np.newaxis]      # (seq, 1)
        depths = np.arange(depth)[np.newaxis, :] / depth  # (1, depth)
        angle_rates = 1 / (10000**depths)                 # (1, depth)
        angle_rads = positions * angle_rates              # (pos, depth)
        pos_encoding = np.concatenate(
            [np.sin(angle_rads), np.cos(angle_rads)],
            axis=-1)
        return tf.cast(pos_encoding, dtype=tf.float32)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)
        # This factor sets the relative scale of the embedding and positional_encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = GlobalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)
        self.supports_masking = True

    def call(self, x):
        x = self.self_attention(x)
        x = self.ffn(x)
        return x
class Encoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads,
                 dff, vocab_size, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
        self.enc_layers = [
            EncoderLayer(d_model=d_model,
                         num_heads=num_heads,
                         dff=dff,
                         dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.supports_masking = True

    def call(self, x):
        x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x)
        return x  # Shape `(batch_size, seq_len, d_model)`.
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super(DecoderLayer, self).__init__()
        self.causal_self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.cross_attention = CrossAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)
        self.supports_masking = True

    def call(self, x, context):
        x = self.causal_self_attention(x=x)
        x = self.cross_attention(x=x, context=context)
        x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
        return x
class Decoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
                 dropout_rate=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size, d_model=d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.dec_layers = [
            DecoderLayer(d_model=d_model, num_heads=num_heads,
                         dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.supports_masking = True

    def call(self, x, context):
        x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.dec_layers[i](x, context)
        return x
class VQVAE2Transformer(tf.keras.Model):
    def __init__(self, *, enc_num_layers, dec_num_layers, d_model, num_heads, dff,
                 vocab_size, codebook_length, dropout_rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers=enc_num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff, vocab_size=vocab_size,
                               dropout_rate=dropout_rate)
        self.decoder = Decoder(num_layers=dec_num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff, vocab_size=vocab_size,
                               dropout_rate=dropout_rate)
        self.enc_num_layers = enc_num_layers
        self.dec_num_layers = dec_num_layers
        self.d_model = d_model
        self.num_heads = num_heads
        self.dff = dff
        self.vocab_size = vocab_size + 3 + 1  # +3 start, end and mask
        self.codebook_length = codebook_length + 2  # +2 start and end tokens
        self.start_token = vocab_size + 1
        self.end_token = vocab_size + 2
        self.final_layer = tf.keras.layers.Dense(self.vocab_size)
        self.supports_masking = True

    def call(self, inputs):
        enc_in, dec_in = inputs[0], inputs[1]
        context = self.encoder(enc_in)     # (batch_size, context_len, d_model)
        x = self.decoder(dec_in, context)  # (batch_size, target_len, d_model)
        # Final linear layer output.
        logits = self.final_layer(x)  # (batch_size, target_len, vocab_size)
        try:
            # Drop the keras mask, so it doesn't scale the losses/metrics.
            # b/250038731
            del logits._keras_mask
        except AttributeError:
            pass
        # Return the final output.
        return logits

    def accuracy(self, label, pred):
        pred = tf.argmax(pred, axis=2)
        label = tf.cast(label, pred.dtype)
        match = label == pred
        match = tf.cast(match, dtype=tf.float32)
        return tf.reduce_sum(match) / self.codebook_length

    def compile_model(self):
        optimizer = keras.optimizers.AdamW()
        scce = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True)
        self.compile(optimizer=optimizer,
                     loss=scce,
                     metrics=["accuracy"])

    def save_build(self, folder):
        """Saves the config before the training starts. The model itself will be saved
        later on using keras checkpoints.

        Args:
            folder: Where to save the config parameters
        """
        if not os.path.exists(folder):
            os.makedirs(folder)
            os.makedirs(os.path.join(folder, 'weights'))
        with open(os.path.join(folder, 'params.pkl'), 'wb') as f:
            pickle.dump([
                self.enc_num_layers,
                self.dec_num_layers,
                self.d_model,
                self.num_heads,
                self.dff,
                self.vocab_size,
            ], f)

    def train_step(self, batch):
        """Processes one batch inside model.fit()."""
        enc_in, dec_in = batch[0][:, :66], batch[0][:, 66:]
        dec_input = dec_in[:, :-1]
        dec_target = dec_in[:, 1:]
        with tf.GradientTape() as tape:
            preds = self([enc_in, dec_input])
            loss = self.compute_loss(None, dec_target, preds)
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update the metrics
        self.compiled_metrics.update_state(dec_target, preds)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, batch):
        enc_in, dec_in = batch[0][:, :66], batch[0][:, 66:]
        dec_input = dec_in[:, :-1]
        dec_target = dec_in[:, 1:]
        preds = self([enc_in, dec_input])
        self.compiled_metrics.update_state(dec_target, preds)
        return {m.name: m.result() for m in self.metrics}

    def train(
        self, train_dataset, valid_dataset, epochs, run_folder, initial_epoch=0,
        print_every_n_epochs=5
    ):
        test_batch = next(valid_dataset.dataset_iter)
        display_cb = VQTransCallback(test_batch, run_folder)
        checkpoint_filepath = os.path.join(
            run_folder, "weights/{epoch:03d}-{loss:.5f}-{val_loss:.5f}.weights.h5")
        checkpoint1 = keras.callbacks.ModelCheckpoint(
            checkpoint_filepath, save_weights_only=True, save_best_only=True)
        checkpoint2 = keras.callbacks.ModelCheckpoint(
            os.path.join(run_folder, 'weights/last.weights.h5'),
            save_weights_only=True, save_best_only=True)
        callbacks_list = [checkpoint1, checkpoint2, display_cb]
        self.fit(
            train_dataset.dataset, validation_data=valid_dataset.dataset,
            epochs=epochs, initial_epoch=initial_epoch, callbacks=callbacks_list,
            # steps_per_epoch=1000, validation_steps=1000
        )

    def generate(self, enc_in, dec_in, startid=0):
        """Performs inference over one batch of inputs."""
        bs = tf.shape(enc_in)[0]
        # scce = tf.keras.losses.SparseCategoricalCrossentropy(
        #     from_logits=True)
        # logits = self([enc_in, dec_in[:, :-1]])
        # print("SCCE LOSS")
        # print(scce(dec_in[:, 1:], logits))
        if startid == 0:
            dec_in = tf.ones((bs, 1), dtype=tf.int32) * self.start_token
        else:
            dec_in = tf.cast(dec_in[:, :startid], dtype=tf.int32)
        for _ in range(self.codebook_length - startid):
            logits = self([enc_in, dec_in])
            logits = tf.argmax(logits, axis=-1, output_type=tf.int32)
            last_logit = tf.expand_dims(logits[:, -1], axis=-1)
            dec_in = tf.concat([dec_in, last_logit], axis=-1)
        return dec_in
I use the sparse categorical cross-entropy loss from the tutorial.
My problem is that the loss displayed during training becomes negative; right now, for example, it shows -2.0593.
When I monitor the loss on some test batches with a callback, the returned value is never negative; it is usually somewhere between 1.5 and 2.
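(VQTransCallback is my own class; this simplified stand-in shows roughly what it does, with names and details that are illustrative only:)

class LossMonitorCallback(tf.keras.callbacks.Callback):
    # Simplified stand-in for my VQTransCallback: recompute the loss
    # on one fixed test batch at the end of every epoch.
    def __init__(self, test_batch):
        super().__init__()
        self.enc_in = test_batch[0][:, :66]
        self.dec_in = test_batch[0][:, 66:]
        self.scce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def on_epoch_end(self, epoch, logs=None):
        logits = self.model([self.enc_in, self.dec_in[:, :-1]])
        print("test-batch loss:", self.scce(self.dec_in[:, 1:], logits).numpy())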
As you can see, the last layer of the Transformer is a Dense layer with no activation (as in the tutorial), so I set from_logits=True in the loss. I also tried a softmax activation on the last layer with from_logits=False, but then the model doesn't seem to train: the loss gets stuck around 0.274 and never moves.
I don't understand why the loss becomes negative, because whenever I test the loss function on its own it always returns positive values.
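This is the kind of standalone test I mean. With valid labels, sparse categorical cross-entropy is non-negative by construction, and the two from_logits setups should agree (plain Keras, nothing custom):

import tensorflow as tf

logits = tf.random.normal((4, 65, 34))  # (batch, target_len, vocab_size)
labels = tf.random.uniform((4, 65), maxval=34, dtype=tf.int32)

scce_logits = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
scce_probs = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

print(scce_logits(labels, logits).numpy())                         # >= 0
print(scce_probs(labels, tf.nn.softmax(logits, axis=-1)).numpy())  # same value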
On top of that, training doesn't work very well overall.