SVM:SMO算法的问题

2024-04-28 17:33:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我目前正在尝试使用MNIST数据库编写一个用于手写数字识别的非线性支持向量机。在

我选择使用SMO算法(基于Platt的论文和其他书籍),但是我在实现它时遇到了一些困难。在

当我在训练集上运行代码时,偏差越来越大,有时直到“Inf”值,导致支持向量机“分类”同一个类中的每个示例。在

这是我的代码:

import numpy
import gzip
import struct
import matplotlib
from sklearn import datasets
from copy import copy

class SVM:

    def __init__(self, constant, data_set, label_set):
        self._N = len(data_set)
        if self._N != len(label_set):
            raise Exception("Data size and label size don't match.")


        self._C = constant
        self._epsilon = 0.001
        self._tol = 0.001

        self._data = [numpy.ndarray.flatten((1/255)*elt) for elt in data_set]
        self._dimension = len(self._data[0])
        self._label = label_set
        self._alphas = numpy.zeros((1, self._N))
        self._b = 0
        self._errors = numpy.ndarray((2, 0))







    def kernel(self, x1, x2):
        x1 = x1.reshape(1,self._dimension)
        result = numpy.power(numpy.dot(x1, x2), 3)

        return result






    def evaluate(self, x):
        result = 0
        i = 0
        while i < self._N:
            result +=  self._alphas[0, i]*self._label[i]*self.kernel(x, self._data[i])
            i += 1

        result += self._b       
        return result






    def update(self, i1, i2, E2):
        i1 = int(i1)
        i2 = int(i2)
        if i1 == i2:
            return 0

        y1 = self._label[i1]
        y2 = self._label[i2]
        alpha1 = self._alphas[0, i1]
        alpha2 = self._alphas[0, i2]

        #If alpha1 is non-bound, its error is in the cache.
        #So we check its position to extract its error.
        #Else, we compute it.
        if alpha1 > 0 and alpha1 < self._C :
            position = 0
            for i, elt in enumerate(self._errors[0, :]):
                if elt == i1:
                    position = i

            E1 = self._errors[1, position]
        else:
            E1 = self.evaluate(self._data[i1]) - y1


        s = y1*y2
        H = L = 0

        if y1 != y2:
            L = max(0, alpha2 - alpha1)
            H = min(self._C, self._C + alpha2 - alpha1)
        else:
            L = max(0, alpha2 + alpha1 - self._C)
            H = min(self._C, alpha2 + alpha1)


        if H == L:
            return 0

        K11 = self.kernel(self._data[i1], self._data[i1])
        K12 = self.kernel(self._data[i1], self._data[i2])
        K22 = self.kernel(self._data[i2], self._data[i2])

        eta = K11 + K22 - 2*K12
        if eta > 0:
            alpha2_new = alpha2 + (y2*(E1 - E2)/eta)
            if alpha2_new < L:
                alpha2_new = L
            elif alpha2_new > H:
                alpha2_new = H

        else:
            f1 = y1*(E1 + self._b) - alpha1*K11 - s*alpha2*K12
            f2 = y2*(E2 + self._b) - alpha2*K22 - s*alpha1*K12

            L1 = alpha1 + s*(alpha2 - L)
            H1 = alpha1 + s*(alpha2 - H)

            FuncL = L1*f1 + L*f2 + (1/2)*numpy.square(L1)*K11 + (1/2)*numpy.square(L)*K22 + s*L1*L*K12
            FuncH = H1*f1 + H*f2 + (1/2)*numpy.square(H1)*K11 + (1/2)*numpy.square(H)*K22 + s*H1*H*K12

            if FuncL < FuncH - self._epsilon:
                alpha2_new = L
            elif FuncL > FuncH + self._epsilon:
                alpha2_new = H
            else:
                alpha2_new = alpha2



        if numpy.abs(alpha2_new - alpha2) < self._epsilon*(alpha2_new+alpha2+ self._epsilon):
            return 0

        alpha1_new = alpha1 + s*(alpha2 - alpha2_new)

        #Update of the threshold.
        b1 = E1 + y1*(alpha1_new - alpha1)*K11 + y2*(alpha2_new - alpha2)*K12 + self._b
        b2 = E2 + y1*(alpha1_new - alpha1)*K12 + y2*(alpha2_new - alpha2)*K22 + self._b

        if L < alpha1_new < H:
            b_new = b1
        elif L < alpha2_new < H:
            b_new = b2
        else:
            b_new = (b1+b2)/2


