Flask应用:在函数运行时更新进度条

54 投票
3 回答
60617 浏览
提问于 2025-04-18 10:00

我正在用Flask搭建一个相对简单的网页应用,这个应用通过网站的API来执行一些功能。我的用户需要填写一个表单,里面有他们的账户网址和API令牌;当他们提交表单后,我有一个Python脚本可以通过API从他们的账户导出PDF文件。这个过程可能会花费比较长的时间,所以我想在表单页面上显示一个Bootstrap进度条,来指示脚本的执行进度。我的问题是,如何在这个功能运行时更新进度条?下面是我想说的一个简化版本。

views.py:

@app.route ('/export_pdf', methods = ['GET', 'POST'])
def export_pdf():
    form = ExportPDF()
    if form.validate_on_submit():
      try:
        export_pdfs.main_program(form.account_url.data,
          form.api_token.data)
        flash ('PDFs exported')
        return redirect(url_for('export_pdf'))
      except TransportException as e:
        s = e.content
        result = re.search('<error>(.*)</error>', s)
        flash('There was an authentication error: ' + result.group(1))
      except FailedRequest as e:
        flash('There was an error: ' + e.error)
    return render_template('export_pdf.html', title = 'Export PDFs', form = form)

export_pdf.html:

{% extends "base.html" %}

{% block content %}
{% include 'flash.html' %}
<div class="well well-sm">
  <h3>Export PDFs</h3>
  <form class="navbar-form navbar-left" action="" method ="post" name="receipt">
    {{form.hidden_tag()}}
    <br>
    <div class="control-group{% if form.errors.account_url %} error{% endif %}">
      <label class"control-label" for="account_url">Enter Account URL:</label>
      <div class="controls">
        {{ form.account_url(size = 50, class = "span4")}}
        {% for error in form.errors.account_url %}
          <span class="help-inline">[{{error}}]</span><br>
        {% endfor %}
      </div>
    </div>
    <br>
    <div class="control-group{% if form.errors.api_token %} error{% endif %}">
      <label class"control-label" for="api_token">Enter API Token:</label>
      <div class="controls">
        {{ form.api_token(size = 50, class = "span4")}}
        {% for error in form.errors.api_token %}
          <span class="help-inline">[{{error}}]</span><br>
        {% endfor %}
      </div>
    </div>
    <br>
    <button type="submit" class="btn btn-primary btn-lg">Submit</button>
  <br>
  <br>
  <div class="progress progress-striped active">
  <div class="progress-bar"  role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
    <span class="sr-only"></span>
  </div>
</form>
</div>
</div>
{% endblock %}

还有export_pdfs.py:

def main_program(url, token):
    api_caller = api.TokenClient(url, token)
    path = os.path.expanduser('~/Desktop/'+url+'_pdfs/')
    pdfs = list_all(api_caller.pdf.list, 'pdf')
    total = 0
    count = 1
    for pdf in pdfs:
        total = total + 1
    for pdf in pdfs:
        header, body = api_caller.getPDF(pdf_id=int(pdf.pdf_id))
        with open('%s.pdf' % (pdf.number), 'wb') as f:
          f.write(body)
        count = count + 1
        if count % 50 == 0:
          time.sleep(1)

在最后一个函数中,我计算了要导出的PDF总数,并在处理过程中进行实时计数。我该如何将当前的进度发送到我的.html文件,以便能够适应进度条的'style='标签?最好是能让我在其他页面的进度条上也能重复使用同样的工具。如果我提供的信息不够,请告诉我。

3 个回答

1

我在本地运行了一个简单但很有教育意义的Flask SSE实现。为了在GAE中处理第三方(用户上传的)库,您可以按照以下步骤操作:

  1. 在您的根目录下创建一个名为 lib 的文件夹。
  2. gevent 库的文件夹复制到 lib 文件夹中。
  3. 在您的 main.py 文件中添加以下几行代码:

    import sys
    sys.path.insert(0,'lib')
    
  4. 就这样。如果您从子文件夹使用 lib 文件夹,请使用相对路径引用: sys.path.insert(0, ../../blablabla/lib')

来源于 http://flask.pocoo.org/snippets/116/

# author: oskar.blom@gmail.com
#
# Make sure your gevent version is >= 1.0
import gevent
from gevent.wsgi import WSGIServer
from gevent.queue import Queue

from flask import Flask, Response

import time


# SSE "protocol" is described here: http://mzl.la/UPFyxY
class ServerSentEvent(object):

    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data : "data",
            self.event : "event",
            self.id : "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k) 
                 for k, v in self.desc_map.iteritems() if k]

        return "%s\n\n" % "\n".join(lines)

app = Flask(__name__)
subscriptions = []

# Client code consumes like this.
@app.route("/")
def index():
    debug_template = """
     <html>
       <head>
       </head>
       <body>
         <h1>Server sent events</h1>
         <div id="event"></div>
         <script type="text/javascript">

         var eventOutputContainer = document.getElementById("event");
         var evtSrc = new EventSource("/subscribe");

         evtSrc.onmessage = function(e) {
             console.log(e.data);
             eventOutputContainer.innerHTML = e.data;
         };

         </script>
       </body>
     </html>
    """
    return(debug_template)

