Scraping multiple links with PyQt / QtWebKit


I'm trying to scrape a large government records site, which calls for a "snowball" approach: start from the main search page, then follow each link the scraper finds to the next page.

I've been able to load the main page with PyQt, following this SiteScraper tutorial:

import sys
from PySide.QtGui import *
from PySide.QtCore import *
from PySide.QtWebKit import *
from BeautifulSoup import BeautifulSoup

class Render(QWebPage):
    def __init__(self, url):
        self.app = QApplication(sys.argv)
        QWebPage.__init__(self)
        self.loadFinished.connect(self._loadFinished)
        self.mainFrame().load(QUrl(url))
        self.app.exec_()

    def _loadFinished(self, result):
        self.frame = self.mainFrame()
        self.app.quit()

def main():
    baseUrl = 'http://www.thesite.gov'
    url = 'http://www.thesite.gov/search'
    r = Render(url)
    html = r.frame.toHtml()

    # use BeautifulSoup to cycle through each regulation
    soup = BeautifulSoup(html)
    regs = soup.find('div', {'class': 'x-grid3-body'}).findAll('a')

    # cycle through list and call up each page separately
    for reg in regs:
        link = baseUrl + reg['href']
        link = str(link)
        # use Qt to load each regulation page
        r = Render(link)
        html = r.frame.toHtml()  # get actual rendered web page

But when I try to render a new page, I get this error:

RuntimeError: A QApplication instance already exists.

I understand that the function is creating another QApplication instance. But how do I navigate to a new page using the same instance? Here is my attempt at creating the QApplication once in main() and passing it into Render:


class Render(QWebPage):
    def __init__(self, app, url):
        QWebPage.__init__(self)
        self.loadFinished.connect(self._loadFinished)
        self.mainFrame().load(QUrl(url))
        app.exec_()

    def _loadFinished(self, result):
        self.frame = self.mainFrame()

def main():
    app = QApplication(sys.argv)

    baseUrl = 'http://www.thesite.gov'
    url = 'http://www.thesite.gov/search'

    r = Render(app, url)
    html = r.frame.toHtml()

4 Answers

3 votes

You're crazy! Qt's DOM is way better than BeautifulSoup's.

Take this:

soup = BeautifulSoup(html)

and replace it with this:

page = QWebPage()

page.settings().setAttribute(QWebSettings.AutoLoadImages, False)
page.settings().setAttribute(QWebSettings.PluginsEnabled, False)
page.mainFrame().setHtml(html)

dom = page.mainFrame().documentElement()

Then you can extract data as simply as this:

li = dom.findFirst("body div#content div#special ul > li")

if not li.isNull():
    cls  = li.attribute("class")   # 'class' is a reserved word in Python, so use another name
    text = li.toPlainText()
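
As a rough sketch, the same DOM API could also gather the regulation links the question needs; the CSS selector and the baseUrl prefix below are assumptions carried over from the question's BeautifulSoup code:

# Sketch only: walk a QWebElementCollection and collect absolute links.
# 'div.x-grid3-body a' mirrors the selector used with BeautifulSoup in the question.
links = []
anchors = dom.findAll("div.x-grid3-body a")   # returns a QWebElementCollection

for i in range(anchors.count()):
    href = anchors.at(i).attribute("href")
    if href:
        # resolve relative hrefs against the site root, as the question's code does
        links.append(baseUrl + href if href.startswith("/") else href)

print links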

Finally, you're much better off using QWebView rather than QWebPage. You can set it up like a server and control it over a socket. This is what I do:

class QTimerWithPause(QTimer):

    def __init__(self, parent = None):

        super(QTimerWithPause, self).__init__ (parent)

        self.startTime = 0
        self.interval  = 0

        return

    def start(self, interval):

        from time import time

        self.interval  = interval
        self.startTime = time()

        super(QTimerWithPause, self).start(interval)

        return

    def pause(self):

        from time import time

        if self.isActive ():

            self.stop()

            elapsedTime = time() - self.startTime

            # time() returns float secs, interval is int msec
            self.interval -= int(elapsedTime*1000)+1

        return

    def resume(self):

        if not self.isActive():
            self.start(self.interval)

        return


class CrawlerWebServer(QWebView):  

    TIMEOUT = 60
    STUPID  = r"(bing|yahoo|google)"

    def __init__(self, host="0.0.0.0", port=50007, parent=None, enableImages=True, enablePlugins=True):

        # Constructor

        super(CrawlerWebServer, self).__init__(parent)  

        self.command     = None
        self.isLoading   = True
        self.isConnected = False
        self.url         = QUrl("http://mast3rpee.tk/")
        self.timeout     = QTimerWithPause(self)
        self.socket      = QTcpServer(self)


        # 1: Settings

        self.settings().enablePersistentStorage()
        self.settings().setAttribute(QWebSettings.AutoLoadImages, enableImages)
        self.settings().setAttribute(QWebSettings.PluginsEnabled, enablePlugins)
        self.settings().setAttribute(QWebSettings.DeveloperExtrasEnabled, True)

        # 2: Server
        if args.verbosity > 0: print "Starting server..."

        self.socket.setProxy(QNetworkProxy(QNetworkProxy.NoProxy))
        self.socket.listen(QHostAddress(host), int(port))
        self.connect(self.socket, SIGNAL("newConnection()"), self._connect)

        if args.verbosity > 1:
            print "    Waiting for connection(" + host + ":" + str(port) + ")..."

        # 3: Default page

        self._load(10*1000, self._loadFinished)

        return

    def __del__(self):

        try:
            self.conn.close()
            self.socket.close()
        except:
            pass

        return

    def _sendAuth(self):

        self.conn.write("Welcome to WebCrawler server (http://mast3rpee.tk)\r\n\rLicenced under GPL\r\n\r\n")

    def _connect(self): 

        self.disconnect(self.socket, SIGNAL("newConnection()"), self._connect)

        self.conn               = self.socket.nextPendingConnection()
        self.conn.nextBlockSize = 0

        self.connect(self.conn, SIGNAL("readyRead()"), self.io)
        self.connect(self.conn, SIGNAL("disconnected()"), self.close)
        self.connect(self.conn, SIGNAL("error()"), self.close)
        self._sendAuth()

        if args.verbosity > 1:
            print "    Connection by:", self.conn.peerAddress().toString()

        self.isConnected = True

        if self.isLoading == False:
            self.conn.write("\r\nEnter command:")

        return

    def io(self):

        if self.isLoading: return None

        if args.verbosity > 0:
            print "Reading command..."

        data = self.conn.read(1024).strip(" \r\n\t")

        if not data: return None

        elif self.command is not None:
            r = self.command(data)
            self.command = None
            return r

        return self._getCommand(data)

    def _getCommand(self, d):

        from re import search

        d = unicode(d, errors="ignore")

        if search(r"(help|HELP)", d) is not None:

            self.conn.write("URL | JS | WAIT | QUIT\r\n\r\nEnter Command:")

        elif search(r"(url|URL)", d) is not None:

            self.command = self._print

            self.conn.write("Enter address:")

        elif search(r"(js|JS|javascript|JAVASCRIPT)", d) is not None:

            self.command = self._js

            self.conn.write("Enter javascript to execte:")

        elif search(r"(wait|WAIT)", d) is not None:

            self.loadFinished.connect(self._loadFinishedPrint)
            self.loadFinished.connect(self._loadFinished)  

        elif search(r"(quit|QUIT|exit|EXIT)", d) is not None:

            self.close()

        else:

            self.conn.write("Invalid command!\r\n\r\nEnter Command:")

        return

    def _print(self, d):

        u = d[:250]

        self.out(u)

        return True

    def _js(self, d):

        try:
            self.page().mainFrame().evaluateJavaScript(d)

        except:
            pass

        self.conn.write("Enter Javascript:")

        return True

    def _stop(self):

        from time import sleep

        if self.isLoading == False: return

        if args.verbosity > 0:
            print "    Stopping..."

        self.timeout.stop()
        self.stop()

    def _load(self, timeout, after):

        # Loads a page into frame / sets up timeout

        self.timeout.timeout.connect(self._stop)
        self.timeout.start(timeout)

        self.loadFinished.connect(after)  
        self.load(self.url)

        return

    def _loadDone(self, disconnect = None):

        from re   import search
        from time import sleep

        self.timeout.timeout.disconnect(self._stop)
        self.timeout.stop()

        if disconnect is not None:

            self.loadFinished.disconnect(disconnect)

            # Stick a while on the page

            if search(CrawlerWebServer.STUPID, self.url.toString(QUrl.RemovePath)) is not None:
                sleep(5)
            else:
                sleep(1)

        return

    def _loadError(self):

        from time import sleep, time

        if not self.timeout.isActive(): return True

        if args.verbosity > 0: print "    Error retrying..."

        # 1: Pause timeout

        self.timeout.pause()

        # 2: Check for internet connection

        while self.page().networkAccessManager().networkAccessible() == QNetworkAccessManager.NotAccessible: sleep(1)

        # 3: Wait then try again

        sleep(2)
        self.reload()
        self.timeout.resume()

        return False

    def go(self, url, after = None):

        # Go to a specific address

        global args

        if after is None:
            after = self._loadFinished

        if args.verbosity > 0:
            print "Loading url..."

        self.url        = QUrl(url)
        self.isLoading  = True

        if args.verbosity > 1:
            print "   ", self.url.toString()

        self._load(CrawlerWebServer.TIMEOUT * 1000, after)

        return

    def out(self, url):

        # Print html of a specific url

        self.go(url, self._loadFinishedPrint)

        return

    def createWindow(self, windowType):  

        # Load links in the same web-view.

        return self  

    def _loadFinished(self, ok):

        # Default LoadFinished

        from time import sleep
        from re   import search

        if self.isLoading == False: return

        if ok == False:
            if not self._loadError(): return 

        self._loadDone(self._loadFinished)

        if args.verbosity > 1:
            print "    Done"

        if self.isConnected == True:
            self.conn.write("\r\nEnter command:")

        self.isLoading = False

        return

    def _loadFinishedPrint(self, ok):  

        # Print the evaluated HTML to stdout

        if self.isLoading == False: return

        if ok == False:
            if not self._loadError(): return 

        self._loadDone(self._loadFinishedPrint)  

        if args.verbosity > 1:
            print "    Done"

        h = unicode( self.page().mainFrame().toHtml(), errors="ignore" )

        if args.verbosity > 2:
            print "------------------\n" + h + "\n--------------------"

        self.conn.write(h)
        self.conn.write("\r\nEnter command:")

        self.isLoading  = False

        return

    def contextMenuEvent(self, event):  

        # Context Menu

        menu = self.page().createStandardContextMenu()  
        menu.addSeparator()  
        action = menu.addAction('ReLoad')  

        @action.triggered.connect  
        def refresh():  
            self.load(self.url)

        menu.exec_(QCursor.pos())


class CrawlerWebClient(object):

    def __init__(self, host, port):

        import socket

        global args

        # CONNECT TO SERVER

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.socket.connect((host, port))

        o = self.read()

        if args.verbosity > 2:
            print "\n------------------------------\n" + o + "\n------------------------------\n"

        return

    def __del__(self):

        try: self.socket.close()
        except: pass

    def read(self):

        from re import search

        r = ""

        while True:
            out = self.socket.recv(64*1024).strip("\r\n")

            if out.startswith(r"Enter"):
                break

            if out.endswith(r"Enter command:"):
                r += out[:-14]
                break

            r += out

        return r

    def command(self, command):

        global args

        if args.verbosity > 2:
            print "    Command: [" + command + "]\n------------------------------"

        self.socket.sendall(unicode(command))

        r =  self.read()

        if args.verbosity > 2:
            print r, "\n------------------------------\n"

        return r
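
For completeness, a rough sketch of how the CrawlerWebClient above might be driven once the server is running; the localhost address, port 50007, and the argparse-built args object with a verbosity field are assumptions matching the snippets above:

import argparse

# Sketch only: the classes above expect a module-level `args` with a `verbosity` attribute.
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbosity", type=int, default=1)
args = parser.parse_args()

client = CrawlerWebClient("127.0.0.1", 50007)

client.command("url")                                    # server answers "Enter address:"
html = client.command("http://www.thesite.gov/search")   # server replies with the rendered HTML

print html[:500]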

6 votes

I had the same problem (needing to load multiple pages with QWebPage), but none of these answers worked for me. Here is what does work; the key is to use a QEventLoop and connect loadFinished to loop.quit:

from PySide import QtCore, QtGui, QtWebKit
import sys

def loadPage(url):
    page = QtWebKit.QWebPage()
    loop = QtCore.QEventLoop()  # Create event loop
    page.mainFrame().loadFinished.connect(loop.quit)  # Connect loadFinished to loop quit
    page.mainFrame().load(url)
    loop.exec_()  # Run event loop, it will end on loadFinished
    return page.mainFrame().toHtml()

app = QtGui.QApplication(sys.argv)

urls = ['https://google.com', 'http://reddit.com', 'http://wikipedia.org']
for url in urls:
    print '-----------------------------------------------------'
    print 'Loading ' + url
    html = loadPage(url)
    print html

app.exit()

This is a deliberately simplified example, stripped down from the one in the question, to show the core problem and its solution.
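
One caveat: if loadFinished never fires, loop.exec_() will block forever. A possible refinement is to arm a single-shot timer that also quits the loop; the 15-second timeout below is an arbitrary choice:

# Variant of loadPage() with a load timeout (assumed value: 15 s).
def loadPageWithTimeout(url, timeout_ms=15000):
    page = QtWebKit.QWebPage()
    loop = QtCore.QEventLoop()
    page.mainFrame().loadFinished.connect(loop.quit)
    QtCore.QTimer.singleShot(timeout_ms, loop.quit)  # give up if the load hangs
    page.mainFrame().load(QtCore.QUrl(url))
    loop.exec_()  # returns on loadFinished or on the timeout, whichever comes first
    return page.mainFrame().toHtml()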

2 votes

OK, if you really do need the JavaScript executed (can you get the data out of the JSON instead? That would probably be much easier; simplejson or the built-in json module would do), the answer is: don't create more than one QApplication. That isn't allowed. Have main() create a QApplication, then use QWebPage directly without ever calling QApplication.exec_(). If that doesn't work, run it all in a separate QThread.
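
As a rough sketch of that advice (assuming PySide, with a placeholder URL): create the QApplication once, use QWebPage directly, and pump the event loop by hand with processEvents() until each page reports loadFinished:

import sys
from PySide.QtCore import QUrl
from PySide.QtGui import QApplication
from PySide.QtWebKit import QWebPage

app = QApplication(sys.argv)  # created exactly once, never exec_()'d

def fetch(url):
    page = QWebPage()
    finished = []
    page.loadFinished.connect(finished.append)  # loadFinished(bool) appends True/False
    page.mainFrame().load(QUrl(url))
    while not finished:
        app.processEvents()  # hand-cranked event loop instead of QApplication.exec_()
    return page.mainFrame().toHtml()

print fetch('http://www.thesite.gov/search')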