#Update the cache error

        #If alpha2 was bound and its new value is non-bound, we add its index and its error to the cache.
        #If alpha2 was unbound and its new value is bound, we delete it from the cache.
        if (alpha2 == 0 or alpha2 == self._C) and (alpha2_new > 0 and alpha2_new < self._C):
            vector_alpha2_new = numpy.array([i2, E2])
            vector_alpha2_new = vector_alpha2_new.reshape((2, 1))
            self._errors = numpy.concatenate((self._errors, vector_alpha2_new), 1)


        if (alpha2 > 0 and alpha2 < self._C) and (alpha2_new == 0 or alpha2_new == self._C):
            l = 0
            position = 0
            while l < len(self._errors[0, :]):
                if self._errors[0, l] == i2:
                    position = l
                l += 1

            self._errors = numpy.delete(self._errors, position, 1)


        #We do the exact same thing with alpha1.
        if (alpha1 == 0 or alpha1 == self._C) and (alpha1_new > 0 and alpha1_new < self._C):
            vector_alpha1_new = numpy.array([i1, E1])
            vector_alpha1_new = vector_alpha1_new.reshape((2, 1))
            self._errors = numpy.concatenate((self._errors, vector_alpha1_new), 1)


        if (alpha1 > 0 and alpha1 < self._C) and (alpha1_new == 0 or alpha1_new == self._C):
            l = 0
            position = 0
            while l < len(self._errors[0, :]):
                if self._errors[0, l] == i1:
                    position = l
                l += 1

            self._errors = numpy.delete(self._errors, position, 1)      



        #Then we update the error for each non bound point using the new values for alpha1 and alpha2.
        for i,error in enumerate(self._errors[1, :]):
            self._errors[1, i] = error + (alpha2_new - alpha2)*y2*self.kernel(self._data[i2], self._data[int(self._errors[0, i])]) + (alpha1_new - alpha1)*y1*self.kernel(self._data[i1], self._data[int(self._errors[0, i])]) - self._b + b_new


        #Storing the new values of alpha1 and alpha2:

        self._alphas[0, i1] = alpha1_new
        self._alphas[0, i2] = alpha2_new
        self._b = b_new

        print(self._errors)
        return 1




    def examineExample(self, i2):
        i2 = int(i2)
        y2 = self._label[i2]
        alpha2 = self._alphas[0, i2]

        if alpha2 > 0 and alpha2 < self._C:
            position = 0
            for i, elt in enumerate(self._errors[0, :]):
                if elt == i2:
                    position = i

            E2 = self._errors[1, position]
        else:
            E2 = self.evaluate(self._data[i2]) - y2

        r2 = E2*y2

        if (r2< -self._tol and alpha2 < self._C) or (r2 > self._tol and alpha2 > 0):

            n = numpy.shape(self._errors)[1]            
            if n > 1:   
                i1 = 0

                if E2 > 0:
                    min = self._errors[1, 0]
                    position = 0
                    for l, elt in enumerate(self._errors[1, :]):
                        if elt < min:
                            min = elt
                            position = l

                    i1 = self._errors[0, position]

                else:
                    max = self._errors[1, 0]
                    position = 0
                    for l, elt in enumerate(self._errors[1, :]):
                        if elt > max:
                            max = elt
                            position = l

                    i1 = self._errors[0, position]

                if self.update(i1, i2, E2):
                    return 1



            #loop over all non bound examples starting at a random point.
            list_index = [i for i in range(n)]
            numpy.random.shuffle(list_index)

            for i in list_index:
                i1 = self._errors[0, i]
                if self.update(i1, i2, E2):
                    return 1


            #Loop over all the training examples, starting at a random point.
            list_bound = [i for i in range(self._N) if not numpy.any(self._errors[0, :] == i)]
            numpy.random.shuffle(list_bound)

            for i in list_bound:
                i1 = i
                if self.update(i1, i2, E2):
                    return 1


        return 0




    def SMO(self):
        numChanged = 0
        examineAll = 1
        cpt = 1
        while(numChanged > 0 or examineAll):
            numChanged = 0

            if examineAll == 1:
                for i in range(self._N):
                    numChanged += self.examineExample(i)

            else:
                for i in self._errors[0, :]:
                    numChanged += self.examineExample(i)

            if examineAll == 1:
                examineAll = 0
            elif numChanged == 0:
                examineAll = 1

            cpt += 1    

