@app.route("/debug")
def debug():
    return "Currently %d subscriptions" % len(subscriptions)

@app.route("/publish")
def publish():
    #Dummy data - pick up from request for real data
    def notify():
        msg = str(time.time())
        for sub in subscriptions[:]:
            sub.put(msg)

    gevent.spawn(notify)

    return "OK"

@app.route("/subscribe")
def subscribe():
    def gen():
        q = Queue()
        subscriptions.append(q)
        try:
            while True:
                result = q.get()
                ev = ServerSentEvent(str(result))
                yield ev.encode()
        except GeneratorExit: # Or maybe use flask signals
            subscriptions.remove(q)

    return Response(gen(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.debug = True
    server = WSGIServer(("", 5000), app)
    server.serve_forever()
    # Then visit http://localhost:5000 to subscribe 
    # and send messages by visiting http://localhost:5000/publish
3

我做了一个可以运行并经过测试的例子,使用了线程。你只需要复制粘贴,然后随意修改就可以了。

Python

from flask import Flask, render_template
from threading import Thread
from time import sleep
import json

app = Flask(__name__)
status = None

def task():
  global status
  for i in range(1,11):
    status = i
    sleep(1)

@app.route('/')
def index():
  t1 = Thread(target=task)
  t1.start()
  return render_template('index.html')
  
@app.route('/status', methods=['GET'])
def getStatus():
  statusList = {'status':status}
  return json.dumps(statusList)

if __name__ == '__main__':
  app.run(debug=True)

HTML CSS JS

<!doctype html>
<html>

<head>
  <meta charset="UTF-8">

  <style>
  
  body {
    background-color: #D64F2A;
  }
  
  .progress {
    display: flex;
    position: absolute;
    height: 100%;
    width: 100%;
  }
  
  .status {
    color: white;
    margin: auto;
  }

  .status h2 {
    padding: 50px;
    font-size: 80px;
    font-weight: bold;
  }
  
  </style>

  <title>Status Update</title>

</head>

<body>
  <div class="progress">
    <div class="status">
      <h2 id="innerStatus">Loading...</h2>
    </div>
  </div>
</body>

<script>
var timeout;

async function getStatus() {

  let get;
  
  try {
    const res = await fetch("/status");
    get = await res.json();
  } catch (e) {
    console.error("Error: ", e);
  }
  
  document.getElementById("innerStatus").innerHTML = get.status * 10 + "&percnt;";
  
  if (get.status == 10){
    document.getElementById("innerStatus").innerHTML += " Done.";
    clearTimeout(timeout);
    return false;
  }
   
  timeout = setTimeout(getStatus, 1000);
}

getStatus();
</script>

</html>
37

正如其他人评论中提到的,最简单的解决办法是把你的导出功能放在另一个线程里运行,同时让客户端通过另一个请求来获取进度信息。处理这个任务的方法有很多种,具体选择哪种取决于你的需求,你可以选择简单一点的,也可以选择复杂一点的。

下面是一个非常简单的例子,展示如何使用线程来实现这个功能:

import random
import threading
import time

from flask import Flask


class ExportingThread(threading.Thread):
    def __init__(self):
        self.progress = 0
        super().__init__()

    def run(self):
        # Your exporting stuff goes here ...
        for _ in range(10):
            time.sleep(1)
            self.progress += 10


exporting_threads = {}
app = Flask(__name__)
app.debug = True


@app.route('/')
def index():
    global exporting_threads

    thread_id = random.randint(0, 10000)
    exporting_threads[thread_id] = ExportingThread()
    exporting_threads[thread_id].start()

    return 'task id: #%s' % thread_id


@app.route('/progress/<int:thread_id>')
def progress(thread_id):
    global exporting_threads

    return str(exporting_threads[thread_id].progress)


if __name__ == '__main__':
    app.run()

在首页路由(/)中,我们为每个导出任务启动一个线程,并返回一个任务ID,这样客户端就可以通过进度路由(/progress/[exporting_thread])来获取这个任务的进度。导出线程会在它认为合适的时候更新进度值。

在客户端,你会得到类似这样的东西(这个例子使用了jQuery):

function check_progress(task_id, progress_bar) {
    function worker() {
        $.get('progress/' + task_id, function(data) {
            if (progress < 100) {
                progress_bar.set_progress(progress)
                setTimeout(worker, 1000)
            }
        })
    }
}

如前所述,这个例子非常简单,你可能需要选择一个稍微复杂一点的方法。通常,我们会把某个线程的进度存储在数据库或者某种缓存中,这样就不需要依赖共享结构,从而避免我这个例子中大部分内存和并发问题。

Redis(https://redis.io)是一个内存数据库,通常非常适合这种任务。它与Python(https://pypi.python.org/pypi/redis)的集成也非常好。

撰写回答