Cannot save a custom variational autoencoder model built with TensorFlow in Keras

1 vote
1 answer
27 views
Asked 2025-04-12 12:17

Keras 2.10.0

I am trying to save a variational autoencoder model that I trained with Keras and TensorFlow, but I cannot get the model to save. What do I need to do to save it?

Here is a Colab link to the basic model I am running:

https://colab.research.google.com/github/MaxIG1/var_auto_for_synth_data/blob/saving_branch/notebooks/var_auto_working.ipynb

It produces the following error:

ValueError: Model <__main__.VAE object at 0x00000278F0A96E60> cannot be saved either because the input shape is not available or because the forward pass of the model is not defined. To define a forward pass, please override `Model.call()`. To specify an input shape, either call `build(input_shape)` directly, or call the model on actual data using `Model()`, `Model.fit()`, or `Model.predict()`. If you have a custom training step, please make sure to invoke the forward pass in train step through `Model.__call__`, i.e. `model(inputs)`, as opposed to `model.call()`.

In case you would rather not open the Colab, I have also added the main lines of code below.

Any help would be greatly appreciated.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras.layers import Input, Dense, Lambda
from sklearn.datasets import make_classification



# Creating fake data to replicate
# Define the desired number of samples for each class
class_samples = [2000, 3000, 2500, 1500]  # Adjust these numbers as needed

# Calculate weights based on the desired number of samples
class_weights = [num_samples / sum(class_samples) for num_samples in class_samples]

# Generate a synthetic dataset with different numbers of samples for each class
X, y = make_classification(
    n_samples=sum(class_samples),
    n_features=4,
    n_informative=4,
    n_redundant=0,
    n_classes=4,
    weights=class_weights,
    random_state=42,
)
columns = ["Feature_1", "Feature_2", "Feature_3", "Feature_4"]
synthetic_df = pd.DataFrame(data=X, columns=columns)
for column in synthetic_df:
    std = np.std(synthetic_df[column])
    mean = np.mean(synthetic_df[column])
    synthetic_df[column] = synthetic_df[column]-mean
    synthetic_df[column] = synthetic_df[column]/std
synthetic_df["target"] = y
synthetic_array = synthetic_df.values

# Sampling layer: draws z from the latent distribution (reparameterization trick)
class Sampling(keras.layers.Layer):

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Encoder
latent_dim = 10
encoder_inputs = Input(shape=(5,), name="input_layer")

n_x = 50
x = Dense(n_x, activation="relu", name="h1")(encoder_inputs)

# Split x into two halves
half_size = n_x // 2
x3_first_half = Lambda(lambda x: x[:, :half_size], name="select_z_mean")(x)
x3_second_half = Lambda(lambda x: x[:, half_size:], name="select_z_var")(x)

z_mean = Dense(latent_dim, name="z_mean")(x3_first_half)
z_log_var = Dense(latent_dim, name="z_log_var")(x3_second_half)
z = Sampling(name="Sampling")([z_mean, z_log_var])

encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
# Decoder
latent_inputs = keras.Input(shape=(latent_dim,))
n_x = 30

x = Dense(n_x, activation="relu", name="h4")(latent_inputs)

cont_decoder_outputs = Dense(4, activation="linear", name="cont_decoder_output")(x)
class_decoder_output = Dense(4, activation="softmax", name="classification_output")(x)

decoder = keras.Model(latent_inputs, [cont_decoder_outputs, class_decoder_output], name="decoder")
decoder.summary()
# Custom VAE
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction_cont, reconstruction_class = self.decoder(z)

            data_cont = data[
                :, :4
            ]  # Assuming the first 4 columns are for continuous variables
            data_class = data[:, 4:]  # Assuming the last column is for classification

            # Reconstruction loss for continuous outputs
            reconstruction_loss_cont = keras.losses.mean_squared_error(
                data_cont, reconstruction_cont
            )

            # Reconstruction loss for classification output
            reconstruction_loss_class = keras.losses.sparse_categorical_crossentropy(
                data_class, reconstruction_class
            )

            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(kl_loss, axis=1)

            # Combine losses
            total_loss = (
                0.8*reconstruction_loss_cont + 0.2*reconstruction_loss_class + kl_loss
            )

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)

        return {
            "loss": self.total_loss_tracker.result(),
        }

    def get_config(self):
        config = super(VAE, self).get_config()
        config.update(
            {
                "encoder": self.encoder,
                "decoder": self.decoder,
            }
        )
        return config

# Compile and Train
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(
    synthetic_array,
    epochs=1,
    batch_size=16,
)

# Save in TensorFlow SavedModel format
vae.save("path_to_save_model_tf")

1 Answer

0

The error message tells you what to do, though in a somewhat convoluted way.

First, you need to define a call function for your model. Add this to the VAE class:

def call(self, inputs):
    return self.decoder(self.encoder(inputs)[2])

This lets Keras know what the model actually does. Calling fit alone is not enough, because your overridden train_step never calls the vae model itself, only its components (the encoder and the decoder).

So we also need to actually call the model once so that it has a defined input shape. Add this line anywhere after you define the model but before you try to save it:

vae(synthetic_array[:1])

This just calls the model on a single data point to define the input and output shapes. After that, both saving and loading the model work. Note that I tested this on Colab, which uses the newer version 2.15. I believe model saving changed somewhat in the meantime, so it may not work on older versions.
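To see the whole fix in one place, here is a minimal, self-contained sketch of the pattern. The tiny encoder/decoder and the TinyAE class below are placeholders I made up, not the question's architecture; the point is the combination of a call() override and one real forward pass before saving. This assumes Keras 2.x as in the question (Keras 3 expects a .keras filename instead of a SavedModel directory).

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Placeholder sub-models standing in for the question's encoder/decoder.
encoder = keras.Sequential([keras.layers.Dense(3, input_shape=(5,))], name="enc")
decoder = keras.Sequential([keras.layers.Dense(5, input_shape=(3,))], name="dec")

class TinyAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def train_step(self, data):
        # Custom training step: note it only calls the components,
        # never the model itself, which is why fit() alone does not
        # give the model a defined input shape.
        with tf.GradientTape() as tape:
            recon = self.decoder(self.encoder(data))
            loss = tf.reduce_mean(tf.square(data - recon))
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {"loss": loss}

    # Fix 1: the forward pass Keras needs in order to save the model.
    def call(self, inputs):
        return self.decoder(self.encoder(inputs))

data = np.random.rand(16, 5).astype("float32")
ae = TinyAE(encoder, decoder)
ae.compile(optimizer=keras.optimizers.Adam())
ae.fit(data, epochs=1, batch_size=8, verbose=0)

# Fix 2: one real call defines the input/output shapes.
ae(data[:1])

ae.save("tiny_ae_savedmodel")  # TensorFlow SavedModel format
restored = keras.models.load_model("tiny_ae_savedmodel")
print(tuple(restored(data[:1]).shape))
```

The same two additions (the call override and the single dummy call) applied to the VAE class in the question are what make vae.save("path_to_save_model_tf") go through.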
