如何使用pytest测试Flask应用程序?

2024-04-25 00:51:22 发布

您现在位置:Python中文网/ 问答频道 /正文

嘿,伙计们,我是单元测试新手,老实说,我真的不知道我做错了什么,只是在测试应用程序中的一个简单连接时出错,即使遵循测试烧瓶应用程序文档https://flask.palletsprojects.com/en/1.1.x/testing/

我正在建立一个幻想交易股票,我需要对交易进行单元测试。但是,如果用户导航到所述路线或发出请求(post、get等),我将按照测试烧瓶应用程序文档连接pytest并开始测试状态_code==200 下面是我的结构服务器目录 我知道如何开始测试路线或让pytest运行的任何资源或想法。每次运行test_transactions.py时,我都会留下错误

注意:我的服务器目录是包结构的

错误:

========================================================== FAILURES ==========================================================
______________________________________________________ test_transaction ______________________________________________________

client = <FlaskClient <Flask 'server_files'>>

    def test_transaction(client):
>       landing = client.get('http://localhost:5000/multiple_stocks')

test_transactions.py:13:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
env\lib\site-packages\werkzeug\test.py:1006: in get
    return self.open(*args, **kw)
env\lib\site-packages\flask\testing.py:222: in open
    return Client.open(
env\lib\site-packages\werkzeug\test.py:970: in open
    response = self.run_wsgi_app(environ.copy(), buffered=buffered)
env\lib\site-packages\werkzeug\test.py:861: in run_wsgi_app
    rv = run_wsgi_app(self.application, environ, buffered=buffered)
env\lib\site-packages\werkzeug\test.py:1096: in run_wsgi_app
    app_rv = app(environ, start_response)
env\lib\site-packages\flask\app.py:2464: in __call__
    return self.wsgi_app(environ, start_response)
env\lib\site-packages\flask\app.py:2450: in wsgi_app
    response = self.handle_exception(e)
env\lib\site-packages\flask\app.py:1867: in handle_exception
    reraise(exc_type, exc_value, tb)
env\lib\site-packages\flask\_compat.py:39: in reraise
    raise value
env\lib\site-packages\flask\app.py:2447: in wsgi_app
    response = self.full_dispatch_request()
env\lib\site-packages\flask\app.py:1952: in full_dispatch_request
    rv = self.handle_user_exception(e)
env\lib\site-packages\flask\app.py:1821: in handle_user_exception
    reraise(exc_type, exc_value, tb)
env\lib\site-packages\flask\_compat.py:39: in reraise
    raise value
env\lib\site-packages\flask\app.py:1950: in full_dispatch_request
    rv = self.dispatch_request()
env\lib\site-packages\flask\app.py:1936: in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
server_files\routes.py:31: in multiple
    resp = req.json()
env\lib\site-packages\requests\models.py:900: in json
    return complexjson.loads(self.text, **kwargs)
C:\Python39\lib\json\__init__.py:346: in loads
    return _default_decoder.decode(s)
C:\Python39\lib\json\decoder.py:337: in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <json.decoder.JSONDecoder object at 0x00000238C4AF11C0>, s = 'The API key provided is not valid.', idx = 0

    def raw_decode(self, s, idx=0):
        """Decode a JSON document from ``s`` (a ``str`` beginning with
        a JSON document) and return a 2-tuple of the Python
        representation and the index in ``s`` where the document ended.

        This can be used to decode a JSON document from a string that may
        have extraneous data at the end.

        """
        try:
            obj, end = self.scan_once(s, idx)
        except StopIteration as err:
>           raise JSONDecodeError("Expecting value", s, err.value) from None
E           json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

C:\Python39\lib\json\decoder.py:355: JSONDecodeError
================================================== short test summary info ===================================================
FAILED test_transactions.py::test_transaction - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
===================================================== 1 failed in 0.42s ======================================================

server
_______ app.py
_______ server_files
                    _______ __init__.py
                    _______ models.py
                    _______routes.py
_______ test_transactions.py

app.py

import os
import pytest
from server_files import app



if __name__ == '__main__':

    app.run(host='127.0.0.1', port=int(os.environ.get("PORT", 5000)))

test_transactions.py

try:
    import pytest
    import requests
    from server_files import client
    # from server_files.routes import configure_app
    # from server_files.routes import add_stoc
    print("unittest line 5 {}".format(client))
except Exception as e:
    print("Some Modules are missings {}".format(e))


def test_transaction(client):
    landing = client.get('http://localhost:5000/multiple_stocks')
    response = landing.data

    assert response == 200


if __name__ == '__main__':
    pytest.main()

init.py

import pytest
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager

app = Flask(__name__)


app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:1209lmc@localhost/fantasy_stock_app'
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
db = SQLAlchemy(app)
from server_files import routes


@pytest.fixture
def client():
    # app.testing = True
    app.config['TESTING'] = True
    client = app.test_client()
    return client

routes.py

import os
import json
import base64
import requests
from datetime import datetime
from flask import jsonify, request, render_template, redirect, url_for
from werkzeug.security import generate_password_hash, check_password_hash
from server_files import app, db, bcrypt
from server_files.models import Users, Transactions, Stock

api_key = os.environ.get('API_KEY')

base_url = "https://cloud.iexapis.com"


@app.route("/multiple_stocks", methods=["GET"])
def multiple():
    tesla = "tsla"
    apple = "aapl"
    fb = "fb"
    qcom = "qcom"
    microsft = "msft"
    sony = 'sne'
    american_airline = "aal"

    search_url = "{}/stable/stock/market/batch?symbols={},{},{},{},{},{},{}&types=quote&token={}".format(
        base_url, tesla, apple, fb, qcom, microsft, sony, american_airline, api_key)

    req = requests.get(search_url)

    resp = req.json()

    stocks_data = [
        {
            "company_name": resp['AAPL']["quote"]["companyName"],
            "symbol": resp['AAPL']["quote"]["symbol"],
            "latestPrice": resp['AAPL']["quote"]["latestPrice"],
            "change": resp['AAPL']["quote"]["change"],
        },
        {
            "company_name": resp['TSLA']["quote"]["companyName"],
            "symbol": resp['TSLA']["quote"]["symbol"],
            "latestPrice": resp['TSLA']["quote"]["latestPrice"],
            "change": resp['TSLA']["quote"]["change"]
        },

        {
            "company_name": resp['FB']["quote"]["companyName"],
            "symbol": resp['FB']["quote"]["symbol"],
            "latestPrice": resp['FB']["quote"]["latestPrice"],
            "change": resp['FB']["quote"]["change"]
        },

        {
            "company_name": resp['AAL']["quote"]["companyName"],
            "symbol": resp['AAL']["quote"]["symbol"],
            "latestPrice": resp['AAL']["quote"]["latestPrice"],
            "change": resp['AAL']["quote"]["change"]
        },

        {
            "company_name": resp['MSFT']["quote"]["companyName"],
            "symbol": resp['MSFT']["quote"]["symbol"],
            "latestPrice": resp['MSFT']["quote"]["latestPrice"],
            "change": resp['MSFT']["quote"]["change"]
        },

        {
            "company_name": resp['QCOM']["quote"]["companyName"],
            "symbol": resp['QCOM']["quote"]["symbol"],
            "latestPrice": resp['QCOM']["quote"]["latestPrice"],
            "change": resp['QCOM']["quote"]["change"]
        },

        {
            "company_name": resp['SNE']["quote"]["companyName"],
            "symbol": resp['SNE']["quote"]["symbol"],
            "latestPrice": resp['SNE']["quote"]["latestPrice"],
            "change": resp['SNE']["quote"]["change"]
        },
    ]

    return jsonify({"data": stocks_data})


@app.route("/search_stock/<string:stock>", methods={'GET'})
def search_stock(stock):
    search_url = "{}/stable/stock/{}/quote?token={}".format(
        base_url, stock, api_key)
    req = requests.get(search_url)
    resp = req.json()
    stock_data = {
        "company_name": resp["companyName"],
        "cost": resp["latestPrice"],
        "change": resp["change"],
        "symbol": resp["symbol"]
    }
    return jsonify({"data": stock_data})


@app.route('/add_stock', methods=['POST'])
def add_stock():
    user_detail = request.get_json()
    print(user_detail)

    user = Users.query.filter_by(id=user_detail['id']).first()
    if user:
        filter_by_stock_symbol = Stock.query.filter_by(
            stock_symbol=user_detail['stockSymbol']).first()
        if filter_by_stock_symbol != None:
            print("line 115 {}".format(filter_by_stock_symbol))

            update_user_cost = (filter_by_stock_symbol.user_estimated_cost +
                                user_detail['estimatedCost'])
            update_user_shares = (filter_by_stock_symbol.user_estimated_shares +
                                  user_detail['estimatedShares'])
            update_user_holdings = user.user_holdings - update_user_cost
            filter_by_stock_symbol.user_estimated_cost = update_user_cost
            filter_by_stock_symbol.user_estimated_shares = update_user_shares
            user.user_holdings = update_user_holdings
            print("line 125 {}".format(user.user_holdings))
            transaction = Transactions(company_name=user_detail['company_name'], user_estimated_cost=user_detail['estimatedCost'],
                                       user_holdings=user_detail['estimatedCost'], user_id=user_detail['id'])
            db.session.add(transaction)
            db.session.commit()
            return "Success! Stock added to db", 200
        else:
            print("line 131 {}".format(filter_by_stock_symbol))
            user_holdings = user.user_holdings - user_detail['estimatedCost']
            user.user_holdings = user_holdings
            """
            Transactions model column user_holdings
            has to be the amount of $ the user currently has after
            each transaction is complete.
            Leaving as is, working on the client side first
            """
            user_stock = Stock(company_name=user_detail['company_name'],
                               stock_symbol=user_detail['stockSymbol'], stock_cost=user_detail['stockCost'],
                               user_estimated_shares=user_detail['estimatedShares'], user_estimated_cost=user_detail['estimatedCost'], user_id=user_detail['id'])
            transaction = Transactions(company_name=user_detail['company_name'], user_estimated_cost=user_detail[
                                       'estimatedCost'], user_holdings=user_holdings, user_id=user_detail['id'])
            db.session.add(user_stock)
            db.session.add(transaction)
            db.session.commit()
            return jsonify("Success! Stock in db updated!", 200)
    else:
        return jsonify('Something went wrong on our end! Please try again later.', 500)


@app.route('/sell_stock', methods=["POST"])
def sell_stock():
    user_detail = request.get_json()

    user = Users.query.filter_by(id=user_detail['id']).first()

    filter_by_stock = Stock.query.filter_by(
        stock_symbol=user_detail['stockSymbol']).first()

    search_stock = "{}/stable/stock/{}/quote?token={}".format(
        base_url, user_detail['stockSymbol'], api_key)

    req = requests.get(search_stock)

    resp = req.json()

    user_estimated_shares = filter_by_stock.user_estimated_shares

    actual_stock_cost = resp['latestPrice']

    user_selling_ammout = user_detail['userSellingAmount']

    stock_bought_at = filter_by_stock.stock_cost

    difference_in_cost = (actual_stock_cost -
                          stock_bought_at) * user_estimated_shares

    difference_in_shares = (filter_by_stock.user_estimated_cost -
                            user_selling_ammout) / filter_by_stock.stock_cost
    user_holdings = user.user_holdings + \
        difference_in_cost + user_detail['userSellingAmount']
    print("line 185 {}".format(user_holdings))
    print("line 186 user.user_holdings{} user detail selling amount {}".format(
        user.user_holdings, user_detail['userSellingAmount']))

    if user:
        filter_by_stock.user_estimated_cost = filter_by_stock.user_estimated_cost - \
            user_selling_ammout
        filter_by_stock.user_estimated_shares = difference_in_shares
        user.user_holdings = user_holdings
        if filter_by_stock.user_estimated_cost == 0:
            transaction = Transactions(company_name=user_detail['companyName'], user_estimated_cost=user_detail[
                'userSellingAmount'], user_holdings=user_holdings, user_id=user_detail['id'])
            stock = Stock.query.filter_by(
                stock_symbol=user_detail['stockSymbol']).delete()
            db.session.add(transaction)
            db.session.commit()
            print("deleted stock {}".format(user_detail['stockSymbol']))
        else:
            print("updated stock {}".format(user_detail['stockSymbol']))
            transaction = Transactions(company_name=user_detail['companyName'], user_estimated_cost=user_detail[
                'userSellingAmount'], user_holdings=user_holdings, user_id=user_detail['id'])
            db.session.add(transaction)
            db.session.commit()

        return "ok", 200
    else:
        return 'nope', 500


@app.route('/user_stock', methods=['POST'])
def user_stock():
    user_detail = request.get_json()
    user = Users.query.filter_by(id=user_detail['id']).first()
    stock = Stock.query.filter_by(user_id=user_detail['id']).all()
    stock_list = []

    if user:
        for data in stock:
            search_url = "{}/stable/stock/{}/quote?token={}".format(
                base_url, data.stock_symbol, api_key)
            req = requests.get(search_url)
            resp = req.json()
            difference_in_cost = (
                resp['latestPrice'] - data.stock_cost) * data.user_estimated_shares
            print(resp['symbol'])
            print(resp['latestPrice'])
            print('difference in cost {}'.format(difference_in_cost))

            stock_obj = {
                "companyName": data.company_name,
                "symbol": data.stock_symbol,
                "cost": data.stock_cost,
                "userEstimatedShares": data.user_estimated_shares,
                "userEstimatedHolding": data.user_estimated_cost,
                "differenceInCost": difference_in_cost
            }

            stock_list.append(stock_obj)

        if stock_list != '':
            return jsonify({"stock": stock_list})
        else:
            return jsonify("An issue has occurred on our end! Please try again later", 500)

    else:
        return jsonify('User not found in our record! You will be redirected to the home page.', 500)


@app.route('/signup', methods=["POST"])
def signup():
    user_details = request.get_json()
    filter_user_model_by_username = Users.query.filter_by(
        username=user_details['username']).first()
    hashed_password = bcrypt.generate_password_hash(
        user_details['password']).decode('utf-8')

    if filter_user_model_by_username is None:
        user = Users(first_name=user_details['first_name'], last_name=user_details['last_name'],
                     email=user_details['email'], username=user_details['username'], password=hashed_password, user_holdings=user_details['userHoldings'])
        db.session.add(user)
        db.session.commit()
        return jsonify("Success! You will be redirect to your account shortly!", user.id, 200)
    else:
        return jsonify("The username has already been used! Please choose another username!", 500)


@app.route('/login', methods=["POST"])
def login():
    user_details = request.get_json()

    user = Users.query.filter_by(username=user_details["username"]).first()
    print(user.username)

    if user and bcrypt.check_password_hash(user.password, user_details['password']):
        response = {
            "user_id": user.id,
            "success_msg": "You are logged in successfully! You will be redirect to your account shortly!",
            "username": user.username
        }
        return jsonify(response, 200)
    else:
        return jsonify("hmmm.. We don't recognize that username or password. Please try again!", 500)


@app.route('/user', methods=["POST"])
def user():
    user_detail = request.get_json()

    user = Users.query.filter_by(id=user_detail['id']).first()

    user_obj = {
        "username": user.username,
        "user_holdings": user.user_holdings
    }

    if user:
        return jsonify(user_obj)
    else:
        return "User not found!", 500

Tags: nameinpyappbyreturnstockfilter