将Python转换为Scala Spark

2024-04-24 16:11:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个Python代码,我想把它转换成Scala Spark,我的算法是LDA(潜在Dirichlet Allocation)的扩展,因为这个算法有一个采样过程,当数据非常大时非常耗时,我知道SparkMlib中有一个LDA的实现版本,但是我想单独编写我的算法再加上一些改进,我的代码是:

    def init_est(self):
        """Allocate and zero every count matrix, then randomly assign an
        initial topic and sentiment label to each word of each document.

        Counters follow the usual collapsed-Gibbs notation:
          nd[d]         -- number of words in document d
          ndl[d][l]     -- words in d carrying sentiment l
          ndlz[d][l][z] -- words in d with sentiment l and topic z
          nlzw[l][z][w] -- occurrences of word w under sentiment l, topic z
          nlz[l][z]     -- total words under sentiment l and topic z
        """
        docs_n = self.dset.M       # number of documents
        senti_n = self.sentinum    # number of sentiment labels
        topics_n = self.K          # number of topics
        vocab_n = self.dset.V      # vocabulary size

        # Small factories keep the many zero-matrix allocations readable.
        def zeros1(n):
            return [0 for _ in range(n)]

        def zeros2(a, b):
            return [zeros1(b) for _ in range(a)]

        def zeros3(a, b, c):
            return [zeros2(b, c) for _ in range(a)]

        self.nd = zeros1(docs_n)
        self.ndl = zeros2(docs_n, senti_n)
        self.ndlz = zeros3(docs_n, senti_n, topics_n)
        self.nlzw = zeros3(senti_n, topics_n, vocab_n)
        self.nlz = zeros2(senti_n, topics_n)
        # p is the sampling distribution buffer and stays float.
        self.p = [[0.0 for _ in range(topics_n)] for _ in range(senti_n)]

        self.Z = [[] for _ in range(docs_n)]   # per-word topic assignments
        self.L = [[] for _ in range(docs_n)]   # per-word sentiment assignments

        self.pi_dl = zeros2(docs_n, senti_n)
        self.theta_dlz = zeros3(docs_n, senti_n, topics_n)
        self.phi_lzw = zeros3(senti_n, topics_n, vocab_n)
        self.alpha_lz = zeros2(senti_n, topics_n)
        self.alphaSum_l = zeros1(senti_n)
        self.beta_lzw = zeros3(senti_n, topics_n, vocab_n)
        self.betaSum_lz = zeros2(senti_n, topics_n)
        self.lambda_lw = zeros2(senti_n, vocab_n)
        self.gama_dl = zeros2(docs_n, senti_n)
        self.gamaSum_d = zeros1(docs_n)

        for d in range(docs_n):
            doc_len = self.dset.docs[d].length
            self.Z[d] = zeros1(doc_len)
            self.L[d] = zeros1(doc_len)

            for w in range(doc_len):
                # Uniform random initial topic for this word.
                topic = random.randint(0, topics_n - 1)
                self.Z[d][w] = topic

                # Random sentiment, overridden by the lexicon whenever the
                # word's surface form appears in sentiLex.
                sentiment = random.randint(0, senti_n - 1)
                wid = self.dset.docs[d].words[w]
                for entry in self.sdset.sentiLex:
                    if self.dset.id2word[wid] == entry[0]:
                        sentiment = entry[1]
                self.L[d][w] = sentiment

                # Record the assignment in every counter.
                self.nd[d] += 1
                self.ndl[d][sentiment] += 1
                self.ndlz[d][sentiment][topic] += 1
                self.nlzw[sentiment][topic][wid] += 1
                self.nlz[sentiment][topic] += 1

    def estimate(self):
        """Run collapsed Gibbs sampling for ``iter_num`` iterations.

        Each iteration resamples a (sentiment, topic) pair for every word of
        every document via ``self.sampling``.  Hyper-parameters are refreshed
        every ``updateParaStep`` iterations and a model snapshot is written
        every ``savestep`` iterations; either feature is disabled when its
        step is <= 0.
        """
        print('Sampling %d iterations!' % self.iter_num)
        for it in range(self.iter_num):
            print('Iteration %d ...' % (it + 1))
            for i in range(len(self.dset.docs)):
                for j in range(self.dset.docs[i].length):
                    sentiment, topic = self.sampling(i, j)
                    self.Z[i][j] = topic
                    self.L[i][j] = sentiment
            # BUGFIX: the original used bitwise '&', which does not
            # short-circuit, so 'it % step' was evaluated even when the step
            # was 0 (feature disabled) and raised ZeroDivisionError.  'and'
            # evaluates lazily and skips the modulo in that case.
            if self.updateParaStep > 0 and it % self.updateParaStep == 0:
                self.updateParameters()
            if self.savestep > 0 and it % self.savestep == 0:
                # NOTE(review): 'it' only ranges over 0..iter_num-1, so this
                # guard can never fire; kept verbatim for fidelity.
                if it == self.iter_num:
                    break
                print('Saving the model at iteration...')
                self.compute_theta()
                self.compute_phi()
                self.compute_Pi()
                self.save_model()

    def sampling(self, i, j):
        """Gibbs-resample the (sentiment, topic) pair for word j of document i.

        Removes the word's current assignment from all counters, builds the
        joint conditional p[l][k] over every (sentiment l, topic k) cell,
        draws a new pair from the cumulative distribution, re-adds the counts
        under the new assignment and returns ``[sentiment, topic]``.
        """
        sentiment = self.L[i][j]
        topic = self.Z[i][j]
        wid = self.dset.docs[i].words[j]

        # "Collapse" step: exclude this word's current assignment from the counts.
        self.nd[i] -=1
        self.ndl[i][sentiment] -=1
        self.ndlz[i][sentiment][topic] -=1
        self.nlzw[sentiment][topic][wid] -=1
        self.nlz[sentiment][topic] -=1


        # Unnormalised conditional: P(word | l, z) * P(z | d, l) * P(l | d),
        # each factor smoothed by its prior (beta / alpha / gamma).
        for l in xrange(self.sentinum):
            for k in xrange(self.K):
                self.p[l][k] = (self.nlzw[l][k][wid] + self.beta_lzw[l][k][wid])/(self.nlz[l][k] + self.betaSum_lz[l][k]) * \
                            (self.ndlz[i][l][k] + self.alpha_lz[l][k])/(self.ndl[i][l] + self.alphaSum_l[l]) * \
                            (self.ndl[i][l] + self.gama_dl[i][l])/(self.nd[i] + self.gamaSum_d[i])

        # Convert p in place into a running cumulative sum over the flattened
        # (l, k) grid so one uniform draw can select a cell; the (0, 0) cell
        # is the base case and is skipped.
        for l in xrange(self.sentinum):
            for k in xrange(self.K):
                if k==0:
                    if l==0:
                        continue
                    else:
                        self.p[l][k] = self.p[l][k] + self.p[l-1][self.K-1]
                else:
                    self.p[l][k] = self.p[l][k] + self.p[l][k-1]
        flag= False
        u = random.uniform(0, self.p[self.sentinum-1][self.K-1])
        # Pick the first cell whose cumulative mass exceeds u.  NOTE: when no
        # cell exceeds u (u equal to the total mass), the loop variables
        # deliberately leak their final values (sentinum-1, K-1) — that is
        # the selected cell in that edge case.
        for sentiment in xrange(self.sentinum):
            for topic in xrange(self.K):
                if self.p[sentiment][topic]>u:
                    flag = True
                    break
            if flag == True:
                break

        # Re-add the word under its freshly drawn assignment.
        self.nd[i] +=1
        self.ndl[i][sentiment] +=1
        self.ndlz[i][sentiment][topic] +=1
        self.nlzw[sentiment][topic][wid] +=1
        self.nlz[sentiment][topic] +=1
        z=[sentiment,topic]
        return z

    def compute_theta(self):
        """Re-estimate theta: per-document, per-sentiment topic proportions,
        smoothed by the alpha prior."""
        for doc in range(self.dset.M):
            for senti in range(self.sentinum):
                denom = self.ndl[doc][senti] + self.alphaSum_l[senti]
                for z in range(self.K):
                    numer = self.ndlz[doc][senti][z] + self.alpha_lz[senti][z]
                    self.theta_dlz[doc][senti][z] = numer / denom

    def compute_phi(self):
        """Re-estimate phi: per-sentiment, per-topic word distributions,
        smoothed by the beta prior."""
        for senti in range(self.sentinum):
            for z in range(self.K):
                denom = self.nlz[senti][z] + self.betaSum_lz[senti][z]
                for w in range(self.dset.V):
                    numer = self.nlzw[senti][z][w] + self.beta_lzw[senti][z][w]
                    self.phi_lzw[senti][z][w] = numer / denom

    def compute_Pi(self):
        """Re-estimate pi: per-document sentiment proportions, smoothed by
        the gamma prior."""
        for doc in range(self.dset.M):
            denom = self.nd[doc] + self.gamaSum_d[doc]
            for senti in range(self.sentinum):
                numer = self.ndl[doc][senti] + self.gama_dl[doc][senti]
                self.pi_dl[doc][senti] = numer / denom

