我使用的是TF2.11。
为了更深入地了解 PINN,我想计算 PINN 参数损失的 Hessian 矩阵。我的玩具箱是一个 2D 泊松方程 $\Delta u = f$
我有三个版本,但只有两个适用于小模型。当我有更大的模型时,代码会崩溃。
注释代码如下:
# # [2D Poisson](2d-poisson.ipynb)
#
# This notebook will show new users how to:
# - use Rectangle
# - use Fully connected Neural Network (FNN)
# - define a boundary condition
# - use a custom implemented PDE
# - use PINNacle's default training loop by using the Trainer object
#
# ## Problem setup
#
# $\forall (x,y)\in [-1,1]^2$:
# $$\Delta u = \partial_{xx}u + \partial_{yy}u = f$$
#
# with $f(x,y) = -\frac{\pi²}{2}sin(\pi \frac{x+1}{2})sin(\pi \frac{y+1}{2})$.
#
# The exact solution is $u(x,y) = sin(\pi \frac{x+1}{2})sin(\pi \frac{y+1}{2})$.
# In[1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# tensorflow imports
import tensorflow as tf
from tensorflow import keras
from copy import deepcopy as dcp
import numpy as np
import math as m
import matplotlib.pyplot as plt
from tensorflow.python.ops.parallel_for.gradients import jacobian
pi = np.pi
#--------------------------- seed init ----------------------------------
# for reproductibility
seed = 1234
keras.utils.set_random_seed(seed)
# ## Geometry object definition
# The space domain is a rectangle and will be sampled using the Rectangle class. The arguments of Rectangle.sample are args_interior and args_boundary. p1 and p2 are respectively the lower/upper boundary of the rectangle.
# In[2]:
Xpde = tf.random.uniform(shape=[1000,2])
Xpde = 2*Xpde-1
Xdirichlet = tf.random.uniform(shape=[1000,1])
Xdirichlet = tf.concat([Xdirichlet, tf.ones_like(Xdirichlet)], axis=1)
# ## Model definition
# In[3]:
# works if 50 is replaced by 10
model = keras.Sequential(
[
keras.layers.Dense(50, activation="tanh", name="layer1"),
keras.layers.Dense(50, activation="tanh", name="layer2"),
keras.layers.Dense(50, activation="tanh", name="layer4"),
keras.layers.Dense(1, name="layer3"),
]
)
model(keras.Input(shape=(2,)))
model.summary()
# ## Custom PDE definition
# In[4]:
@tf.function
def f(X):
"""RHS aka source term"""
x , y = X[:,0:1] , X[:,1:2]
return -pi**2/2*tf.sin(pi*(x+1)/2)*tf.sin(pi*(y+1)/2)
# The custom PDE will be defined as a class having a residuals function.
# In[5]:
class MyPDE:
@tf.function
def residuals(self, X ,model):
"""returns the residuals of the PDE at each data point"""
x , y = X[:,0:1] , X[:,1:2]
# making predictions
u = model(tf.concat([x,y],axis=1))
# first derivatives
u_x = tf.gradients(u,x)[0]
u_y = tf.gradients(u,y)[0]
# second derivatives
u_xx = tf.gradients(u_x,x)[0]
u_yy = tf.gradients(u_y,y)[0]
# values of source term
fvals = f(X)
return u_xx + u_yy - fvals
pde = MyPDE()
# ## Boundary condition
# Let's define the Dirichlet condition:
# In[6]:
def on_boundary(X):
"""returns True if the point is on the boundary"""
l1 = np.isclose(np.abs(X[:,0]), 1.0)
l2 = np.isclose(np.abs(X[:,1]), 1.0)
return np.logical_or(l1,l2)
def func(X):
"""returns the value of the solution on the boundary"""
return tf.zeros([X.shape[0],1])
@tf.function
def dirichlet_residuals(X, model):
return func(X) - model(X)
# The function *on_boundary* will be used later to have access to points on the boundary so that they can be separated from the points inside the domain.
# In[8]:
mse = keras.losses.MeanSquaredError()
@tf.function
def get_losses(model, Xpde, pde, Xdirichlet, dirichlet_residuals):
"""compute loss of PDE + Dirichlet """
loss_pde = mse(0, pde.residuals(Xpde, model))
loss_dirichlet = mse(0, dirichlet_residuals(Xdirichlet, model))
return loss_pde, loss_dirichlet
def get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals):
"""First version working with tape, tape2 + gradient
Note: pretty slow and doesnt work for big neural networks
upgrades?:
- use tf.function (graph mode)
- optimize memory
- using tape.jacobian
Error: crashes the notebook without any error
"""
with tf.GradientTape(persistent=True) as tape:
with tf.GradientTape() as tape2:
losses = get_losses(model, Xpde, pde, Xdirichlet, dirichlet_residuals)
loss_total = tf.reduce_sum(losses)
grads = tape2.gradient(loss_total, model.trainable_weights, unconnected_gradients=tf.UnconnectedGradients.ZERO)
flat_grads = tf.concat([tf.reshape(g,(-1,1)) for g in grads], axis=0)
hessian = []
for g in flat_grads:
hessian.append(
tf.concat([tf.reshape(gg, (1,-1)) for gg in tape.gradient(g, model.trainable_weights)], axis=1)
)
hessian = tf.concat(hessian, axis=0)
return hessian
res = get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals)
print(res.shape)
@tf.function
def get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals):
"""Second version working tf.gradients -> jacobian (from tensorflow.python.ops.parallel_for.gradients)
Note: pretty slow and doesnt work for big neural networks
upgrades?/Error: same as last version
"""
loss = tf.reduce_sum(get_losses(model, Xpde, pde, Xdirichlet, dirichlet_residuals))
grads = tf.gradients(loss, model.trainable_weights, unconnected_gradients=tf.UnconnectedGradients.ZERO)
grads = tf.concat([tf.reshape(gg, (-1,1)) for gg in grads], axis=0)
hessian = jacobian(grads, model.trainable_weights)
hessian = tf.concat([tf.reshape(h_, (tf.shape(grads)[0], -1)) for h_ in hessian], axis=1)
return hessian
res = get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals)
print(res.shape)
@tf.function
def get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals):
"""Second version working tf.gradients -> jacobian (from tensorflow.python.ops.parallel_for.gradients)
Note: pretty slow and doesnt work for big neural networks
upgrades?/Error: same as last version
"""
loss = tf.reduce_sum(get_losses(model, Xpde, pde, Xdirichlet, dirichlet_residuals))
grads = tf.gradients(loss, model.trainable_weights, unconnected_gradients=tf.UnconnectedGradients.ZERO)
grads = tf.concat([tf.reshape(gg, (-1,1)) for gg in grads], axis=0)
hessian = jacobian(grads, model.trainable_weights)
hessian = tf.concat([tf.reshape(h_, (tf.shape(grads)[0], -1)) for h_ in hessian], axis=1)
return hessian
res = get_hessian(model, Xpde, pde, Xdirichlet, dirichlet_residuals)
print(res.shape)
如果实际的 Hessian 矩阵太昂贵,也许使用具有较低导数的 Hessian 矩阵的近似值是一种选择。
我确信对于小模型,这已经在 TF、PyTorch、JAX 中实现...但是有更大模型的示例吗?
您可以尝试张量流版本 2.14 中的张量流 tf.hessian() 中的内置函数
https://www.tensorflow.org/api_docs/python/tf/hessians
另一种方法在下面列出的论文中,它说它比使用 pyhessian 张量流库的 tf.hessian() 更快