def load_training_data(a, b):
    train = gzip.open("train-images-idx3-ubyte.gz", "rb")
    labels = gzip.open("train-labels-idx1-ubyte.gz", "rb")

    train.read(4)
    labels.read(4)

    number_images = train.read(4)
    number_images = struct.unpack(">I", number_images)[0]

    rows = train.read(4)
    rows = struct.unpack(">I", rows)[0]

    cols = train.read(4)
    cols = struct.unpack(">I", cols)[0]

    number_labels = labels.read(4)
    number_labels = struct.unpack(">I", number_labels)[0]

    image_list = []
    label_list = []
    if number_images != number_labels:
        raise Exception("The number of labels doesn't match with the number of images")
    else:
        for l in range(number_labels):
            if l % 1000 == 0:
                print("l:{}".format(l))

            mat = numpy.zeros((rows, cols), dtype = numpy.uint8)
            for i in range(rows):
                for j in range(cols):
                    pixel = train.read(1)
                    pixel = struct.unpack(">B", pixel)[0]
                    mat[i][j] = pixel


            image_list += [mat]
            lab = labels.read(1)
            lab = struct.unpack(">B", lab)[0]
            label_list += [lab]


    train.close()
    labels.close()


    i = 0
    index_a = []
    index_b = []
    while i < number_labels:
        if label_list[i] == a:
            index_a += [i]
        elif label_list[i] == b:
            index_b += [i]

        i += 1

    image_list = [m for i,m in enumerate(image_list) if (i in index_a) | (i in index_b)]
    mean = (a+b)/2
    label_list = [ numpy.sign(m - mean) for l,m in enumerate(label_list) if l in index_a+index_b]

    return ([image_list, label_list])





def load_test_data():
    test = gzip.open("t10k-images-idx3-ubyte.gz", "rb")
    labels = gzip.open("t10k-labels-idx1-ubyte.gz", "rb")

    test.read(4)
    labels.read(4)

    number_images = test.read(4)
    number_images = struct.unpack(">I", number_images)[0]

    rows = test.read(4)
    rows = struct.unpack(">I", rows)[0]

    cols = test.read(4)
    cols = struct.unpack(">I", cols)[0]

    number_labels = labels.read(4)
    number_labels = struct.unpack(">I", number_labels)[0]

    image_list = []
    label_list = []
    if number_images != number_labels:
        raise Exception("The number of labels doesn't match with the number of images")
    else:
        for l in range(number_labels):
            if l % 1000 == 0:
                print("l:{}".format(l))

            mat = numpy.zeros((rows, cols), dtype = numpy.uint8)
            for i in range(rows):
                for j in range(cols):
                    pixel = test.read(1)
                    pixel = struct.unpack(">B", pixel)[0]
                    mat[i][j] = pixel


            image_list += [mat]
            lab = labels.read(1)
            lab = struct.unpack(">B", lab)[0]
            label_list += [lab]


    test.close()
    labels.close()

    return ([image_list, label_list])   

data = load_training_data(0, 7)
images_training = data[0]
labels_training = data[1]

svm = SVM(0.1, images_training[0:200], labels_training[0:200])

svm.SMO()




def view(image, label=""):
    print("Number : {}".format(label))
    pylab.imshow(image, cmap = pylab.cm.gray)
    pylab.show()

Tags: inselfnumpynumbernewfordatalabels
2条回答

首先,SMO是一个相当复杂的算法——在这种格式下调试并不容易。在

第二,你的测试起点太高了。一些帮助您调试问题的建议。在

1)首先,切换到使用线性核。用另一种算法计算精确的线性解,并与精确解进行比较。这样就只有权重向量和偏差项了。如果你停留在对偶空间,你必须比较所有的系数,并确保事情保持在同一顺序。在

2)从一个更简单的二维问题开始,你知道一般解决方案应该是什么样子。然后你可以可视化解决方案,并观察它在每一步的变化-这可以是一个可视化的工具,帮助你找出哪里出了问题。在

重要的是你说过:

    b1 = E1 + y1*(alpha1_new - alpha1)*K11 + y2*(alpha2_new - alpha2)*K12 + self._b
    b2 = E2 + y1*(alpha1_new - alpha1)*K12 + y2*(alpha2_new - alpha2)*K22 + self._b

基本上你每次用这个代码都是在b上加。你的b应该更像这样:

^{pr2}$

这个版本并不完美,但我建议您在Github上检查apex51的版本以查找指针: SVM-and-sequential-minimal-optimization

笔记中的数学基础非常强大(尽管与普拉特的论文有一些小的差异),代码并不完美,但对你来说是一个很好的方向。我还建议您看看其他已完成的smo,并尝试调整代码以计算您的需求,而不是从头开始编写。在

相关问题 更多 >