def readtrnfile():
    """Read the training corpus from the module-level ``trnfile`` path.

    Each non-blank line is one document.  Whitespace-separated tokens are
    interned into ``dset.word2id`` / ``dset.id2word`` and each document is
    stored as its list of word ids.

    Returns the populated ``dataset`` with M (document count) and
    V (vocabulary size) filled in; also writes the wordmap file.
    """
    print('Reading train data...')
    with open(trnfile, 'r') as f:
        docs = f.readlines()

    dset = dataset()
    items_idx = 0  # next fresh word id
    for line in docs:
        # Skip blank / whitespace-only lines (guard clause replaces the
        # original else: pass branch).
        if line in ("", " ", "\n"):
            continue
        tokens = line.strip().split()
        doc = Document()
        for token in tokens:
            # BUGFIX/idiom: dict.has_key() was removed in Python 3; the
            # 'in' membership test is the portable equivalent.
            if token in dset.word2id:
                doc.words.append(dset.word2id[token])
            else:
                dset.word2id[token] = items_idx
                dset.id2word[items_idx] = token
                doc.words.append(items_idx)
                items_idx += 1
        doc.length = len(tokens)
        dset.docs.append(doc)

    dset.M = len(dset.docs)
    dset.V = len(dset.word2id)
    print('There are %d documents' % dset.M)
    print('There are %d items' % dset.V)
    print('Saving wordmap file...')
    dset.writewordmap()
    return dset


