Python多线程变量被覆盖和混淆

0 投票
2 回答
1909 浏览
提问于 2025-04-18 16:56

现在它每个线程尝试建立两个连接,但还是失败了。

我觉得我解决了共享访问的问题,因为它使用了 self.x 而不是局部变量?

#!/usr/bin/python
from xml.etree.ElementTree import fromstring
from socks import socksocket, PROXY_TYPE_SOCKS5
from socket import socket, AF_INET, SOCK_STREAM
from linecache import getline
from threading import Thread, current_thread, Lock, activeCount
from os.path import isfile, getmtime
from urllib import urlopen
from time import time, sleep
from sys import exit
from json import loads
from random import randint, randrange, choice
from urlparse import parse_qs
from pprint import pprint

class myThread (Thread):
    def __init__(self, threadID, name):
        Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        
    def run(self):
        self.user = parse_qs(getline('./_files/ids.txt', randint(1, idLen)).strip("\n"))
        self.proxy = getline('./_files/proxies.txt', randint(1, proxyLen)).strip("\n").split(":")
        
        self.user2 = parse_qs(getline('./_files/ids.txt', randint(1, idLen)).strip("\n"))
        self.proxy2 = getline('./_files/proxies.txt', randint(1, proxyLen)).strip("\n").split(":")
        try:
            self.socket = socksocket(AF_INET, SOCK_STREAM)
            self.socket.settimeout(5)
            self.socket.setproxy(PROXY_TYPE_SOCKS5, self.proxy[0], int(self.proxy[1]))
            
            self.socket2 = socksocket(AF_INET, SOCK_STREAM)
            self.socket2.settimeout(5)
            self.socket2.setproxy(PROXY_TYPE_SOCKS5, self.proxy2[0], int(self.proxy2[1]))
            
            self.socket.connect((chatConnection[0], int(chatConnection[1])))
            self.socket2.connect((chatConnection[0], int(chatConnection[1])))
            
            send(self.socket, "<y r=\"%s\" v=\"0\" u=\"%s\" />\0" % (room, self.user["UserId"][0]))
            send(self.socket2, "<y r=\"%s\" v=\"0\" u=\"%s\" />\0" % (room, self.user2["UserId"][0]))
            
            self.data = read(self.socket)
            self.data2 = read(self.socket2)
            
            if self.data == "" or not self.data: return
            if self.data2 == "" or not self.data2: return
            
            self.xml = fromstring(self.data.strip(chr(0))).attrib
            self.xml2 = fromstring(self.data2.strip(chr(0))).attrib
            
            self.bSock = socket(AF_INET, SOCK_STREAM)
            self.bSock.settimeout(5)
            
            self.bSock2 = socket(AF_INET, SOCK_STREAM)
            self.bSock2.settimeout(5)
            
            self.bSock.connect(("127.0.0.1", 1337))

            send(self.bSock, "<bot p=\"%s\" yi=\"%s\" au=\"%s\"  />\0" % (self.xml["p"], self.xml["i"], self.xml["au"]))
            self.data = read(self.bSock)
            
            send(self.bSock, "<bot p=\"%s\" yi=\"%s\" au=\"%s\"  />\0" % (self.xml2["p"], self.xml2["i"], self.xml2["au"]))
            self.data2 = read(self.bSock)
            
            self.data = self.data.replace("_lol", "")
            self.l5 = self.data[self.data.find('l5="') + 4:]
            self.l5 = self.l5[:self.l5.find('"')]
            self.ya = self.data[self.data.find('c="') + 3:]
            self.ya = self.ya[:self.ya.find('"')]
            
            self.data2 = self.data2.replace("_lol", "")
            self.l52 = self.data2[self.data2.find('l5="') + 4:]
            self.l52 = self.l52[:self.l52.find('"')]
            self.ya2 = self.data2[self.data2.find('c="') + 3:]
            self.ya2 = self.ya2[:self.ya2.find('"')]
            
            print self.ya2 + " : " + self.l52
            
            self.bSock.close()
            
            self.yaSock = socksocket(AF_INET, SOCK_STREAM)
            self.yaSock.settimeout(5)
            self.yaSock.setproxy(PROXY_TYPE_SOCKS5, self.proxy[0], int(self.proxy[1]))
            self.yaSock.connect((chatConnection[0], int(chatConnection[1])))
            
            self.yaSock2 = socksocket(AF_INET, SOCK_STREAM)
            self.yaSock2.settimeout(5)
            self.yaSock2.setproxy(PROXY_TYPE_SOCKS5, self.proxy2[0], int(self.proxy2[1]))
            self.yaSock2.connect((chatConnection[0], int(chatConnection[1])))
            
            
            send(self.yaSock, "<ya r=\"%s\" u=\"%s\" c=\"%s\" k=\"%s\" />\0" % (room, self.user["UserId"][0], self.ya, self.xml["k"]))
            print read(self.yaSock)
            self.yaSock.close()
            
            send(self.yaSock2, "<ya r=\"%s\" u=\"%s\" c=\"%s\" k=\"%s\" />\0" % (room, self.user2["UserId"][0], self.ya2, self.xml2["k"]))
            print read(self.yaSock2)
            self.yaSock2.close()
            
            self.j2 = "<j2 Y=\"2\" l5=\"" + self.l5 + "\" l4=\"1200\" l3=\"844\" l2=\"0\" cb=\"0\" q=\"1\" y=\"" + self.xml["i"] + "\" k=\"" + self.user["k1"][0] + "\" k3=\"0\" p=\"0\" c=\"" + room + "\" f=\"2\" u=\"" + self.user["UserId"][0] + "\" d0=\"0\" n=\"Zuhnny\" a=\"1\" h=\"xat sux\" v=\"0\" />\0"
            self.j22 = "<j2 Y=\"2\" l5=\"" + self.l52 + "\" l4=\"1200\" l3=\"844\" l2=\"0\" cb=\"0\" q=\"1\" y=\"" + self.xml2["i"] + "\" k=\"" + self.user2["k1"][0] + "\" k3=\"0\" p=\"0\" c=\"" + room + "\" f=\"2\" u=\"" + self.user2["UserId"][0] + "\" d0=\"0\" n=\"Zuhnny\" a=\"1\" h=\"xat sux\" v=\"0\" />\0"

            send(self.socket, self.j2)
            send(self.socket2, self.j22)
            
            while True:
                print self.socket.recv(6096)
                print self.socket2.recv(6096)
                sleep(1)
                send(self.socket, "<m t=\" F U C K X A T %s\" u=\"%s\" />\0" % (randint(0,5000), self.user["UserId"][0]))
                send(self.socket2, "<m t=\" F U C K X A T %s\" u=\"%s\" />\0" % (randint(0,5000), self.user2["UserId"][0]))
        except IOError, err:  pass
        except Exception, error: pass
            
