我正在尝试创建一个程序,使用api调用根据可用用户名自动创建帐户。程序运行良好。但是,我希望它跑得更快。因此,我尝试使用线程,但问题是,每当我使用多个线程时,我的终端就会造成很大的混乱youtube video on what happens
我的代码:
# Login --> Threading --> Checker
def login(self):
self.headers["Authorization"] = "Basic " + self.finalLogin
r = requests.post("https://public-ubiservices.ubi.com/v3/profiles/sessions", json={"Content-Type":"application/json"}, headers=self.headers)
if r.status_code == 200:
if r.json()["ticket"]:
token = "Ubi_v1 t=" + r.json()["ticket"]
self.headers['Authorization'] = token
self.threading()
def checker(self):
[self.usernames.put(line.strip()) for line in open("external/wordlist.txt")]
while not self.usernames.empty():
name = self.usernames.get(); self.usernames.put(name)
url = f"https://public-ubiservices.ubi.com/v3/profiles?nameOnPlatform={name}&platformType=uplay"
try:
r = requests.get(url, headers=self.headers)
if self.checkedCount % 100 == 0:
self.checkedCount += 1
print(f"{Fore.LIGHTBLACK_EX}[+]{Fore.RESET} Message: Using new login")
self.accounts()
ctypes.windll.kernel32.SetConsoleTitleW(f"Gx | Checked: {self.checkedCount}, Available: {self.availableCount}, Errors: {self.errorCount}")
if r.status_code == 200:
self.checkedCount += 1
if len(r.json()['profiles']) != 0:
print(f"{Fore.LIGHTBLACK_EX}[+]{Fore.RESET} Taken: {name}")
else:
print(f"{Fore.LIGHTBLACK_EX}[+]{Fore.RESET} Available: {name}")
self.availableCount += 1
self.create(name)
else:
self.errorCount += 1
print(f"{Fore.LIGHTBLACK_EX}[+]{Fore.RESET} Error: Check errors.txt")
with open('external/errors.txt', "a") as errorFile:
errorFile.write(f'{self.currentTime} | Error code: {r.status_code}, Error message: {r.text}\n')
self.checkedCount += 1
self.accounts()
except Exception:
self.errorCount += 1
pass
def threading(self):
[self.usernames.put(line.strip()) for line in open("external/wordlist.txt")]
for x in range(5):
threading.Thread(target=self.checker, args=()).start()
你所说的混乱就是在运行多个线程时使用
print
时发生的情况。这是因为print
,特别是输出缓冲区,不是线程安全的。如here中更详细的描述,您必须使用线程安全的方法我想有两种选择:
相关问题 更多 >
编程相关推荐