def readsentifile():

    print 'Reading sentimen data...'
    with open(sentifile, 'r') as s:
        senti = s.readlines()
    SL=[]
    sdset = SDataset()

    for line in senti:
        if line != "":
            tmp = line.strip().split()
            sdoc = SDocument()

            golabi=tmp[0]
            hasan=[tmp[1],tmp[2],tmp[3]]
            mval = max(hasan)
            midx = hasan.index(mval)

            s=[golabi,midx,mval]
            sdset.sentiLex.append(s)

        else:
            pass

    sdoc.length = len(sdset.sentiLex)
    return sdset

def lda():
    """End-to-end driver: load the corpus and sentiment lexicon, then
    initialise and train the model."""
    corpus = readtrnfile()
    lexicon = readsentifile()
    model = Model(corpus, lexicon)
    model.init_est()
    model.estimate()


if __name__=='__main__':
    lda()

我在scala和for loop中实现3D矩阵时遇到了一些问题,比如我想把数据分到不同的处理器上,每个处理器都有自己的采样(for loop)过程,采样后必须更新一些全局变量,在这里对我来说,代码的简单性和可理解性比优化和大幅缩短运行时间更重要,Mlib实现是基于Graphx的,非常难以理解。

有什么解决办法吗?在


Tags: inselfdocsfortopicifdeftmp