DTeam Team Log

Doer, Delivery, Dream

Implementing a Simple Feedforward Network with TensorFlow

Hu Jian · Posted on Nov 29, 2019

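As I mentioned in an earlier article, building a miniature prototype helps you understand how a framework, and the principles behind it, actually work.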

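So, to understand deep learning, let's start with a minimal implementation of the simplest kind of network: the feedforward network. A feedforward network is one whose structure contains no cycles, so data flows in one direction from input to output. The simplest feedforward network is a multi-layer, one-directional network built entirely from Dense (fully connected) layers. A simple example: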

model = keras.models.Sequential([
    keras.layers.Dense(32, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(8, activation="relu"),
    keras.layers.Dense(1)
])

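Using this network structure as an example, this article shows how to implement it with TensorFlow (without Keras's ready-made Dense layers or Sequential model) and tries it out on a simple dataset.

Note: if you are new to this and unsure how TF and Keras relate, a simple analogy is that TF is like NumPy, while Keras is like scikit-learn.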

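The Keras approach

Before we start reinventing the wheel, let's look at how Keras does it and treat that as the reference solution to imitate. The task is regression with a neural network, using scikit-learn's California housing dataset. Enough talk, here is the code.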

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow import keras

# Fetch the dataset
housing = fetch_california_housing()

# Preprocess: split into train/validation/test sets, then standardize
X_train_full, X_test, y_train_full, y_test = train_test_split(housing.data, housing.target)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_valid = scaler.transform(X_valid)
X_test = scaler.transform(X_test)

# Build a fully connected network; since this is a regression problem, the output layer needs no activation function
model = keras.models.Sequential([
    keras.layers.Dense(32, activation="relu", input_shape=X_train.shape[1:]),
    keras.layers.Dense(8, activation="relu"),
    keras.layers.Dense(1)
])

model.compile(loss="mean_squared_error", optimizer="sgd")

# Train
model.fit(X_train, y_train, epochs=30, validation_data=(X_valid, y_valid))

# Predict
model.predict(X_test[:3])
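The preprocessing above fits StandardScaler on the training split only and then reuses the same per-feature mean and standard deviation for the validation and test splits. A minimal NumPy sketch of what that standardization computes, assuming StandardScaler's default settings (center, then divide by the population standard deviation, treating zero-variance features as having scale 1); fit_standardizer and apply_standardizer are hypothetical helpers, and the data is synthetic:

```python
import numpy as np


def fit_standardizer(X):
    """Return per-feature mean and standard deviation, computed on training data only."""
    mean = X.mean(axis=0)
    std = X.std(axis=0)                 # population std (ddof=0)
    std = np.where(std == 0, 1.0, std)  # avoid dividing by zero for constant features
    return mean, std


def apply_standardizer(X, mean, std):
    """Scale X with statistics learned from the training set."""
    return (X - mean) / std


rng = np.random.default_rng(0)
X_tr = rng.normal(loc=5.0, scale=3.0, size=(100, 4))  # synthetic "training" data
X_te = rng.normal(loc=5.0, scale=3.0, size=(20, 4))   # synthetic "test" data

mean, std = fit_standardizer(X_tr)
X_tr_scaled = apply_standardizer(X_tr, mean, std)
X_te_scaled = apply_standardizer(X_te, mean, std)  # test set reuses training statistics
print(X_tr_scaled.mean(axis=0).round(6), X_tr_scaled.std(axis=0).round(6))
```

Reusing the training statistics keeps the test set from leaking into preprocessing; the test features end up only approximately standardized, which is expected.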

For the purposes of this article, the key part is the code that builds the model.

Implementing a custom structure with TF

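In theory, you could implement a feedforward network yourself without TF at all. This article builds on TF anyway, for the following reasons:

On the second point, TensorFlow 2.0 + Keras Overview for Deep Learning Researchers puts it this way: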

  • If you’re an engineer, Keras provides you with reusable blocks such as layers, metrics, training loops, to support common use cases. It provides a high-level user experience that’s accessible and productive.
  • If you’re a researcher, you may prefer not to use these built-in blocks such as layers and training loops, and instead create your own. Of course, Keras allows you to do this. In this case, Keras provides you with templates for the blocks you write, it provides you with structure, with an API standard for things like Layers and Metrics. This structure makes your code easy to share with others and easy to integrate in production workflows.
  • The same is true for library developers: TensorFlow is a large ecosystem. It has many different libraries. In order for different libraries to be able to talk to each other and share components, they need to follow an API standard. That’s what Keras provides.

Before writing any code, let's recall some of the math behind neural networks:
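For reference, a standard way to write the forward pass and loss of the 32-8-1 network, using a row vector x of input features, weight matrices W^(l), bias vectors b^(l), ReLU as the hidden activation, and N samples in the loss (the notation is chosen here for illustration). The row-vector convention matches tf.matmul(inputs, self.w) in the layer code that follows.

```latex
\begin{aligned}
\mathbf{h}^{(1)} &= \operatorname{ReLU}\left(\mathbf{x} W^{(1)} + \mathbf{b}^{(1)}\right) \\
\mathbf{h}^{(2)} &= \operatorname{ReLU}\left(\mathbf{h}^{(1)} W^{(2)} + \mathbf{b}^{(2)}\right) \\
\hat{y} &= \mathbf{h}^{(2)} W^{(3)} + \mathbf{b}^{(3)} \\
\operatorname{ReLU}(z) &= \max(0, z) \\
L_{\text{MSE}} &= \frac{1}{N} \sum_{i=1}^{N} \left(y_i - \hat{y}_i\right)^2
\end{aligned}
```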

Also, the shapes of the weights and biases are determined by the layers on either side of them:
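Concretely, a layer with n_in inputs and n_out units needs a weight matrix of shape (n_in, n_out) and a bias of shape (n_out,), and its output width becomes the next layer's input width. A small pure-Python sketch that derives the shapes and parameter counts for the 32-8-1 network, assuming 8 input features (the number of features in the California housing data); layer_shapes is a hypothetical helper:

```python
def layer_shapes(n_features, units):
    """Return (weight_shape, bias_shape, n_params) for each dense layer, in order."""
    shapes = []
    n_in = n_features
    for n_out in units:
        w_shape = (n_in, n_out)  # one row per input, one column per unit
        b_shape = (n_out,)       # one bias per unit
        n_params = n_in * n_out + n_out
        shapes.append((w_shape, b_shape, n_params))
        n_in = n_out             # this layer's output width is the next layer's input width
    return shapes


for w_shape, b_shape, n_params in layer_shapes(8, [32, 8, 1]):
    print("W", w_shape, "b", b_shape, "params", n_params)
# W (8, 32) b (32,) params 288
# W (32, 8) b (8,) params 264
# W (8, 1) b (1,) params 9
```

That gives 288 + 264 + 9 = 561 trainable parameters, which should match what model.summary() reports for the Keras version.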

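Those are the theoretical preliminaries (a detailed explanation of the theory is beyond the scope of this article; see the relevant articles and books). We also need some code-level preliminaries:

Now let's look at the model implemented with TF (the code is adapted from TensorFlow 2.0 + Keras Overview for Deep Learning Researchers).

First, the custom fully connected layer.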
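One of the tools the model relies on is tf.GradientTape, which records the forward computation and returns the gradients of the loss with respect to the weights. To show what it automates, here is a NumPy sketch that computes that gradient by hand for a single linear layer with MSE loss and checks it against a central finite difference. This is a manual computation on synthetic data, not TensorFlow's API; mse_and_grads is a hypothetical helper:

```python
import numpy as np


def mse_and_grads(X, y, W, b):
    """MSE loss of y_hat = X @ W + b and its gradients w.r.t. W and b."""
    n = X.shape[0]
    err = X @ W + b - y                  # residuals, shape (n, 1)
    loss = np.mean(err ** 2)
    grad_W = 2.0 / n * X.T @ err         # dL/dW, same shape as W
    grad_b = 2.0 / n * err.sum(axis=0)   # dL/db, same shape as b
    return loss, grad_W, grad_b


rng = np.random.default_rng(42)
X = rng.normal(size=(16, 3))
y = rng.normal(size=(16, 1))
W = rng.normal(size=(3, 1))
b = np.zeros(1)

loss, grad_W, grad_b = mse_and_grads(X, y, W, b)

# Central finite-difference check of one weight entry
eps = 1e-6
W_plus, W_minus = W.copy(), W.copy()
W_plus[0, 0] += eps
W_minus[0, 0] -= eps
numeric = (mse_and_grads(X, y, W_plus, b)[0] - mse_and_grads(X, y, W_minus, b)[0]) / (2 * eps)
print(grad_W[0, 0], numeric)  # the two values should agree closely
```

With a deeper network the chain rule makes these formulas tedious to write by hand, which is exactly the bookkeeping GradientTape takes over.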

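import tensorflow as tf
from tensorflow.keras.layers import Layer


class MyDense(Layer):
    """A fully connected layer without activation: outputs = inputs @ w + b."""

    def __init__(self, units=32):
        super(MyDense, self).__init__()
        self.units = units

    def build(self, input_shape):
        # Called once, on the first call; the input width decides the shape of w
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b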

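Here, build() is called automatically the first time the layer receives input. It reads the input width from input_shape[-1] and creates the weight matrix w with shape (input_shape[-1], units) and the bias b with shape (units,). call() then computes inputs · w + b. The layer applies no activation of its own; the model adds it.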
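Splitting MyDense into build() and call() means the weights are created lazily, once the input width is known. A NumPy sketch of the same pattern, with a hypothetical NumpyDense class standing in for MyDense (no TensorFlow involved; initializing with a normal distribution of standard deviation 0.05 is an assumption that mirrors the default of Keras's 'random_normal' initializer):

```python
import numpy as np


class NumpyDense:
    """Dense layer without activation; weights are created on the first call."""

    def __init__(self, units, seed=0):
        self.units = units
        self.built = False
        self.rng = np.random.default_rng(seed)

    def build(self, input_shape):
        # The weight shape depends on the input width, which is only known now
        self.w = self.rng.normal(0.0, 0.05, size=(input_shape[-1], self.units))
        self.b = self.rng.normal(0.0, 0.05, size=(self.units,))
        self.built = True

    def __call__(self, inputs):
        if not self.built:  # build only once; later calls reuse the same weights
            self.build(inputs.shape)
        return inputs @ self.w + self.b


layer = NumpyDense(32)
out = layer(np.ones((4, 8)))  # a batch of 4 samples with 8 features
print(out.shape, layer.w.shape, layer.b.shape)  # (4, 32) (8, 32) (32,)
```

This is why MyDense(32) can be constructed without knowing its input size: the first batch that flows through it fixes the shape of w.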

Next, the custom model, which also inherits from Layer.

class MyModel(Layer):

    def __init__(self, layers):
        super(MyModel, self).__init__()
        # Stored as dense_layers to avoid clashing with the layers property that Keras models define
        self.dense_layers = layers

    def call(self, inputs):
        # ReLU after every layer except the last; the output layer stays linear for regression
        x = inputs
        for layer in self.dense_layers[:-1]:
            x = tf.nn.relu(layer(x))
        return self.dense_layers[-1](x)

    def train(self, x_train, y_train, epochs=5):
        loss = tf.keras.losses.MeanSquaredError()
        optimizer = tf.keras.optimizers.SGD()
        # Running MSE over all batches seen so far (regression has no "accuracy")
        running_mse = tf.keras.metrics.MeanSquaredError()

        # Cast to float32 to match the weights; reshape y to (n, 1) to match the model output
        x_train = tf.cast(x_train, tf.float32)
        y_train = tf.reshape(tf.cast(y_train, tf.float32), (-1, 1))
        dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
        dataset = dataset.shuffle(buffer_size=1024).batch(64)

        for epoch in range(epochs):
            for x, y in dataset:
                with tf.GradientTape() as tape:
                    # Forward pass (self, not the global model variable)
                    y_pred = self(x)
                    # Loss value for this batch
                    loss_value = loss(y, y_pred)

                # Gradients of the loss w.r.t. the weights, computed after leaving the tape
                gradients = tape.gradient(loss_value, self.trainable_weights)
                # Update the weights of all layers
                optimizer.apply_gradients(zip(gradients, self.trainable_weights))
                # Update the running MSE
                running_mse.update_state(y, y_pred)

            print('Epoch:', epoch,
                  ', loss of last batch: %.3f' % loss_value,
                  ', running MSE so far: %.3f' % running_mse.result())

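Here, call() applies ReLU after every layer except the last one, whose output stays linear because this is a regression task. train() wraps the training data in a tf.data.Dataset, shuffles it and splits it into batches of 64; for each batch it runs the forward pass inside tf.GradientTape, computes the MSE loss, takes the gradients of the loss with respect to all trainable weights, and lets the SGD optimizer update those weights.

Using the model is much the same as before.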
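The loop in train() is plain mini-batch stochastic gradient descent. A NumPy sketch of the same steps (reshuffle each epoch, split into batches of 64, forward pass, gradient, update with learning rate 0.01, which is the default learning rate of Keras's SGD optimizer) for a single linear layer on synthetic data. The gradients are written out by hand where the TF version uses GradientTape, and sgd_train is a hypothetical helper:

```python
import numpy as np


def sgd_train(X, y, epochs=20, batch_size=64, lr=0.01, seed=0):
    """Fit y ~ X @ W + b with mini-batch SGD on the MSE loss; return W, b and per-epoch MSE."""
    rng = np.random.default_rng(seed)
    n, d = X.shape
    W = rng.normal(0.0, 0.05, size=(d, 1))
    b = np.zeros(1)
    history = []
    for epoch in range(epochs):
        order = rng.permutation(n)  # reshuffle every epoch
        for start in range(0, n, batch_size):
            idx = order[start:start + batch_size]
            xb, yb = X[idx], y[idx]
            err = xb @ W + b - yb                 # forward pass and residuals
            grad_W = 2.0 / len(idx) * xb.T @ err  # dL/dW for this batch
            grad_b = 2.0 / len(idx) * err.sum(axis=0)
            W -= lr * grad_W                      # SGD update
            b -= lr * grad_b
        history.append(float(np.mean((X @ W + b - y) ** 2)))  # full-data MSE after the epoch
    return W, b, history


rng = np.random.default_rng(1)
X = rng.normal(size=(1000, 8))
true_W = rng.normal(size=(8, 1))
y = X @ true_W + 0.5 + rng.normal(scale=0.1, size=(1000, 1))

W, b, history = sgd_train(X, y)
print(history[0], history[-1])  # the MSE should drop over the epochs
```

The TF version follows the same structure; the difference is that it stacks three layers with ReLU in between, so it relies on GradientTape instead of hand-derived gradients.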

model = MyModel([
    MyDense(32),
    MyDense(8),
    MyDense(1)
])

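# Train. To keep things simple, no validation set is used here, so the full training set is
# standardized, reusing the scaler fitted earlier so that it is scaled the same way as X_test:
X_train_full = scaler.transform(X_train_full)
model.train(X_train_full, y_train_full, 15)

# Predict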
model(X_test[:3])

As for the results, they are so-so; you can compare the predictions with the actual values (y_test[:3]).
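Beyond eyeballing three rows, a slightly more systematic comparison is the mean squared error and mean absolute error over the whole test set. A small sketch, assuming the predictions are first converted to a NumPy array; compare_predictions is a hypothetical helper:

```python
import numpy as np


def compare_predictions(y_pred, y_true):
    """Return (MSE, MAE) between predictions and targets; (n, 1) outputs are flattened to (n,)."""
    y_pred = np.asarray(y_pred, dtype=float).reshape(-1)
    y_true = np.asarray(y_true, dtype=float).reshape(-1)
    err = y_pred - y_true
    return float(np.mean(err ** 2)), float(np.mean(np.abs(err)))


# Usage with the objects from this article (not run here):
# mse, mae = compare_predictions(model(X_test).numpy(), y_test)
```

The flattening matters: the model returns shape (n, 1) while y_test has shape (n,), and subtracting them directly would broadcast to an (n, n) matrix and give a meaningless error.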