I have some Python code that I want to convert to Scala Spark. My algorithm is an extension of LDA (Latent Dirichlet Allocation). It has a sampling step that becomes very time-consuming when the data is large. I know Spark MLlib ships an LDA implementation, but I want to write my own version of the algorithm so I can add some improvements. My code is:
import random

def init_est(self):
    # Count tables: nd[d] = tokens in doc d, ndl[d][l] = tokens in doc d with
    # sentiment l, ndlz[d][l][z] further splits by topic z, nlzw[l][z][w] counts
    # word w under (l, z), nlz[l][z] = tokens assigned sentiment l and topic z.
    self.nd = [0 for x in xrange(self.dset.M)]
    self.ndl = [[0 for y in xrange(self.sentinum)] for x in xrange(self.dset.M)]
    self.ndlz = [[[0 for z in xrange(self.K)] for y in xrange(self.sentinum)] for x in xrange(self.dset.M)]
    self.nlzw = [[[0 for z in xrange(self.dset.V)] for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.nlz = [[0 for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.p = [[0.0 for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.Z = [[] for x in xrange(self.dset.M)]
    self.L = [[] for x in xrange(self.dset.M)]
    self.pi_dl = [[0 for y in xrange(self.sentinum)] for x in xrange(self.dset.M)]
    self.theta_dlz = [[[0 for z in xrange(self.K)] for y in xrange(self.sentinum)] for x in xrange(self.dset.M)]
    self.phi_lzw = [[[0 for z in xrange(self.dset.V)] for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.alpha_lz = [[0 for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.alphaSum_l = [0 for x in xrange(self.sentinum)]
    self.beta_lzw = [[[0 for z in xrange(self.dset.V)] for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.betaSum_lz = [[0 for y in xrange(self.K)] for x in xrange(self.sentinum)]
    self.lambda_lw = [[0 for y in xrange(self.dset.V)] for x in xrange(self.sentinum)]
    self.gama_dl = [[0 for y in xrange(self.sentinum)] for x in xrange(self.dset.M)]
    self.gamaSum_d = [0 for x in xrange(self.dset.M)]
    for x in xrange(self.dset.M):
        self.Z[x] = [0 for y in xrange(self.dset.docs[x].length)]
        self.L[x] = [0 for y in xrange(self.dset.docs[x].length)]
        for y in xrange(self.dset.docs[x].length):
            # Random initial topic; sentiment is random unless the word
            # appears in the sentiment lexicon, which then overrides it.
            topic = random.randint(0, self.K - 1)
            self.Z[x][y] = topic
            sentiment = random.randint(0, self.sentinum - 1)
            for i in xrange(len(self.sdset.sentiLex)):
                if self.dset.id2word[self.dset.docs[x].words[y]] == self.sdset.sentiLex[i][0]:
                    sentiment = self.sdset.sentiLex[i][1]
            self.L[x][y] = sentiment
            self.nd[x] += 1
            self.ndl[x][sentiment] += 1
            self.ndlz[x][sentiment][topic] += 1
            self.nlzw[sentiment][topic][self.dset.docs[x].words[y]] += 1
            self.nlz[sentiment][topic] += 1
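Since part of my question is how to carry these 3D tables over to Scala, one option I am considering is storing each tensor as a flat array and computing the offset by hand, which maps one-to-one onto a Scala `Array[Int]`. A minimal sketch of the idea (the `idx` helper and the toy sizes are mine, not part of the original code):

```python
# A 3D count table nlzw[l][z][w] stored as one flat list, the way one would
# with a Scala Array[Int]: row-major offset = (l * K + z) * V + w.
S, K, V = 2, 3, 5          # sentiments, topics, vocabulary size (toy sizes)
nlzw = [0] * (S * K * V)

def idx(l, z, w):
    # Row-major offset of cell (l, z, w).
    return (l * K + z) * V + w

nlzw[idx(1, 2, 4)] += 1    # same effect as nlzw[1][2][4] += 1 on the nested version
print(nlzw[idx(1, 2, 4)])  # -> 1
print(len(nlzw))           # -> 30
```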
def estimate(self):
    print 'Sampling %d iterations!' % self.iter_num
    for x in xrange(self.iter_num):
        print 'Iteration %d ...' % (x + 1)
        for i in xrange(len(self.dset.docs)):
            for j in xrange(self.dset.docs[i].length):
                lst = self.sampling(i, j)
                sentiment = lst[0]
                topic = lst[1]
                self.Z[i][j] = topic
                self.L[i][j] = sentiment
        # 'and' (not '&') so the modulo is never evaluated when the step is 0.
        if self.updateParaStep > 0 and x % self.updateParaStep == 0:
            self.updateParameters()
        if self.savestep > 0 and x % self.savestep == 0:
            print 'Saving the model at iteration %d...' % (x + 1)
            self.compute_theta()
            self.compute_phi()
            self.compute_Pi()
            self.save_model()
def sampling(self, i, j):
    # Remove word j of document i from all counts before resampling it.
    sentiment = self.L[i][j]
    topic = self.Z[i][j]
    wid = self.dset.docs[i].words[j]
    self.nd[i] -= 1
    self.ndl[i][sentiment] -= 1
    self.ndlz[i][sentiment][topic] -= 1
    self.nlzw[sentiment][topic][wid] -= 1
    self.nlz[sentiment][topic] -= 1
    # Full conditional over every (sentiment, topic) pair.
    for l in xrange(self.sentinum):
        for k in xrange(self.K):
            self.p[l][k] = (self.nlzw[l][k][wid] + self.beta_lzw[l][k][wid]) / (self.nlz[l][k] + self.betaSum_lz[l][k]) * \
                           (self.ndlz[i][l][k] + self.alpha_lz[l][k]) / (self.ndl[i][l] + self.alphaSum_l[l]) * \
                           (self.ndl[i][l] + self.gama_dl[i][l]) / (self.nd[i] + self.gamaSum_d[i])
    # Turn p into a running (cumulative) sum over the flattened (l, k) grid.
    for l in xrange(self.sentinum):
        for k in xrange(self.K):
            if k == 0:
                if l == 0:
                    continue
                self.p[l][k] += self.p[l - 1][self.K - 1]
            else:
                self.p[l][k] += self.p[l][k - 1]
    # Draw u uniformly and pick the first cell whose cumulative mass exceeds it.
    found = False
    u = random.uniform(0, self.p[self.sentinum - 1][self.K - 1])
    for sentiment in xrange(self.sentinum):
        for topic in xrange(self.K):
            if self.p[sentiment][topic] > u:
                found = True
                break
        if found:
            break
    # Add the word back under the newly sampled (sentiment, topic) pair.
    self.nd[i] += 1
    self.ndl[i][sentiment] += 1
    self.ndlz[i][sentiment][topic] += 1
    self.nlzw[sentiment][topic][wid] += 1
    self.nlz[sentiment][topic] += 1
    return [sentiment, topic]
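The core of the sampler is that cumulative-sum draw over a 2D grid, which is the part I need to port. A standalone toy version (the weights here are made up, not real model values):

```python
import random

# Draw one (sentiment, topic) pair from a grid of unnormalised weights using
# the same cumulative-sum trick as sampling() above.
p = [[0.1, 0.3],
     [0.2, 0.4]]
S, K = len(p), len(p[0])

# In-place running sum over the flattened grid: p[l][k] becomes cumulative mass.
total = 0.0
for l in range(S):
    for k in range(K):
        total += p[l][k]
        p[l][k] = total

u = random.uniform(0, p[S - 1][K - 1])
# First cell whose cumulative mass exceeds u (fall back to the last cell
# in case u lands exactly on the upper endpoint).
sentiment, topic = next(((l, k) for l in range(S) for k in range(K) if p[l][k] > u),
                        (S - 1, K - 1))
print(sentiment, topic)
```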
def compute_theta(self):
    for m in xrange(self.dset.M):
        for l in xrange(self.sentinum):
            for z in xrange(self.K):
                self.theta_dlz[m][l][z] = (self.ndlz[m][l][z] + self.alpha_lz[l][z]) \
                    / (self.ndl[m][l] + self.alphaSum_l[l])

def compute_phi(self):
    for l in xrange(self.sentinum):
        for z in xrange(self.K):
            for r in xrange(self.dset.V):
                self.phi_lzw[l][z][r] = (self.nlzw[l][z][r] + self.beta_lzw[l][z][r]) \
                    / (self.nlz[l][z] + self.betaSum_lz[l][z])

def compute_Pi(self):
    for m in xrange(self.dset.M):
        for l in xrange(self.sentinum):
            self.pi_dl[m][l] = (self.ndl[m][l] + self.gama_dl[m][l]) \
                / (self.nd[m] + self.gamaSum_d[m])
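All three `compute_*` methods are the same smoothed-count normalisation, so they should be straightforward to port. A toy check of the `pi_dl` formula with made-up counts and a symmetric gamma prior:

```python
# pi_dl[m][l] = (ndl[m][l] + gama_dl[m][l]) / (nd[m] + gamaSum_d[m]),
# shown for one document with two sentiments and invented counts.
ndl = [4, 6]           # tokens per sentiment in the document
gama = [1.0, 1.0]      # symmetric prior
nd = sum(ndl)          # 10
gama_sum = sum(gama)   # 2.0
pi = [(ndl[l] + gama[l]) / (nd + gama_sum) for l in range(2)]
print(pi)              # -> [5/12, 7/12]; a proper distribution, sums to 1
```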
def readtrnfile():
    print 'Reading train data...'
    with open(trnfile, 'r') as f:
        docs = f.readlines()
    dset = dataset()
    items_idx = 0
    for line in docs:
        if line.strip() == "":
            continue
        tmp = line.strip().split()
        # Generate a document object
        doc = Document()
        for item in tmp:
            if item in dset.word2id:
                doc.words.append(dset.word2id[item])
            else:
                # New vocabulary item: assign the next integer id.
                dset.word2id[item] = items_idx
                dset.id2word[items_idx] = item
                doc.words.append(items_idx)
                items_idx += 1
        doc.length = len(tmp)
        dset.docs.append(doc)
    dset.M = len(dset.docs)
    dset.V = len(dset.word2id)
    print 'There are %d documents' % dset.M
    print 'There are %d items' % dset.V
    print 'Saving wordmap file...'
    dset.writewordmap()
    return dset
def readsentifile():
    print 'Reading sentiment data...'
    with open(sentifile, 'r') as f:
        senti = f.readlines()
    sdset = SDataset()
    for line in senti:
        if line.strip() == "":
            continue
        tmp = line.strip().split()
        word = tmp[0]
        # Convert the three scores to floats before taking the max --
        # max() on the raw strings would compare them lexicographically.
        scores = [float(tmp[1]), float(tmp[2]), float(tmp[3])]
        mval = max(scores)
        midx = scores.index(mval)  # index of the winning sentiment label
        sdset.sentiLex.append([word, midx, mval])
    return sdset
def lda():
    dset = readtrnfile()
    sdset = readsentifile()
    model = Model(dset, sdset)
    model.init_est()
    model.estimate()

if __name__ == '__main__':
    lda()
I am running into trouble implementing the 3D matrices and the for loops in Scala. What I want is to partition the data across processors, let each processor run its own sampling loop, and then update some global variables after sampling. Here simplicity and readability matter more to me than optimization and raw speed; the MLlib implementation is built on GraphX and is very hard to follow.
Is there a way to do this?
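The pattern I have in mind, sketched in plain Python rather than Spark code: each "partition" samples against a snapshot of the global counts and emits a count delta, and the deltas are merged additively afterwards, the way an AD-LDA-style Spark job would combine `mapPartitions` with a reduce. Everything below is a toy stand-in, not my real sampler:

```python
# Toy partition-then-merge update: global (word, topic) counts are broadcast,
# each partition accumulates a local delta, and the deltas are summed back.
global_counts = {('good', 0): 3, ('bad', 1): 2}

partitions = [
    [('good', 0), ('good', 0)],   # (word, sampled topic) tokens on worker 1
    [('bad', 1)],                 # tokens on worker 2
]

def sample_partition(tokens):
    # Stand-in for a per-partition Gibbs sweep: just count the (word, topic)
    # assignments and return them as a delta dictionary.
    delta = {}
    for key in tokens:
        delta[key] = delta.get(key, 0) + 1
    return delta

# "Reduce" step: merge every partition's delta into the global table additively.
for delta in map(sample_partition, partitions):
    for key, c in delta.items():
        global_counts[key] = global_counts.get(key, 0) + c

print(global_counts)   # -> {('good', 0): 5, ('bad', 1): 3}
```

Because the deltas are purely additive, the merge is associative and commutative, which is exactly what a Spark reduce needs; the cost is that workers sample against slightly stale counts within an iteration.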