CMU-深度学习系统-第五章&hw1

由于第五章只是介绍了一下needle这个库,我觉得其实直接做hw1就行了,所以不再单独写第五章。

实现前向传播

这个是最简单的,其实只需要一行左右的代码

PowerScalar

1
2
3
4
5
def compute(self, a: NDArray) -> NDArray:
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return a**self.scalar
### END YOUR SOLUTION

EwiseDive

1
2
3
4
5
def compute(self, a, b):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return a/b
### END YOUR SOLUTION

DivScalar

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return a/self.scalar
### END YOUR SOLUTION

MatMul

1
2
3
4
5
def compute(self, a, b):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return a@b
### END YOUR SOLUTION

Summation

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.sum(a,axis=self.axes)
### END YOUR SOLUTION

BroadcastTo

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.broadcast_to(a,self.shape)
### END YOUR SOLUTION

Reshape

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.reshape(a,self.shape)
### END YOUR SOLUTION

Negate

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return -1 * a
### END YOUR SOLUTION

Transpose

我感觉只有这个稍微复杂一点,原本寻思直接调用array_api.transpose,然而这个函数并不是这样用的,相比之下swapaxes函数更合理

1
2
3
4
5
6
7
8
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
if self.axes is None:
return array_api.swapaxes(a, len(a.shape)-1,len(a.shape)-2)
else:
return array_api.swapaxes(a, self.axes[0],self.axes[1])
### END YOUR SOLUTION

Log

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.log(a)
### END YOUR SOLUTION

Exp

1
2
3
4
5
def compute(self, a):
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.exp(a)
### END YOUR SOLUTION

EWisePow

1
2
3
4
5
def compute(self, a: NDArray, b: NDArray) -> NDArray:
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
return array_api.power(a,b)
### END YOUR SOLUTION

实现反向传播

backwards的函数形参有out_grad, node,其中out_grad代表的是这个计算结果的伴随。比如,且最终输出是,那么out_grad就是;node代表的是这个运算节点本身。我们最终要输出的是这个运算的所有输入的部分伴随,根据,就是用这个out_grad乘上针对这个运算的微分,在根据维度进行调整(反正我是这样做的)

这里面有几个函数的反向传播过程还挺迷惑的,但是我觉得重点应该看懂test/hw1/test_autograd_hw.py里面的gradient_check函数,这个函数会教会如何用数学推导来验证反向传播过程的正确性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def gradient_check(f, *args, tol=1e-6, backward=False, **kwargs):
eps = 1e-4
numerical_grads = [np.zeros(a.shape) for a in args]
for i in range(len(args)):
for j in range(args[i].realize_cached_data().size):
args[i].realize_cached_data().flat[j] += eps
f1 = float(f(*args, **kwargs).numpy().sum())
args[i].realize_cached_data().flat[j] -= 2 * eps
f2 = float(f(*args, **kwargs).numpy().sum())
args[i].realize_cached_data().flat[j] += eps
numerical_grads[i].flat[j] = (f1 - f2) / (2 * eps)
if not backward:
out = f(*args, **kwargs)
computed_grads = [
x.numpy()
for x in out.op.gradient_as_tuple(ndl.Tensor(np.ones(out.shape)), out)
]
else:
out = f(*args, **kwargs).sum()
out.backward()
computed_grads = [a.grad.numpy() for a in args]
error = sum(
np.linalg.norm(computed_grads[i] - numerical_grads[i]) for i in range(len(args))
)
assert error < tol
return computed_grads

以上函数的思路其实就是在各个输入上加上一个微小变量eps,然后求出变化后变量对应的函数值,再把这个函数值(矩阵)直接求和,得到一个常数;再使用这个输入减去微小变量eps,求出变化后变量对应的函数值,同样进行求和得到常数。将两个函数值相减,除以两倍的eps,这时候得到的就是针对这一个输入值(也就是矩阵里面单个元素)的数值的梯度,这里把多个元素凑一块,得到的是输入矩阵一样的大小的矩阵,这个矩阵也就是op的输出针对这个输入的梯度矩阵。在这里假设这个op计算的结果就是最终的输出结果,也即这个矩阵就是这个输入的伴随。