def read(socket):
    data = socket.recv(1024)
    return data
    
def send(socket, data):
    socket.sendall(data)
    
def getChatConnection(room):
    print '\ntest\n'
    if not isfile('./_files/ips.txt') or time() - getmtime('./_files/ips.txt') > 86400:
        fh = open('./_files/ips.txt', 'w')
        fh.write(urlopen('http://xat.com/web_gear/chat/ip2.htm?' + str(time())).read())
        fh.close()
    try:
        fh = open('./_files/ips.txt', 'r')
        iprules = loads(fh.read())
        
        Fx = iprules[iprules["order"][0][0]]
        xAddr = Fx[1][randint(0, len(Fx[1]) - 1)].split(':')
        
        if len(xAddr) == 1: xAddr.append(10000)
        if len(xAddr) == 2: xAddr.append(39)
        
        xPort = xAddr[1] + randint(0, xAddr[2] - 1)
        return (xAddr[0], 9999 + int(room) if int(room) < 8 else 10007 + (int(room) % 32))
    except Exception, e:
        print e

file = open("./_files/proxies.txt")
proxyLen = len(map(lambda(x): x.split(':'), file))

file2 = open("./_files/ids.txt")
idLen = len(map(lambda(x): x.split('\n'), file2))

threadLock = Lock()
threads = []

