我正在使用Flask构建一个web应用程序来管理和控制彩色智能灯泡的场景。该应用程序使用数据库检索/存储场景信息,并使用RabbitMQ代理在后台运行场景。 我想我还应该提到,这是我的第一个web应用程序,所以这仍然是一项正在进行的工作
我创建了一个链接,当按下该链接时,它将启动一个芹菜任务,执行Python函数。该功能基本上是一个无限循环,它使用所选场景中随机选择的颜色更新每个智能灯泡
在大多数情况下,我已经解决了所有问题,除了两个主要问题:
现在的情况是,当我开始一个场景,然后立即开始另一个场景时,当芹菜工人同时执行任务时,灯光将在场景之间跳跃。我在启动worker时使用了--concurrency=1
选项,在某种程度上避开了这个问题。这允许一次只执行一个任务,并将其他任务添加到队列中
虽然这允许场景按预期运行,但我不希望后续场景在选中时排队。选择新场景后,我希望首先检查正在运行的场景,如果找到,则终止该场景,然后运行新场景
这就引出了我的第二个问题,希望能够通过linkpress在任何时候终止一个场景。我打算主要使用语音助手来使用HTTP请求启动/停止这些场景,因此我想找出一种实现方法,使之成为可能
PyScenes
├── Pipfile
├── Pipfile.lock
├── PyScenes
│ ├── __init__.py
│ ├── hue_scenes.db
│ ├── hue_script.py
│ ├── static
│ │ ├── css
│ │ │ └── main.css
│ │ └── updateOptions.js
│ ├── templates
│ │ ├── add.html
│ │ ├── base.html
│ │ ├── index.html
│ │ └── update.html
│ └── webapp
│ ├── __init__.py
│ ├── model.py
│ ├── routes.py
│ └── tasks.py
└── run.py
from flask import Flask
from PyScenes import base_path
app = Flask(__name__,
template_folder=f'{ base_path }/templates',
static_folder=f'{ base_path }/static'
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{ base_path }/hue_scenes.db'
from PyScenes.webapp import routes
from PyScenes.webapp import tasks
from PyScenes.webapp import app, tasks
from PyScenes.webapp.model import db, HueScenes
from flask import render_template, request, redirect
@app.route('/')
def index():
scenes = HueScenes.query.order_by(HueScenes.date_created).all()
return render_template('index.html', scenes=scenes)
@app.route('/enable/<int:id>')
def enable(id):
HueScenes.query.get_or_404(id)
try:
tasks.enableScene.delay(id) // Celery task
return redirect('/')
except:
return "There was a problem enabling that scene"
from celery import Celery
from PyScenes.webapp import app
from PyScenes.hue_script import startScene
celery = Celery(app.import_name, broker='amqp://PyScenes:password123@192.168.1.2:5672/vhost')
@celery.task()
def enableScene(id):
startScene(id) // Python script to update lights
目前没有回答
相关问题 更多 >
编程相关推荐