与此同时,因为这个函数是为了测试计算梯度是否与数值梯度相同,所以它将out_grad视为全1:for x in out.op.gradient_as_tuple(ndl.Tensor(np.ones(out.shape)), out),因为如果op的计算结果就是最终解的话,那么这个out_grad就是全1,和上面保持一致,然后交给自己写的反向传播过程进行验证,得到的结果和数学解进行进行assert。

反向传播里面好几个比较抽象的,都是按这个方式来分析。函数里面具体说。

PowerScalar

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad * self.scalar * node.inputs[0]**(self.scalar-1)
### END YOUR SOLUTION

EWiseDiv

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad/node.inputs[1], out_grad*node.inputs[0]*(-node.inputs[1]**(-2))
### END YOUR SOLUTION

DivScalar

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad / self.scalar
### END YOUR SOLUTION

MatMul

如果只有二维矩阵,其实不需要加if,但是存在高维矩阵的情况,打个比方,形状为[3,2,5][5,1]的矩阵相乘,得到的结果应该是[3,2,1]。但是在反向传播时,left_adjoint是正常的,而right_adjoint中right_adjoint = transpose(node.inputs[0])@out_grad,也就是[3,5,2]@[3,2,1],得到的结果是[3,5,1],但是期待得到的结果是[5,1]。如果按照之前说的凑维度,很容易想到是对第一个维度求和。如果再高维,如形状为[4,3,2,5][5,1]的矩阵相乘,那么right_adjoint得到的结果就是[4,3,5,2]@[3,2,1][4,3,5,1],就是对前两个维度进行求和。因此写出以下代码,验证后发现正确。

1
2
3
4
5
6
7
8
9
10
11
12
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
left_adjoint = out_grad@transpose(node.inputs[1])
if len(left_adjoint.shape) > len(node.inputs[0].shape):
axes_to_sum = tuple(range(len(left_adjoint.shape) - len(node.inputs[0].shape)))
left_adjoint = summation(left_adjoint,axes=axes_to_sum)
right_adjoint = transpose(node.inputs[0])@out_grad
if len(right_adjoint.shape) > len(node.inputs[1].shape):
axes_to_sum = tuple(range(len(right_adjoint.shape) - len(node.inputs[1].shape)))
right_adjoint = summation(right_adjoint,axes=axes_to_sum)
return left_adjoint, right_adjoint
### END YOUR SOLUTION

Summation

这个和之后的广播之类的操作一样,刚开始没有一点思路,不知道为什么求和操作还能有梯度。还是阅读gradient_check函数后得知怎么做。其实本质来说,这个运算的梯度也即输入的每一个元素对输出有多大影响。用常数举一个例子,例如,那么每一个输入x对于y就是2倍的影响。同理,如果是,对于每一个x,可以用来获得当前这个输入对于输出有多大影响。当然,如果取极小值是可以通过这个式子获得准确的梯度计算公式。但是在写这个代码的时候我是直接带了具体数值进去,看一下不同输入是如何影响输出的,我举一个例子:

假如summation操作中,为了简化取axis=none,如果输入的矩阵为,前向计算得到的结果是10。那么如果第一个元素1增加一个微小的,也就是得到的结果是,不用赘述,梯度的数学表达式可以表示为即等于1,多测试几个数发现每一个元素对于最终的输出的贡献都为1,也就是说梯度为,由于out_grad此时是一个常数,直接乘上去就行,相当于广播。

对于axis != None的情况,我们需要理解前向传播中求和操作如何改变张量维度:当沿着指定axis进行求和时,输出张量会消除这些维度(例如输入形状为[2,3]且axis=1时,输出会坍缩为[2])。这导致反向传播时,上层传递来的out_grad会缺失这些被消除的维度。

