Using Luigi to run straightforward batch tasks

Posted 2024-05-26 07:47:25


I have 500 links to download and want to process them in batches of, for example, 10 items.

What would the pseudocode for this look like?

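import luigi
import requests

class BatchJobTask(luigi.Task):
    items = luigi.IntParameter()  # batch size, e.g. 10

    def run(self):
        list_urls = []
        with open('urls_chunk', 'r') as urls:
            for line in urls:
                # strip the trailing newline before building the URL
                list_urls.append('http://ggg' + line.strip() + '.org')
        batch_urls = list_urls[0:self.items]  # 10 items here
        with self.output().open('w') as out:
            for url in batch_urls:
                req = requests.get(url)
                req.content  # do something with the response body
                out.write(url + '\n')

    def output(self):
        return luigi.LocalTarget("downloaded_filelist.txt")

class BatchWorker(luigi.Task):
    def run(self):
        # Here I should run BatchJobTask from 0 to 10, next 11 - 21 new etc...
        pass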

How would that work?


2 answers

Here is one way to do something like what you want, with the list of strings stored as separate lines in a file.

import luigi
import requests

BATCH_SIZE = 10


class BatchProcessor(luigi.Task):
    items = luigi.ListParameter()
    max = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        # self.max is needed here; a bare `max` would refer to the builtin function
        return luigi.LocalTarget('processed' + str(self.max) + '.txt')

    def run(self):
        for item in self.items:
            req = requests.get('http://www.' + item + '.org')
            # do something useful here
            req.content
        # create the (empty) marker file so Luigi treats this batch as complete
        with self.output().open('w'):
            pass


class BatchCreator(luigi.Task):
    file_with_urls = luigi.Parameter()

    def requires(self):
        required_tasks = []
        lines = []
        total_index = 0
        with open(self.file_with_urls) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # skip blank lines
                total_index += 1
                lines.append(line)
                if len(lines) == BATCH_SIZE:
                    required_tasks.append(BatchProcessor(items=lines, max=total_index))
                    lines = []
        if lines:
            # do not drop the final, partially filled batch
            required_tasks.append(BatchProcessor(items=lines, max=total_index))
        return required_tasks

    def output(self):
        return luigi.LocalTarget(str(self.file_with_urls) + 'processed')

    def run(self):
        # marker file: all batches required above have finished
        with self.output().open('w'):
            pass
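
The batching step in `BatchCreator.requires` can also be pulled out of the task and checked without Luigi. The following is only a sketch under that assumption: `chunked` and `read_batches` are hypothetical helper names, not part of Luigi, and the batch size of 10 comes from the question.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield consecutive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def read_batches(lines, size=10):
    """Strip each line, skip blank ones, and group the rest into batches."""
    cleaned = (line.strip() for line in lines)
    return list(chunked((line for line in cleaned if line), size))
```

Inside `requires`, each batch returned by `read_batches(f, BATCH_SIZE)` could then be handed to one `BatchProcessor`, with the last batch simply being shorter than the others.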

I got it working.

import luigi
import requests


class GetListtask(luigi.Task):
    def run(self):
        ...

    def output(self):
        return luigi.LocalTarget(self.outputfile)


class GetJustOneFile(luigi.Task):
    fid = luigi.IntParameter()

    def requires(self):
        return []

    def run(self):
        url = 'http://my-server.com/test' + str(self.fid) + '.txt'
        response = requests.get(url)
        with self.output().open('w') as downloaded_file:
            # write the decoded text; str(response.content) would write the bytes repr (b'...')
            downloaded_file.write(response.text)

    def output(self):
        return luigi.LocalTarget("test{}.txt".format(self.fid))


class GetAllFiles(luigi.WrapperTask):
    def requires(self):
        # one task per file id, 0..898
        return [GetJustOneFile(fid=fileid) for fileid in range(899)]

Is this code bad?
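
One point to weigh is that this version schedules one task per file rather than the batches of 10 from the question. If batches are still wanted, the id ranges could be computed roughly as follows. This is a sketch, not a Luigi feature: `batch_ranges` is a hypothetical helper, and the total of 899 ids is taken from the `range(899)` call in the code.

```python
def batch_ranges(total, size):
    """Return (start, stop) pairs covering range(total) in steps of `size`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [(start, min(start + size, total)) for start in range(0, total, size)]


# 899 file ids in batches of 10: the last batch holds the remaining 9 ids.
ranges = batch_ranges(899, 10)
print(len(ranges))   # 90
print(ranges[0])     # (0, 10)
print(ranges[-1])    # (890, 899)
```

Each pair could be passed as two `luigi.IntParameter` values to a batch task that loops over `range(start, stop)`, so a wrapper task would require 90 batch tasks instead of 899 single-file tasks.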
