# featureflow
featureflow is a Python library that allows users to build feature extraction pipelines declaratively, and to control how and where those features are persisted.
## Usage
The example below computes word frequencies, first for a single text document and then across an entire corpus of documents, but featureflow is not limited to text data. It is designed to work well with sequential/streaming data (e.g. audio or video) that is often processed iteratively, in small chunks.
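The chunk-at-a-time idea can be sketched in plain Python, without featureflow; here a `hashlib` checksum consumes data incrementally, the same way a streaming feature extractor would:

```python
import hashlib


def checksum_stream(chunks):
    # Consume data iteratively, one small chunk at a time,
    # rather than requiring the whole input in memory at once.
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()


text = b'the quick brown fox jumps over the lazy dog'

# Processing in 8-byte chunks yields the same result as
# processing the whole input in one shot.
chunked = checksum_stream(text[i:i + 8] for i in range(0, len(text), 8))
whole = hashlib.sha256(text).hexdigest()
assert chunked == whole
```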
You can see all the code in this example in one place here.
We can define a graph of processing nodes like this:
```python
import featureflow as ff


@ff.simple_in_memory_settings
class Document(ff.BaseModel):
    """
    Define the processing graph needed to extract document-level
    features, whether, and how those features should be persisted.
    """
    raw = ff.ByteStreamFeature(
        ff.ByteStream,
        chunksize=128,
        store=True)

    checksum = ff.JSONFeature(
        CheckSum,
        needs=raw,
        store=True)

    tokens = ff.Feature(
        Tokenizer,
        needs=raw,
        store=False)

    counts = ff.JSONFeature(
        WordCount,
        needs=tokens,
        store=True)
```
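The `needs` keyword arguments above declare a dependency chain: `counts` depends on `tokens`, which depends on `raw`. The following is a minimal, hypothetical sketch of what such a declarative graph describes (the dictionary, the functions, and `compute` are illustrative inventions, not featureflow's API):

```python
# Each "feature" names the feature it needs; execution walks the
# chain back to the root input and applies each function in turn.
features = {
    'raw': {'needs': None,
            'fn': lambda url: 'some raw text from %s' % url},
    'tokens': {'needs': 'raw',
               'fn': lambda text: text.split()},
    'counts': {'needs': 'tokens',
               'fn': lambda toks: {t: toks.count(t) for t in set(toks)}},
}


def compute(name, root_input):
    spec = features[name]
    if spec['needs'] is None:
        return spec['fn'](root_input)
    # Recursively compute the upstream dependency first.
    return spec['fn'](compute(spec['needs'], root_input))


assert compute('tokens', 'u') == ['some', 'raw', 'text', 'from', 'u']
```

featureflow additionally decides, per feature, whether the computed value is persisted (`store=True`) or recomputed on demand.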
We can define the individual processing "nodes" referenced in the graph above like this:
```python
import featureflow as ff
from collections import Counter
import re
import hashlib


class Tokenizer(ff.Node):
    """
    Tokenize a stream of text into individual, normalized (lowercase)
    words/tokens
    """
    def __init__(self, needs=None):
        super(Tokenizer, self).__init__(needs=needs)
        self._cache = ''
        self._pattern = re.compile(r'(?P<word>[a-zA-Z]+)\W+')

    def _enqueue(self, data, pusher):
        self._cache += data

    def _dequeue(self):
        matches = list(self._pattern.finditer(self._cache))
        if not matches:
            raise ff.NotEnoughData()
        last_boundary = matches[-1].end()
        self._cache = self._cache[last_boundary:]
        return matches

    def _process(self, data):
        yield map(lambda x: x.groupdict()['word'].lower(), data)


class WordCount(ff.Aggregator, ff.Node):
    """
    Keep track of token frequency
    """
    def __init__(self, needs=None):
        super(WordCount, self).__init__(needs=needs)
        self._cache = Counter()

    def _enqueue(self, data, pusher):
        self._cache.update(data)


class CheckSum(ff.Aggregator, ff.Node):
    """
    Compute the checksum of a text stream
    """
    def __init__(self, needs=None):
        super(CheckSum, self).__init__(needs=needs)
        self._cache = hashlib.sha256()

    def _enqueue(self, data, pusher):
        self._cache.update(data)

    def _process(self, data):
        yield data.hexdigest()
```
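The interesting part of `Tokenizer` is its buffering: because text arrives in arbitrary chunks, a word may be split across two chunks, so only tokens ending before the last complete word boundary are emitted and the remainder stays cached. A plain-Python sketch of that logic (a simplified stand-in, not featureflow code):

```python
import re

# Same pattern as the Tokenizer node above: a word followed by at
# least one non-word character, which marks a safe boundary.
pattern = re.compile(r'(?P<word>[a-zA-Z]+)\W+')


def tokenize_chunks(chunks):
    cache = ''
    for chunk in chunks:
        cache += chunk
        matches = list(pattern.finditer(cache))
        if not matches:
            # featureflow signals this case with ff.NotEnoughData
            continue
        # Keep everything after the last complete match for later.
        cache = cache[matches[-1].end():]
        for m in matches:
            yield m.groupdict()['word'].lower()


tokens = list(tokenize_chunks(['The quick br', 'own fox. ']))
# 'brown' is split across the two chunks but still emitted whole.
assert tokens == ['the', 'quick', 'brown', 'fox']
```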
We can also define a graph to process a corpus of stored documents:
```python
import featureflow as ff


@ff.simple_in_memory_settings
class Corpus(ff.BaseModel):
    """
    Define the processing graph needed to extract corpus-level
    features, whether, and how those features should be persisted.
    """
    docs = ff.Feature(
        lambda doc_cls: (doc.counts for doc in doc_cls),
        store=False)

    total_counts = ff.JSONFeature(
        WordCount,
        needs=docs,
        store=True)
```
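Because `WordCount` keeps a `Counter` and updates it with each incoming mapping, corpus-level aggregation amounts to merging per-document counts. A plain-Python sketch of that merge, with made-up example counts:

```python
from collections import Counter

# Hypothetical per-document word counts, standing in for the
# doc.counts values that the Corpus graph streams into WordCount.
doc_counts = [
    Counter({'the': 3, 'fox': 1}),
    Counter({'the': 2, 'dog': 1}),
]

# Counter.update adds counts rather than replacing them, so the
# per-document Counters merge into one corpus-wide Counter.
total_counts = Counter()
for counts in doc_counts:
    total_counts.update(counts)

assert total_counts['the'] == 5
assert total_counts['dog'] == 1
```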
Finally, we can execute these processing graphs and access the stored features like this:
```python
from __future__ import print_function
import argparse


def process_urls(urls):
    for url in urls:
        Document.process(raw=url)


def summarize_document(doc):
    return 'doc {_id} with checksum {cs} contains "the" {n} times'.format(
        _id=doc._id,
        cs=doc.checksum,
        n=doc.counts.get('the', 0))


def process_corpus(document_cls):
    corpus_id = Corpus.process(docs=document_cls)
    return Corpus(corpus_id)


def summarize_corpus(corpus):
    return 'The entire text corpus contains "the" {n} times'.format(
        n=corpus.total_counts.get('the', 0))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--url',
        help='specify one or more urls of text files to ingest',
        required=True,
        action='append')
    args = parser.parse_args()

    process_urls(args.url)

    for doc in Document:
        print(summarize_document(doc))

    corpus = process_corpus(Document)
    print(summarize_corpus(corpus))
```
We can then run it like this:
```bash
python wordcount.py \
    --url http://textfiles.com/food/1st_aid.txt \
    --url http://textfiles.com/food/antibiot.txt \
    ...
```
## Installation
Python headers are required. You can install them by running:
```bash
apt-get install python-dev
```
numpy is optional. If you'd like to use it, the Anaconda distribution is highly recommended.
Finally, just run:
```bash
pip install featureflow
```