Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

Posted 2024-04-18 22:30:34


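I trained a DNN classifier model with TensorFlow in Python. Now I want to load it in PySpark and use it to predict the gender for each record of an RDD. I first build the TensorFlow graph exactly as in training, then load the trained model and try to predict on each row of the RDD: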

"""
code to generate the tensorflow graph omitted
"""

with tf.Session(graph=graph) as sess:
    # load the trained model
    saver.restore(sess, "./nonClass_gender")
    # lib is the RDD; each Row has the form Row(key=..., values=..., indices=..., shape=...)
    predictions_1 = lib.map(lambda e: Row(key = e["key"], 
    prob = y_proba.eval(feed_dict={values: e["values"], 
    indices: e["indices"], shape: [1,2318]})))
    predictions_1.take(5)

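Note that each row of the RDD has the form Row(key=..., values=..., indices=..., shape=...). Here values, indices, and shape correspond to values, indices, and dense_shape in this answer: Use coo_matrix in TensorFlow. They are used to build a SparseTensorValue. The difference is that in my code each row produces its own SparseTensorValue.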

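I then get the following error:

Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'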

In the code above, if I replace prob = y_proba.eval(feed_dict={values: e["values"], indices: e["indices"], shape: [1,2318]}) with a function defined in plain Python, such as proba = test(e["values"], e["indices"], [1,2318]), it works. Calling y_proba.eval directly in Python (outside the RDD map) also works.
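
One likely explanation for this behaviour, offered as a sketch rather than a confirmed diagnosis: Spark has to serialize the function passed to map, together with everything it refers to, before shipping it to the workers. A function that only touches its own arguments serializes fine, but a lambda that refers to y_proba, values, and the session drags objects backed by native state along with it. The example below imitates that with the standard pickle module; NativeHandle is a hypothetical stand-in for a TensorFlow object, and neither Spark nor TensorFlow is involved.

```python
import pickle
import threading


class NativeHandle:
    """Hypothetical stand-in for an object that wraps native state,
    such as a TensorFlow graph or session."""

    def __init__(self):
        # A lock cannot be pickled, much like a native graph handle.
        self._lock = threading.Lock()

    def eval(self, x):
        return x * 2


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# Plain row data, like the values and indices stored in each Row
plain_args = {"values": [1.0, 2.0], "indices": [[0, 3], [0, 7]]}
# An object holding native state, like the graph captured by the lambda
handle = NativeHandle()

print(can_pickle(plain_args))
print(can_pickle(handle))
```

If this is indeed the cause, the fix is to make sure the function sent to the workers carries only plain data (such as a checkpoint path) and creates the TensorFlow objects after it arrives.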


2 answers

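Thanks to @user8371915. Inspired by his answer and by this related topic, Transform map to mapPartition using pyspark, I was able to get it working. The key to the solution is to build the TensorFlow graph inside the function passed to mapPartitions rather than outside it. Here is the working code: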

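import numpy as np
import tensorflow as tf  # TensorFlow 1.x API
from pyspark.sql import Row

# first_layer, neuron_layer and dropout_rate come from the training code (not shown)
def predict(rows, worker_session_path):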

    n_inputs = 2318 # the second dimension of the input sparse matrix X
    n_hidden1 = 200 # first hidden layer neuron 
    n_hidden2 = 20 # second hidden layer neuron 
    n_outputs = 2 # binary classification
    # build the graph as in the training model
    graph = tf.Graph()
    with graph.as_default():
        # for sparse tensor X
        values = tf.placeholder(tf.float32) 
        indices = tf.placeholder(tf.int64)
        shape = tf.placeholder(tf.int64)

        y = tf.placeholder(tf.int32, shape=(None), name="y")

        training = tf.placeholder_with_default(False, shape=(), name='training')

        with tf.name_scope("dnn"):
            hidden1 = first_layer(values, indices, shape, n_hidden1, name="hidden1", 
                                  activation=tf.nn.relu, n_inputs = n_inputs)
            hidden1_drop = tf.layers.dropout(hidden1, dropout_rate, training=training)
            hidden2 = neuron_layer(hidden1_drop, n_hidden2, name="hidden2",
                                   activation=tf.nn.relu)
            hidden2_drop = tf.layers.dropout(hidden2, dropout_rate, training=training)
            logits = neuron_layer(hidden2_drop, n_outputs, name="outputs")
            y_proba = tf.nn.softmax(logits)

        saver = tf.train.Saver()

    with tf.Session(graph=graph) as sess:
        saver.restore(sess, worker_session_path)
        for e in rows:
            proba = sess.run(y_proba, feed_dict={indices:e["indices"], 
                                             values:e["values"], shape: [1,2318]})
            # np.squeeze converts proba's shape from (1, 2) to (2,)
            yield(Row(key = e['key'], proba = np.squeeze(proba)))

lib2 = lib.mapPartitions(lambda rows: predict(rows, "./nonClass_gender"))
lib2.take(5)
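
The shape of this solution can be sketched without Spark or TensorFlow: the function given to mapPartitions receives an iterator over one partition, builds its expensive resource once, and yields one result per row. In the sketch below, build_model is a hypothetical stand-in for building the graph and restoring the checkpoint, and the partitions are plain Python lists.

```python
def predict_partition(rows, build_model):
    """Generator in the shape mapPartitions expects: iterator in, iterator out."""
    model = build_model()  # built once per partition, inside the function
    for row in rows:
        yield (row["key"], model(row["values"]))


build_calls = []


def build_model():
    # Hypothetical stand-in for building the graph and restoring the checkpoint
    build_calls.append(1)
    return lambda values: sum(values)


# Two simulated partitions of the RDD
partitions = [
    [{"key": "a", "values": [1, 2]}, {"key": "b", "values": [3]}],
    [{"key": "c", "values": [4, 5]}],
]

results = [
    out
    for part in partitions
    for out in predict_partition(iter(part), build_model)
]
print(results)
print(len(build_calls))  # one build per partition, not one per row
```

Compared with building the model per row in map, this pays the setup cost once per partition, and nothing unserializable has to travel from the driver to the workers.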

  • Distribute the model to every machine (you can use SparkFiles).
  • Rewrite the expression as a function:

    def predict(rows, worker_session_path):
        with tf.Session(graph=graph) as sess:
            # load the trained model
            saver.restore(sess, worker_session_path)
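            # rows is one partition of the RDD; each Row has the form Row(key=..., values=..., indices=..., shape=...)
            # Build a list so every prediction is computed before the session closes
            # (in Python 3, map() is lazy and would run after the with block exits)
            return [Row(key=e["key"],
                        prob=y_proba.eval(feed_dict={values: e["values"],
                                                     indices: e["indices"], shape: [1, 2318]}))
                    for e in rows]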
    
  • Use it with mapPartitions:

    lib.mapPartitions(lambda rows: predict(rows, worker_session_path))
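
The first bullet, distributing the model with SparkFiles, could look roughly like the sketch below. It assumes the checkpoint was saved under a prefix such as "./nonClass_gender", so that its files share that prefix followed by a dot and a suffix. The three helper names are hypothetical; sc.addFile and SparkFiles.getRootDirectory are the PySpark calls doing the actual work.

```python
import glob
import os


def checkpoint_files(prefix):
    """List the files that belong to a checkpoint saved under `prefix`
    (assumes they are named `<prefix>.<suffix>`)."""
    return sorted(glob.glob(prefix + ".*"))


def distribute_checkpoint(sc, prefix):
    """Driver side: register every checkpoint file with the SparkContext `sc`
    so that Spark copies it to each worker."""
    files = checkpoint_files(prefix)
    for path in files:
        sc.addFile(path)
    return files


def worker_checkpoint_prefix(prefix):
    """Worker side: rebuild the checkpoint prefix inside the directory
    where Spark placed the distributed files."""
    from pyspark import SparkFiles  # imported lazily, only needed on workers

    return os.path.join(SparkFiles.getRootDirectory(), os.path.basename(prefix))
```

On the driver one would call distribute_checkpoint(sc, "./nonClass_gender") once, and inside predict pass worker_checkpoint_prefix("./nonClass_gender") to saver.restore instead of a driver-local path.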
    
