Hand-rolling stochastic gradient descent (SGD) for linear regression in plain NumPy


Contents: data loading, batch generator, running the program (initialization and model training), loss plot.
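
The model is a plain linear map y_pred = W·x + B, trained by minimizing the mean squared error L = 1/(2m)·Σ(y − y_pred)² over each mini-batch of m samples; every batch triggers one update W ← W − lr·∂L/∂W, B ← B − lr·∂L/∂B. The code below loads the California housing data, standardizes the features, splits off a validation set, and runs the training loop.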

    # Data loading
    from sklearn.datasets import fetch_california_housing
    from sklearn.model_selection import train_test_split
    import numpy as np
    from tqdm import tqdm
    
    from matplotlib import pyplot as plt
    from sklearn.preprocessing import StandardScaler
    
    
    def plot_loss(loss_train: np.ndarray, loss_val: np.ndarray, validate_every: int):
        # training loss is logged every epoch, validation loss only every
        # `validate_every` epochs, so the two curves need different x grids
        x = np.arange(0, loss_train.shape[0], 1)
        plt.plot(x, loss_train, label='train')
        x = np.arange(0, loss_train.shape[0], validate_every)
        plt.plot(x, loss_val, label='validate')
        plt.legend()
        plt.title('loss')
        plt.show()
    
    
    X, Y = fetch_california_housing(return_X_y=True)
    X.shape, Y.shape  # (20640, 8), (20640, )
    
    # standardize each feature to zero mean / unit variance so a single
    # learning rate works for all 8 columns
    ss = StandardScaler()
    X = ss.fit_transform(X)
    
    validate_size = 0.2
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=validate_size, shuffle=True)
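
    # Shape check: the 0.8/0.2 split of 20640 rows gives X_train (16512, 8)
    # and X_test (4128, 8); 16512 % 64 == 0, so the divisibility assert in
    # get_batch below passes with batch_size = 64.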
    
    
    # Batch generator
    def get_batch(batchsize: int, X: np.ndarray, Y: np.ndarray):
        # requires the sample count to divide evenly into batches
        assert 0 == X.shape[0] % batchsize, f'{X.shape[0]}%{batchsize} != 0'
        batchnum = X.shape[0] // batchsize
        # view the data as (batchnum, batchsize, features) and yield one
        # batch at a time
        X_new = X.reshape((batchnum, batchsize, X.shape[1]))
        Y_new = Y.reshape((batchnum, batchsize,))
    
        for i in range(batchnum):
            yield X_new[i, :, :], Y_new[i, :]
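

    # Assumption (not in the original post): textbook SGD reshuffles the data
    # every epoch; a minimal sketch of a shuffling generator that also tolerates
    # a trailing partial batch. If used above, scale the gradient by
    # x_true.shape[0] instead of batch_size to handle the short final batch.
    def get_batch_shuffled(batchsize: int, X: np.ndarray, Y: np.ndarray):
        idx = np.random.permutation(X.shape[0])  # new sample order per call
        for start in range(0, X.shape[0], batchsize):
            sel = idx[start:start + batchsize]  # last slice may be shorter
            yield X[sel], Y[sel]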
    
    
    # Running the program
    W0 = np.random.random(size=(X.shape[1],))  # initial weights, shape (8,)
    B0 = np.random.normal(0, 1, 1)  # initial bias, shape: (1,)
    
    # Model training
    lr = 0.001  # learning rate
    num_epochs = 1000  # number of training epochs
    batch_size = 64  # samples per batch
    validate_every = 4  # run validation every this many epochs
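    # With 16512 training rows and batch_size = 64, each epoch performs
    # 16512 / 64 = 258 parameter updates (mini-batch SGD rather than
    # single-sample SGD).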
    
    loop = tqdm(range(num_epochs))
    loss_train = []
    loss_validate = []
    W = W0.copy()
    B = B0.copy()
    # iterate over epochs
    for epoch in loop:
        loss_train_epoch = 0
        # iterate over mini-batches
        for x_true, y_true in get_batch(batch_size, X_train, Y_train):
            y_pred = W.dot(x_true.T) + B
            loss_batch = 1 / 2 * np.mean((y_true - y_pred) ** 2)
    
            # gradients of the batch loss L = 1/(2m)·Σ(y − ŷ)², m = batch_size:
            #   ∂L/∂W = −(1/m)·Σ (y − ŷ)·x      ∂L/∂B = −(1/m)·Σ (y − ŷ)
            W = W - lr * 1 / batch_size * (-1) * (y_true - y_pred).dot(x_true)
            B = B - lr * 1 / batch_size * (-1) * (y_true - y_pred).sum()
    
            # accumulate the summed per-sample loss so the epoch mean is exact
            loss_train_epoch = loss_train_epoch + (loss_batch * batch_size)
    
        loss_train_epoch = loss_train_epoch / X_train.shape[0]
        loss_train.append(loss_train_epoch)
        loop.set_description(f'Epoch: {epoch}, loss: {loss_train_epoch}')
    
        if 0 == epoch % validate_every:
            y_pred = W.dot(X_test.T) + B
            loss_validate_epoch = 1 / 2 * np.mean((Y_test - y_pred) ** 2)
            loss_validate.append(loss_validate_epoch)
            print('============Validate=============')
            print(f'Epoch: {epoch}, train loss: {loss_train_epoch}, val loss: {loss_validate_epoch}')
            print('================================')
    
    plot_loss(np.array(loss_train), np.array(loss_validate), validate_every)
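
As a quick sanity check (an addition, not part of the original recipe), the SGD solution can be compared against sklearn's closed-form least-squares fit on the same standardized features; both weight vectors and validation errors should come out close:

    # Sanity check (assumed to run right after the training loop above)
    from sklearn.linear_model import LinearRegression
    
    ref = LinearRegression().fit(X_train, Y_train)
    print('SGD W[:3]:', W[:3], ' B:', B)
    print('OLS W[:3]:', ref.coef_[:3], ' B:', ref.intercept_)
    
    # validation MSE of both solutions on the held-out split
    mse_sgd = np.mean((Y_test - (W.dot(X_test.T) + B)) ** 2)
    mse_ols = np.mean((Y_test - ref.predict(X_test)) ** 2)
    print(f'val MSE  SGD: {mse_sgd:.4f}  OLS: {mse_ols:.4f}')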