room = raw_input("Room ID to raid: ")
chatConnection = getChatConnection(room)

for x in range(1000):
    threads.append(myThread(x, "Thread-" + str(x)).start())

# Wait for all threads to complete
for t in threads:
    t.join()
    
print "Exiting Main Thread"

2 个回答

0

我猜你的问题可能不是你想的那样。我没有仔细看你的代码,但我没有看到有什么全局变量或者共享变量被改变。不过,我发现了一个不同的问题。

你没有把读取的数据进行缓冲;你只是期待每次调用 bSock.recv(1024) 都能收到一条完整的消息。其实TCP的工作方式并不是这样的;你可能会收到一条消息的一半,或者两条消息,甚至是上一条消息的后半部分和下一条消息的前半部分。

如果你的电脑和网络没有被过度使用,而且你的消息都比较小,那么在某些情况下,它可能会99.9%地正常工作,这样你就不会注意到有什么问题。但一旦你让系统负荷加重,它就会更频繁地出错。

而且你有400个线程,从你使用的老式代码(比如 except Type, value)来看,你可能在用一个老到只能运行Python 2.5的系统,这意味着你可能在给系统带来很大的压力。

你需要通过在一个循环中接收数据,直到你有一条或多条完整的消息,然后处理这些消息,再回到循环中,而不是把每次 recv 当作一定会收到一条完整消息来处理。

幸运的是,你在处理IRC(假设你没有做任何DCC等),每行只有一个命令,而Python有一个很好的套接字封装,让它们看起来像是行缓冲的文件。所以你可以这样做:

bfile = bsock.makefile()
for line in bfile:

现在你可以确保 line 是一整行,即使它可能需要进行三次读取,并且在下一次循环之前把大部分第三次读取的数据缓冲起来。

你在至少三个地方都在做同样的事情,所以显然你需要把它们都修复。此外,你还需要确保适当地 close 套接字和文件。你还需要检测对方何时关闭了套接字。(这时 recv 或下一个 line 会返回一个空字符串。)

0

还有一种可能性:

所有线程都有一个共同点,那就是它们都在使用同一个 bsock 套接字。而且它们都是在启动后5秒钟的时候开始这样做的:

bSock.sendall("<bot p=\"%s\" au=\"%s\" yi=\"%s\" />\0" % (xml["p"], xml["au"], xml["i"]))
data = bSock.recv(1024)

那么,怎么能防止线程 #42 先执行它的 sendall,然后线程 #23 再执行它的 sendall,接着线程 #42 再执行 recv 并获取本该由它自己接收的数据呢?

这就是所谓的“临界区”或“原子块”:一段代码一次只能由一个线程执行,否则大家都会搞混。通常的解决办法是使用一个 Lock(锁),让每个线程在执行这段代码之前先获取这个锁。如果线程 #42 已经拿到了锁,而线程 #23 想要获取这个锁,它就会被阻塞,直到线程 #42 释放锁,这样就不会出现冲突的情况。所以:

bSockLock = threading.Lock()

# ...

for x in range(400):
    Thread(target = __init__, args=[chatConnection, bSock, bSockLock]).start()

# ...

def __init__(chatConnection, bSock):
    # ...
    for x in range(3):
        start(chatConnection, proxies[x][0], proxies[x][1], [ids[x]["UserId"][0], ids[x]["k1"][0], ids[x]["k2"][0]], room, bSock, bSockLock)

# ...

def start(chatConnection, proxyIP, proxyPort, user, room, bSock, bSockLock):
    # ...
    with bSockLock:
        bSock.sendall("<bot p=\"%s\" au=\"%s\" yi=\"%s\" />\0" % (xml["p"], xml["au"], xml["i"]))
        data = bSock.recv(1024)

撰写回答