Cannot save a custom Keras model
I have a custom model based on the keras.Model class that I want to save and reuse later. Saving the model works without problems, but reloading the same model fails with the following error:
ValueError: Cannot assign value to variable ' dense_4/kernel:0': Shape mismatch. The variable shape (24, 512) and the assigned value shape (784, 256) are incompatible.
I am using the keras file format and the corresponding model.save and keras.saving.load_model methods. I have also tried pickle and dill, but neither of them could save the model.
Everything before class Encoder is the math for the loss computation; it can (probably) be ignored, but it is needed for the code to work. The model is a kind of variational autoencoder trained on MNIST. The BaseNet class is trained first. The HighNet then takes the encoder and decoder from the trained BaseNet and is trained again with its own generator network. It is this HighNet that gets saved, and it is the model I am now trying to reload. Inside HighNet, BaseNet itself no longer matters; its parts are only used as components of HighNet.
Here is a working example that reproduces the error:
import tensorflow as tf
import keras
from keras import layers
import numpy as np
import math


def silverman_rule_of_thumb_normal(N):
    return tf.pow((4 / (3 * N)), 0.4)


def pairwise_distances(x, y=None):
    if y is None:
        y = x
    distances_tf = tf.norm(x[:, None] - y, axis=-1) ** 2
    return tf.cast(distances_tf, dtype=tf.float64)


def cw_normality(X, y=None):
    assert len(X.shape) == 2
    D = tf.cast(tf.shape(X)[1], tf.float64)
    N = tf.cast(tf.shape(X)[0], tf.float64)
    if y is None:
        y = silverman_rule_of_thumb_normal(N)
    # adjusts for dimensionality; D=2 -> K1=1, D>2 -> K1<1
    K1 = 1.0 / (2.0 * D - 3.0)
    A1 = pairwise_distances(X)
    A = tf.reduce_mean(1 / tf.math.sqrt(y + K1 * A1))
    B1 = tf.cast(tf.square(tf.math.reduce_euclidean_norm(X, axis=1)), dtype=tf.float64)
    B = 2 * tf.reduce_mean((1 / tf.math.sqrt(y + 0.5 + K1 * B1)))
    return (1 / tf.sqrt(1 + y)) + A - B


def phi_sampling(s, D):
    return tf.pow(1.0 + 4.0 * s / (2.0 * D - 3), -0.5)


def cw_sampling_lcw(first_sample, second_sample, y):
    shape = first_sample.get_shape().as_list()
    dim = np.prod(shape[1:])
    first_sample = tf.reshape(first_sample, [-1, dim])

    shape = second_sample.get_shape().as_list()
    dim = np.prod(shape[1:])
    second_sample = tf.reshape(second_sample, [-1, dim])

    assert len(first_sample.shape) == 2
    assert first_sample.shape == second_sample.shape

    _, D = first_sample.shape
    T = 1.0 / (2.0 * tf.sqrt(math.pi * y))
    A0 = pairwise_distances(first_sample)
    A = tf.reduce_mean(phi_sampling(A0 / (4 * y), D))
    B0 = pairwise_distances(second_sample)
    B = tf.reduce_mean(phi_sampling(B0 / (4 * y), D))
    C0 = pairwise_distances(first_sample, second_sample)
    C = tf.reduce_mean(phi_sampling(C0 / (4 * y), D))
    return T * (A + B - 2 * C)


def euclidean_norm_squared(X, axis=None):
    return tf.reduce_sum(tf.square(X), axis=axis)


def cw_sampling_silverman(first_sample, second_sample):
    stddev = tf.math.reduce_std(second_sample)
    N = tf.cast(tf.shape(second_sample)[0], tf.float64)
    gamma = silverman_rule_of_thumb_normal(N)
    return cw_sampling_lcw(first_sample, second_sample, gamma)


@tf.keras.saving.register_keras_serializable()
class Encoder(keras.Model):
    def __init__(self, args, **kwargs):
        super().__init__(**kwargs)
        self.activation = layers.Activation("relu")
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(256)
        self.dense2 = layers.Dense(args["latent_dim"], name="z")

    def build(self, **kwargs):
        encoder_inputs = keras.Input(shape=(28, 28, 1))
        x = self.flatten(encoder_inputs)
        x = self.dense1(x)
        x = self.activation(x)
        z = self.dense2(x)
        encoder = keras.Model(encoder_inputs, [z], name="encoder")
        return encoder


@tf.keras.saving.register_keras_serializable()
class Decoder(keras.Model):
    def __init__(self, args, **kwargs):
        super().__init__(**kwargs)
        self.latent_dim = args["latent_dim"]
        self.activation = layers.Activation("relu")
        self.dense1 = layers.Dense(256)
        self.dense2 = layers.Dense(28 * 28, activation="sigmoid")
        self.reshape = layers.Reshape([28, 28, 1])

    def build(self, **kwargs):
        latent_inputs = keras.Input(shape=(self.latent_dim,))
        x = self.dense1(latent_inputs)
        x = self.activation(x)
        x = self.dense2(x)
        decoder_outputs = self.reshape(x)
        decoder = keras.Model(latent_inputs, decoder_outputs, name="encoder")
        return decoder


@tf.keras.saving.register_keras_serializable()
class Generator(keras.Model):
    def __init__(self, args, **kwargs):
        super().__init__(**kwargs)
        self.noise_dim = args["noise_dim"]
        self.activation = layers.Activation("relu")
        self.dense1 = layers.Dense(512)
        self.dense2 = layers.Dense(args["latent_dim"], name="z")

    def build(self, **kwargs):
        noise_inputs = keras.Input(shape=(self.noise_dim,))
        x = self.dense1(noise_inputs)
        x = self.activation(x)
        z = self.dense2(x)
        latent_generator = keras.Model(noise_inputs, [z], name="generator")
        return latent_generator


@tf.keras.saving.register_keras_serializable()
class BaseNet(keras.Model):
    def __init__(self, args, **kwargs):
        super(BaseNet, self).__init__(**kwargs)
        self.encoder = Encoder(args).build()
        self.decoder = Decoder(args).build()
        self.args = args
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="cw_reconstruction_loss"
        )
        self.cw_loss_tracker = keras.metrics.Mean(name="cw_loss")

    def get_config(self):
        config = {
            "args": self.args
        }
        base_config = super(BaseNet, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.cw_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z = self.encoder(data)
            reconstruction = self.decoder(z)
            # tf.print(reconstruction)
            cw_reconstruction_loss = tf.math.log(
                cw_sampling_silverman(data, reconstruction))
            lambda_val = 1
            cw_loss = lambda_val * tf.math.log(cw_normality(z))
            total_loss = cw_reconstruction_loss + cw_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(cw_reconstruction_loss)
        self.cw_loss_tracker.update_state(cw_loss)
        return {
            "total_loss": self.total_loss_tracker.result(),
            "cw_reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "cw_loss": self.cw_loss_tracker.result(),
        }


@tf.keras.saving.register_keras_serializable()
class HighNet(keras.Model):
    def __init__(self, encoder, decoder, args, **kwargs):
        super(HighNet, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.args = args
        self.generator = Generator(args).build()
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="cw_reconstruction_loss"
        )

    def get_config(self):
        config = {
            "encoder": self.encoder,
            "decoder": self.decoder,
            "args": self.args
        }
        base_config = super(HighNet, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def call(self, inputs, **kwargs):
        x = self.encoder(inputs)
        return self.decoder(x)

    @property
    def metrics(self):
        return [
            self.reconstruction_loss_tracker
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z = self.encoder(data)
            batch_size = tf.shape(z)[0]
            noise_np = np.random.normal(0, 1, size=self.args["noise_dim"])
            noise_tf = tf.expand_dims(tf.convert_to_tensor(noise_np), axis=0)
            noise_tf = tf.repeat(noise_tf, repeats=batch_size, axis=0)
            noise_z = self.generator(noise_tf)
            # tf.print(reconstruction)
            cw_reconstruction_loss = tf.math.log(
                cw_sampling_silverman(z, noise_z))
        grads = tape.gradient(cw_reconstruction_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.reconstruction_loss_tracker.update_state(cw_reconstruction_loss)
        return {
            "cw_reconstruction_loss": self.reconstruction_loss_tracker.result()
        }


def test_saving():
    args = {"sample_amount": 1000,
            "latent_dim": 24,
            "noise_dim": 24,
            "epochs": 1,
            "batch_size": 128,
            "patience": 3,
            "learning_rate": 0.0001}

    (x_train, y_train), (x_test, _) = keras.datasets.mnist.load_data()
    mnist_digits = np.concatenate([x_train, x_test], axis=0)[0:100]
    mnist_digits = np.expand_dims(mnist_digits, -1).astype("float32") / 255

    base_model = BaseNet(args)
    base_model.compile(optimizer=keras.optimizers.Adam(learning_rate=args["learning_rate"]))
    es_callback = keras.callbacks.EarlyStopping(monitor='total_loss', patience=args["patience"], mode="min")
    base_model.fit(mnist_digits, epochs=args["epochs"], batch_size=args["batch_size"], callbacks=[es_callback])

    model = HighNet(base_model.encoder, base_model.decoder, args)
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=args["learning_rate"]))
    es2_callback = keras.callbacks.EarlyStopping(monitor='cw_reconstruction_loss', patience=args["patience"],
                                                 mode="min")
    model.fit(mnist_digits, epochs=args["epochs"], batch_size=args["batch_size"], callbacks=[es2_callback])
    model.save("high_model.keras", save_format="keras")
    loaded_model = keras.saving.load_model("high_model.keras")


if __name__ == "__main__":
    test_saving()
The model summary output of the saved model:
Model: "encoder"
_________________________________________________________________
 Layer (type)                 Output Shape              Param #
=================================================================
 input_1 (InputLayer)         [(None, 28, 28, 1)]       0
 flatten (Flatten)            (None, 784)               0
 dense (Dense)                (None, 256)               200960
 activation (Activation)      (None, 256)               0
 z (Dense)                    (None, 24)                6168
=================================================================
Total params: 207128 (809.09 KB)
Trainable params: 207128 (809.09 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "encoder"
_________________________________________________________________
 Layer (type)                 Output Shape              Param #
=================================================================
 input_2 (InputLayer)         [(None, 24)]              0
 dense_1 (Dense)              (None, 256)               6400
 activation_1 (Activation)    (None, 256)               0
 dense_2 (Dense)              (None, 784)               201488
 reshape (Reshape)            (None, 28, 28, 1)         0
=================================================================
Total params: 207888 (812.06 KB)
Trainable params: 207888 (812.06 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
Model: "generator"
_________________________________________________________________
 Layer (type)                 Output Shape              Param #
=================================================================
 input_3 (InputLayer)         [(None, 24)]              0
 dense_3 (Dense)              (None, 512)               12800
 activation_2 (Activation)    (None, 512)               0
 z (Dense)                    (None, 24)                12312
=================================================================
Total params: 25112 (98.09 KB)
Trainable params: 25112 (98.09 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
I don't know why this summary looks different from my original one, but from the original summary I can tell that the shape mismatch happens in the first dense layer of the encoder, 'dense_4/kernel:0'. There the layers are labeled with their runtime names, dense_4 and so on. I don't know what I am missing that makes the summaries look different; the original model also has more layers, batch normalization, reused activations and so on. The error is the same regardless. I simply cannot work out how merely saving and loading a model can lead to incompatible shapes.
I am using TensorFlow 2.15, Keras 2.15 and Python 3.11, working in PyCharm Professional on Ubuntu 23.10.
1 Answer
While debugging, I narrowed the error down to:
ValueError: Layer 'dense_3' expected 0 variables, but received 2 variables during loading. Expected: []
This struck me as odd, because there is no layer called dense_3 at all. I managed to put together a minimal example:
import keras
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Input, BatchNormalization, Dropout, Flatten, Identity

args = {"activation": "relu",
        "batch_norm": True}


@keras.saving.register_keras_serializable()
class CustomModel1(Model):
    def __init__(self):
        super().__init__()
        self.dense = Dense(32)

    def call(self, inputs):
        x = self.dense(inputs)
        return x


@keras.saving.register_keras_serializable()
class CustomModel2(Model):
    def __init__(self):
        super().__init__()
        self.dense = Dense(32)

    def call(self, inputs):
        x = self.dense(inputs)
        return x


@keras.saving.register_keras_serializable()
class CustomModel3(Model):
    def __init__(self):
        super().__init__()
        self.net1 = CustomModel1()
        self.net2 = CustomModel2()

    def call(self, inputs):
        z = self.net1(inputs)
        x = self.net2(z)
        return z, x

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            # z, y_pred = self(x)  # this fixes it instead
            y_pred = self.net2(self.net1(x))  # this line throws the error
            loss = self.compiled_loss(y, y_pred)
        trainable_vars = self.trainable_weights
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}


# Instantiate the model
model = CustomModel3()

# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Create some dummy data for training
x_train = np.random.random((1000, 32))
y_train = np.random.randint(10, size=(1000,))

# Train the model for one epoch
model.fit(x_train, y_train, epochs=1)

# Save the model
model.save('custom_model.keras', save_format='keras')

# Load the model again
loaded_model = tf.keras.models.load_model('custom_model.keras')

# Generate some sample data for prediction
x_sample = np.random.random((10, 32))  # Assuming 10 samples with 32 features each

# Make predictions using the loaded model
predictions = loaded_model.predict(x_sample)
print(predictions)

# Print the model summary
print(model.summary())
Calling the submodels self.net1 and self.net2 directly inside train_step produces the error, whereas calling them through the call method of the higher-level model and returning their values does not.
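For completeness, this is what the train_step of the minimal example looks like with that fix applied, i.e. with the forward pass routed through the parent model's call (the commented-out line above, uncommented); the rest of the script stays the same:

    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            # Route the forward pass through self(...) so net1 and net2 are
            # built and tracked via CustomModel3.call; calling the submodels
            # directly here is what breaks reloading the saved model.
            z, y_pred = self(x)
            loss = self.compiled_loss(y, y_pred)
        trainable_vars = self.trainable_weights
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        self.compiled_metrics.update_state(y, y_pred)
        return {m.name: m.result() for m in self.metrics}

Applied to the HighNet from the question, the same idea presumably means that every submodel used in train_step (including the generator, which is currently only exercised there) should also be reachable through the model's call, so that all of its layers already exist when the saved model is rebuilt at load time.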