例如输入为[[1,2],[3,4]]沿axis=1求和得到[3,7],此时out_grad的shape是(2,)。但原始输入的每个元素(如位置[0,1]的2)对输出的贡献是沿axis=1方向的——当梯度从结果[3,7]传回时,每个元素的梯度需要分配到对应行的所有位置。

具体来说,反向传播的数学本质是:若输入元素x_i参与了n个输出元素的计算,则x_i的梯度是这些n个输出梯度之和。但在求和操作的特殊情况下,每个输入元素恰好只贡献给一个输出元素(即对应坍缩后的位置),因此梯度直接复制到所有参与求和的维度。

假设前向传播的求和操作如下:

输入矩阵x = [[1, 2], [3, 4]] 操作:沿 axis=1 求和(即对每行求和) 输出sum = [3, 7] 反向传播时:假设输出的梯度(out_grad)为 [0.5, 1.5],求输入 x 的梯度。


反向传播的关键分析

普通操作的情况(对比)

假设某个操作中,一个输入元素会影响多个输出元素。例如矩阵乘法:

  • 输入 x = [x1, x2]
  • 权重矩阵 W = [[w11, w12], [w21, w22]]
  • 输出 y = [w11*x1 + w21*x2, w12*x1 + w22*x2]

此时 x1 同时影响 y1y2,反向传播时 x1 的梯度是 ∂y1/∂x1 * out_grad1 + ∂y2/∂x1 * out_grad2(即多个输出梯度的累加)。


求和操作的特殊性

在求和操作中,每个输入元素只影响一个输出元素。例如:

  • 输入元素 x[0,0] = 1 只参与输出 sum[0] = 3 的计算
  • 输入元素 x[0,1] = 2 也只参与输出 sum[0] = 3 的计算
  • 输入元素 x[1,0] = 3x[1,1] = 4 只参与输出 sum[1] = 7 的计算

此时反向传播的规则是: 输入元素的梯度 = 对应的输出元素的梯度

  • x[0,0] 的梯度 = out_grad[0]
  • x[0,1] 的梯度 = out_grad[0]
  • x[1,0] 的梯度 = out_grad[1]
  • x[1,1] 的梯度 = out_grad[1]

最终梯度矩阵: x_grad = [[0.5, 0.5], [1.5, 1.5]]

为了实现这一点,梯度计算需要两步操作:

  1. 维度重建:通过reshape在out_grad被消除的轴上插入尺寸1(例如将(2,)转为(2,1)),这恢复了被求和操作消除的维度信息;
  2. 广播填充:通过乘以全1矩阵,将out_grad沿原始求和的轴进行广播(例如(2,1)广播到(2,2)),使得每个原始输入位置都能获得对应的梯度分量。

这种维度操作的本质是:将输出梯度在坍缩的轴上进行”逆向坍缩”,把标量梯度值平摊到该轴所有位置,最终通过广播恢复与原输入一致的维度结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def gradient(self, out_grad, node):
# 获取输入张量的形状
a_shape = node.inputs[0].shape

# 如果没有指定 axis(axes=None),说明前向求和是对所有元素求和,
# 此时 out_grad 为标量,我们需要将其广播到整个输入张量的形状。
if self.axes is None:
# 利用全1的张量乘以 out_grad 来实现广播
return out_grad * array_api.ones(a_shape)

# 如果指定了 axis,那么 axes 可能是整数或者元组,
# 为了统一处理,将其转换为元组形式。
axes = self.axes if isinstance(self.axes, tuple) else (self.axes,)

# 构造 out_grad 应该重新排列的形状:
# 对于输入张量中的每个维度,如果该维度在 axes 中,
# 则在输出梯度中该位置需要插入一个单例维度(即尺寸为 1),
# 否则取出 out_grad 原有的对应尺寸。
new_shape = []
out_grad_shape_iter = iter(out_grad.shape)
for i in range(len(a_shape)):
if i in axes:
new_shape.append(1)
else:
new_shape.append(next(out_grad_shape_iter))

# 将 out_grad reshape 到 new_shape 后利用广播乘以全1张量扩展到原始形状
return out_grad.reshape(new_shape) * array_api.ones(a_shape)

