搜索域中的所有电子邮件

2024-04-25 09:10:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我能够使用 Have I Been Pwned(HIBP)查询单个帐户是否遭到泄露。但是,我找不到使用API密钥查询某个域下所有电子邮件帐户泄露情况的选项。(例如,如果域是xyz.com,我想查询abc@xyz.com、peter.charlie@xyz.com等等)。我知道可以注册通知电子邮件,但这是一个漫长的过程,我更喜欢使用API。所以,我写了一个脚本,针对我域中的所有电子邮件地址逐个查询haveibeenpwned,但这需要很长时间。我搜索了几个Github项目,但没有找到任何这样的实现。以前有人试过这个吗?

我在下面添加了代码。我正在使用多线程方法,但仍然需要很长时间,是否还有其他优化策略可以使用?请帮忙。多谢各位

import requests, json
import threading
from time import sleep
import datetime
import splunklib.client as client
import splunklib.results as results
date = datetime.datetime.now()
from itertools import islice
import linecache
import sys

def PrintException():
    """Print where the exception currently being handled was caught.

    Meant to be called from inside an ``except`` block. Reads the active
    exception from ``sys.exc_info()`` and prints the file name, line number
    and source line of the first traceback entry, followed by the exception
    message.

    If no exception is being handled there is no traceback to inspect, so a
    short notice is printed instead of failing with ``AttributeError``.
    """
    _, exc_obj, tb = sys.exc_info()
    if tb is None:
        print('EXCEPTION IN (unknown): no active exception')
        return
    frame = tb.tb_frame
    lineno = tb.tb_lineno
    filename = frame.f_code.co_filename
    # Drop stale cache entries so the quoted source line matches the file on disk.
    linecache.checkcache(filename)
    line = linecache.getline(filename, lineno, frame.f_globals)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(filename, lineno, line.strip(), exc_obj))


class myThread (threading.Thread):
    """Worker thread that checks one batch of email addresses.

    Args:
        threadID: Identifier stored on the instance as ``threadID``.
        name: Thread name, used in the start/exit messages.
        list_emails: Iterable of email addresses this thread will look up.
    """

    def __init__(self, threadID, name, list_emails):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.list_emails = list_emails

    def run(self):
        """Look up every address in ``list_emails`` and print each result.

        For each address the running index, the address and the value
        returned by ``check_pasteaccount`` are printed, in that order.
        """
        # Single-argument print(...) behaves the same on Python 2 and Python 3.
        print("Starting " + self.name)
        for i, email in enumerate(self.list_emails):
            print(i)
            result = check_pasteaccount(email)
            print(email)
            # The result is printed once; the original printed it twice.
            print(result)
        print("Exiting " + self.name)

def check_pasteaccount(account):
    """Return a text report of recent pastes that mention ``account``.

    Queries the Have I Been Pwned ``pasteaccount`` endpoint and keeps the
    pastes that are at most 120 days older than the module-level ``date``
    timestamp.

    Args:
        account: Account (email address) to look up; converted with ``str``.

    Returns:
        A string:
        * one ``Title`` / ``Source`` / ``Paste ID`` group per recent paste,
          each line terminated by CRLF and each group followed by a blank
          line, when recent pastes exist;
        * "No paste reported for given account and time frame." when the
          account has pastes but none are recent;
        * "No paste for the account" on HTTP 404;
        * "Exception" on any other non-200 status (the status code is
          printed) or when the lookup raises.
    """
    account = str(account)
    URL = "https://haveibeenpwned.com/api/v3/pasteaccount/%s?truncateResponse=false" % (account)
    # NOTE(review): api_key is expected to be a module-level name; it is not
    # defined in the visible part of the script.
    headers = {'hibp-api-key': api_key}
    try:
        # Retry in a loop rather than by recursion so that a long run of
        # rate-limit responses cannot exhaust the call stack.
        while True:
            r = requests.get(url=URL, headers=headers)
            status_code = r.status_code
            if status_code != 429:
                break
            sleep(5)

        if status_code == 404:
            return "No paste for the account"
        if status_code != 200:
            print(status_code)
            return "Exception"

        recent = []
        # r.json() decodes the body itself; r.text is already text, so
        # calling .decode() on it was wrong.
        for entry in r.json():
            posted = entry['Date']
            # The HIBP API documents Date as nullable. An undated paste cannot
            # be shown to be old, so it is kept instead of aborting the lookup.
            if posted is not None:
                age = date - datetime.datetime.strptime(posted, '%Y-%m-%dT%H:%M:%SZ')
                if age.days > 120:
                    continue
            recent.append(['Title: {0}'.format(entry['Title']),
                           # Was .format(['Source']), which printed the literal list.
                           'Source: {0}'.format(entry['Source']),
                           'Paste ID: {0}'.format(entry['Id'])])

        if not recent:
            return "No paste reported for given account and time frame."

        groups = []
        for fields in recent:
            groups.append("".join(str(item) + "\r\n" for item in fields) + "\r\n")
        return "".join(groups)
    except Exception:
        # Best effort per account: report the failure and keep the batch going.
        PrintException()
        return "Exception"

