scrapy + couchbase:使用中间件还是管道?如何存储和获取数据

1 投票
3 回答
881 浏览
提问于 2025-04-17 22:44

我想用scrapycouchbase一起存储和获取数据。

为了存储和获取我的数据,我对该采用什么解决方案感到困惑:

  1. 我应该实现一个管道吗?

我指的是像这样:

    Class CouchbasePipeline(object):
      def __init__(self):
          ## init client here using settings

    def process_item(self, item, spider):
          ## store item here 
  1. 还是我应该实现一个下载中间件?

像这样:

Class CouchBaseCacheStorage(object):

   def __init__(self, settings):
      ## init client here using settings

   def get_response(self, spider, request):
       pass


   def save_response(self, spider, request, response):
      pass

或者我可能应该同时实现这两者?(管理缓存/数据库)。

我真的很困惑,特别是我对python/couchbase/scrapy都很陌生。我的问题不是关于最佳的实现或工具,而是关于做这些scrapy事情的标准方式,因为我在文档或网上找不到相关信息。

提前感谢任何帮助。

3 个回答

0

存储数据的标准方法是使用项目管道,但如果你想获取数据,我觉得应该使用下载中间件。为了更清楚地理解,可以查看Scrapy的架构概述,特别是下面这个图:

Scrapy architecture

1

这是我实现的解决方案:

  1. 我使用了信号/事件来确保每个爬虫只初始化/关闭一次Couchbase,因为连接服务器需要一些额外的时间。
  2. 对于每个项目,我假设有一个字段用来创建键。你应该根据自己的需求来修改它。

代码:

from scrapy.conf import settings
from couchbase.exceptions import CouchbaseError
from couchbase import Couchbase

class CouchbaseStore(object):
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings)


    def __init__(self,settings):
        self._server = settings.get('COUCHBASE_SERVER')
        self._bucket = settings.get('COUCHBASE_BUCKET')
        self._password = settings.get('COUCHBASE_PASSWORD')
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)


    def process_item(self, item, spider):
        data = {}
        for key in item.keys():
            if isinstance(item[key], datetime):
                data[key] = item[key].isoformat()
            else:
                data[key] = item[key]
        ## I assume item have a unique time field
        key = "{0}".format(item['time'].isoformat())
        self.cb.set(key,data)
        log.msg("Item with key % s stored in bucket %s/ node %s" %
                        (key, settings['COUCHBASE_BUCKET'],   
                              settings['COUCHBASE_SERVER']),
                        level=log.INFO, spider=spider)  
        return item

    def spider_opened(self, spider):
        self._server = settings['COUCHBASE_SERVER']
        self._bucket = settings['COUCHBASE_BUCKET']
        self._password = settings['COUCHBASE_PASSWORD']
        try:
            self.cb = Couchbase.connect(self._bucket)
        except CouchbaseError:
            log.msg('Connection problem to bucket %s'%self._bucket,
                     log.ERROR)
        log.msg("CouchbaseStore.spider_opened called", 
                     level=log.DEBUG)
    def spider_closed(self, spider):
        self.cb._close()
        log.msg("CouchbaseStore.spider_closed called", 
                 level=log.DEBUG)
1

在@agstudy发布的回答之后,这里有一些代码建议。

  • 可以参考这个例子来连接信号:http://doc.scrapy.org/en/latest/topics/extensions.html#sample-extension
  • 你已经在__init__里保存了设置,所以之后可以直接使用这些设置。
  • 不要使用from scrapy.conf import settings这行代码。
  • 增加了_port这个选项。
  • (外观调整)把self.cb改成self.couchbase,这样就不会和“回调”搞混了。

见下文:

from scrapy import signals
from couchbase.exceptions import CouchbaseError
from couchbase import Couchbase

class CouchbaseStore(object):

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        return o

    def __init__(self, settings):
        self._server = settings.get('COUCHBASE_SERVER')
        self._port = settings.get('COUCHBASE_PORT', 8091)
        self._bucket = settings.get('COUCHBASE_BUCKET')
        self._password = settings.get('COUCHBASE_PASSWORD')

    def process_item(self, item, spider):
        data = {}
        for key in item.keys():
            if isinstance(item[key], datetime):
                data[key] = item[key].isoformat()
            else:
                data[key] = item[key]
        ## I assume item have a unique time field
        key = "{0}".format(item['time'].isoformat())
        self.couchbase.set(key, data)
        log.msg("Item with key % s stored in bucket %s/ node %s" %
                    (key, self._bucket, self._server),
                level=log.INFO, spider=spider)  
        return item

    def spider_opened(self, spider):
        try:
            self.couchbase = Couchbase.connect(bucket=self._bucket,
                                               host=self._server,
                                               post=self._port,
                                               password=self._password)
        except CouchbaseError:
            log.msg('Connection problem to bucket %s'% self._bucket,
                     log.ERROR)
        log.msg("CouchbaseStore.spider_opened called", level=log.DEBUG)

    def spider_closed(self, spider):
        self.couchbase._close()
        log.msg("CouchbaseStore.spider_closed called", level=log.DEBUG)

撰写回答