BroadcastTo

做的时候感觉也挺费脑的,但是可以参考上面的求和操作,不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def gradient(self, out_grad, node):
original_shape = node.inputs[0].shape
broadcasted_shape = self.shape

# 找到所有需要求和的轴(被扩展的轴)
axes = []
original_ndim = len(original_shape)
for i in range(len(broadcasted_shape)):
if i >= original_ndim or original_shape[i] != broadcasted_shape[i]:
axes.append(i)

# 沿这些轴求和,恢复原始形状
grad = summation(out_grad, axes=tuple(axes))
return grad.reshape(original_shape)

Reshape

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return reshape(out_grad, node.inputs[0].shape)
### END YOUR SOLUTION

Negate

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return -1 * out_grad
### END YOUR SOLUTION

Transpose

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return transpose(out_grad, self.axes)
### END YOUR SOLUTION

Log

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad * power_scalar(node.inputs[0], -1)
### END YOUR SOLUTION

Exp

1
2
3
4
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad * exp(node.inputs[0])
### END YOUR SOLUTION

EWisePow

1
2
3
4
5
def gradient(self, out_grad, node):
### BEGIN YOUR SOLUTION
return out_grad * (node.inputs[1]) * power(node.inputs[0], node.inputs[1]-1), \
out_grad * log(node.inputs[0]) * power(node.inputs[0], node.inputs[1])
### END YOUR SOLUTION

拓扑排序

这个没啥说的,就是个递归,挺简单的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def find_topo_sort(node_list: List[Value]) -> List[Value]:
"""Given a list of nodes, return a topological sort list of nodes ending in them.

A simple algorithm is to do a post-order DFS traversal on the given nodes,
going backwards based on input edges. Since a node is added to the ordering
after all its predecessors are traversed due to post-order DFS, we get a topological
sort.
"""
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
# 输入是汇点

topo_order = []
topo_sort_dfs(node_list[0],False,topo_order)
return topo_order
### END YOUR SOLUTION


def topo_sort_dfs(node, visited, topo_order):
"""Post-order DFS"""
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
if visited or node is None:
return
# 可以不要,但是要在下面的input[0]加上判断,但是用这句话可以省事
if node.is_leaf():
topo_order.append(node)
return
topo_sort_dfs(node.inputs[0],node.inputs[0] in topo_order,topo_order)
# 很重要!
if len(node.inputs)>1:
topo_sort_dfs(node.inputs[1],node.inputs[1] in topo_order,topo_order)
topo_order.append(node)

反向模式微分实现

这个完全按照图上的伪代码写就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def compute_gradient_of_variables(output_tensor, out_grad):
"""Take gradient of output node with respect to each node in node_list.

Store the computed result in the grad field of each Variable.
"""
# a map from node to a list of gradient contributions from each output node
node_to_output_grads_list: Dict[Tensor, List[Tensor]] = {}
# Special note on initializing gradient of
# We are really taking a derivative of the scalar reduce_sum(output_node)
# instead of the vector output_node. But this is the common case for loss function.
node_to_output_grads_list[output_tensor] = [out_grad]

# Traverse graph in reverse topological order given the output_node that we are taking gradient wrt.
reverse_topo_order = list(reversed(find_topo_sort([output_tensor])))

### BEGIN YOUR SOLUTION
# raise NotImplementedError()
for node in reverse_topo_order:
node.grad = sum_node_list(node_to_output_grads_list[node])
if node.is_leaf():
continue
grads = node.op.gradient(node.grad,node)
if isinstance(grads,Tuple):
for grad, index in zip(grads, range(len(grads))):
if node.inputs[index] in node_to_output_grads_list:
node_to_output_grads_list[node.inputs[index]].append(grad)
else:
node_to_output_grads_list[node.inputs[index]] = [grad]
else:
if node.inputs[0] in node_to_output_grads_list:
node_to_output_grads_list[node.inputs[0]].append(grads)
else:
node_to_output_grads_list[node.inputs[0]] = [grads]
### END YOUR SOLUTION