def split_every(n, iterable):
    """Lazily yield successive lists of up to ``n`` items from ``iterable``.

    The last list may be shorter than ``n``. Iteration stops as soon as a
    slice comes back empty, so an empty input (or ``n == 0``) yields nothing.
    """
    source = iter(iterable)
    while True:
        batch = list(islice(source, n))
        if not batch:
            return
        yield batch


def main():
    """Fetch the email list, check it in batches of 1000 on worker threads, and wait.

    One ``myThread`` is started per batch produced by ``split_every``. The
    current time is printed before and after the email list is fetched, and
    "Completed Search" is printed once every thread has finished.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(datetime.datetime.now())
    # Fetching the list of email addresses from Splunk
    list_emails = connect_splunk()
    print(datetime.datetime.now())
    threads = []
    # Number batches from 1. Each thread gets its own id (the original passed 1
    # to every thread), and the loop variable no longer shadows the builtin `list`.
    for i, chunk in enumerate(split_every(1000, list_emails), 1):
        thread = myThread(i, "Thread" + str(i), chunk)
        thread.start()
        threads.append(thread)
    # Wait for all the threads to complete
    for t in threads:
        t.join()
    print("Completed Search")


Tags: name in import self for datetime def status
1条回答
网友
1楼 · 发布于 2024-04-25 09:10:15

下面是一个使用标准multiprocessing库而不是手动线程系统的脚本的较短且可能更高效的版本

  • 因为我们使用的是f字符串,所以您需要Python3.6+
  • 您需要为奇特的进度条安装tqdm模块
  • 您可以使用pool size参数调整并发请求的数量
  • 输出以机器可读的JSON Lines格式写入时间戳文件
  • 单个请求会话是共享的(每个工作者),这意味着连接HIBP所花费的时间更少
import datetime
import json
import multiprocessing
import random
import time

import requests
import tqdm

# Query-string parameters sent with every paste lookup.
HIBP_PARAMS = {
    "truncateResponse": "false",
}

# Request headers sent with every paste lookup.
# NOTE(review): "xxx" is presumably a placeholder for a real HIBP API key.
HIBP_HEADERS = {
    "hibp-api-key": "xxx",
}

# Module-level session used by check_pasteaccount() for all of its requests.
sess = requests.Session()


def check_pasteaccount(account):
    """Fetch the HIBP paste records for one account, retrying while rate-limited.

    Args:
        account: Account (email address) to look up.

    Returns:
        A dict with the keys ``account``, ``status`` and ``result``.
        ``status`` is the HTTP status code, or ``None`` when the request
        itself failed (connection error, timeout, ...). ``result`` is the
        decoded JSON body on success, the raw response text for statuses
        >= 400 or for a success body that is not valid JSON, and a
        description of the error when the request failed.
    """
    while True:
        try:
            resp = sess.get(
                url=f"https://haveibeenpwned.com/api/v3/pasteaccount/{account}",
                params=HIBP_PARAMS,
                headers=HIBP_HEADERS,
                # Without a timeout a stalled connection would block this
                # worker indefinitely.
                timeout=30,
            )
        except requests.RequestException as exc:
            # Report the failure for this account only. An exception escaping
            # the worker would surface in the caller's result loop and abort
            # the whole scan.
            return {
                "account": account,
                "status": None,
                "result": f"request failed: {exc}",
            }

        if resp.status_code == 429:
            print("Quota exceeded, waiting for a while")
            # Randomised delay so that workers do not all retry at once.
            time.sleep(random.uniform(3, 7))
            continue

        if resp.status_code >= 400:
            return {
                "account": account,
                "status": resp.status_code,
                "result": resp.text,
            }

        try:
            payload = resp.json()
        except ValueError:
            # Success status with a non-JSON body: keep the raw text.
            payload = resp.text
        return {
            "account": account,
            "status": resp.status_code,
            "result": payload,
        }


def connect_splunk():
    """Placeholder for the Splunk query; currently returns an empty list."""
    # TODO: return emails
    emails = []
    return emails


def main():
    """Look up every account from ``connect_splunk`` and log results as JSON Lines.

    The accounts are converted to strings and distributed over a pool of 16
    worker processes. Each result is appended, as one JSON object per line
    with sorted keys, to a file whose name carries the current timestamp.
    Results are written in completion order, and progress is shown with a
    tqdm bar.
    """
    list_emails = list(map(str, connect_splunk()))
    # ':' is replaced by '-' in the timestamp used for the file name.
    stamp = datetime.datetime.now().isoformat().replace(":", "-")
    output_filename = f"accounts-log-{stamp}.jsonl"
    print(f"Accounts to look up: {len(list_emails)}")
    print(f"Output filename:     {output_filename}")
    with multiprocessing.Pool(processes=16) as pool, open(output_filename, "a") as out:
        lookups = pool.imap_unordered(check_pasteaccount, list_emails, chunksize=20)
        progress = tqdm.tqdm(
            lookups,
            total=len(list_emails),
            unit="acc",
            unit_scale=True,
        )
        for record in progress:
            print(json.dumps(record, sort_keys=True), file=out)


# Run the lookup only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

相关问题 更多 >