Softmax Loss

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def softmax_loss(Z, y_one_hot):
"""Return softmax loss. Note that for the purposes of this assignment,
you don't need to worry about "nicely" scaling the numerical properties
of the log-sum-exp computation, but can just compute this directly.

Args:
Z (ndl.Tensor[np.float32]): 2D Tensor of shape
(batch_size, num_classes), containing the logit predictions for
each class.
y (ndl.Tensor[np.int8]): 2D Tensor of shape (batch_size, num_classes)
containing a 1 at the index of the true label of each example and
zeros elsewhere.

Returns:
Average softmax loss over the sample. (ndl.Tensor[np.float32])
"""
### BEGIN YOUR SOLUTION
# raise NotImplementedError()
Z_exp = ndl.exp(Z)
Z_exp_sum = ndl.summation(Z_exp, axes=1)
Z_exp_sum_log = ndl.log(Z_exp_sum)
Z_y = ndl.summation(Z*y_one_hot,axes = 1)
loss = ndl.summation(Z_exp_sum_log - Z_y)/Z.shape[0]
return loss
### END YOUR SOLUTION

用needle来重写SGD

这个函数我改了很久,主要是关于numpy和ndl.tensor之间经常发生一些混用,用的时候一定小心。另外不同维度的运算在needle里面要进行现实的广播,这一点也很容易出错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def nn_epoch(X, y, W1, W2, lr=0.1, batch=100):
"""Run a single epoch of SGD for a two-layer neural network defined by the
weights W1 and W2 (with no bias terms):
logits = ReLU(X * W1) * W1
The function should use the step size lr, and the specified batch size (and
again, without randomizing the order of X).

Args:
X (np.ndarray[np.float32]): 2D input array of size
(num_examples x input_dim).
y (np.ndarray[np.uint8]): 1D class label array of size (num_examples,)
W1 (ndl.Tensor[np.float32]): 2D array of first layer weights, of shape
(input_dim, hidden_dim)
W2 (ndl.Tensor[np.float32]): 2D array of second layer weights, of shape
(hidden_dim, num_classes)
lr (float): step size (learning rate) for SGD
batch (int): size of SGD mini-batch

Returns:
Tuple: (W1, W2)
W1: ndl.Tensor[np.float32]
W2: ndl.Tensor[np.float32]
"""

### BEGIN YOUR SOLUTION
# raise NotImplementedError()
num_classes = W2.shape[1]
for i in range(0, X.shape[0], batch):
end = min(i + batch, X.shape[0])
X_batch = X[i:end, :]
y_batch = y[i:end]

# 创建one-hot标签
I_y = np.zeros((len(y_batch), num_classes), dtype=np.float32)
I_y[np.arange(len(y_batch)), y_batch] = 1.0
I_y = ndl.Tensor(I_y) # 转换为Tensor

# 前向计算
X_tensor = ndl.Tensor(X_batch)
Z_1 = ndl.relu(ndl.matmul(X_tensor, W1))
logits = ndl.matmul(Z_1, W2)

# 计算softmax梯度
exp_logits = ndl.exp(logits)
sum_exp = ndl.summation(exp_logits, axes=1)
softmax = exp_logits / ndl.broadcast_to(ndl.reshape(sum_exp,(exp_logits.shape[0],1)),exp_logits.shape)
G_2 = softmax - I_y

# 计算隐藏层梯度
relu_mask = (Z_1.numpy() > 0).astype(np.float32)
G_1 = ndl.Tensor(relu_mask) * ndl.matmul(G_2, W2.transpose())

delta_W1 = ndl.matmul(ndl.transpose(X_tensor), G_1) / (end - i)
delta_W2 = ndl.matmul(ndl.transpose(Z_1), G_2) / (end - i)

# 更新权重(确保结果为Tensor)
W1 = ndl.Tensor(W1.numpy() - (lr * delta_W1).numpy())
W2 = ndl.Tensor(W2.numpy() - (lr * delta_W2).numpy())

return W1, W2
### END YOUR SOLUTION