使用Flas从API调用端点的正确方法

2024-05-20 10:26:21 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试构建一个Flask API,我有一个端点应该创建一个用户,另一个端点应该检查数据库中是否存在用户:

@API.route('/users/', methods=['POST'])
def new_user():
    user_json = json.loads(request.get_json())
    first_name = user_json.get('first_name')
    last_name = user_json.get('last_name')
    email = user_json.get('email')
    password = user_json.get('password')
    # Call the other endpoint here
    if response == 400:
        try:
           user = User(first_name=first_name, last_name=last_name, email=email, password=password)
           db.session.add(user)
           db.session.commit()
           return jsonify(user=user.to_json()), 200
       except:
           return jsonify(error=500), 500
    else:
       return jsonify(user=user.to_json()), 409



@API.route('/users/<string:email>', methods=['GET'])
def is_present(email):
    user = User.query.filter_by(email=email).first()
    if user:
        print(user)
        return jsonify(user=user.to_json()), 200
    else:
        return jsonify(error=404), 404

问题是我不知道在new_user端点中调用我的is_present的最佳方法是什么。我应该用requests.get吗?或者在烧瓶里有其他的方法吗?


Tags: to用户nameapijsongetreturnemail
2条回答

对于您的用例,我会这样做:

@API.route('/users', defaults={'email': None} ,methods=['GET', 'POST'])
@API.route('/users/<string:email>', methods=['GET', 'POST'])
def new_user(email):
    if(email):
        user = User.query.filter_by(email=email).first()
        if user:
            return jsonify(user=user.to_json()), 200
        else:
            return jsonify(error=404), 404
    else:
        user_json = json.loads(request.get_json())
        first_name = user_json.get('first_name')
        last_name = user_json.get('last_name')
        email = user_json.get('email')
        password = user_json.get('password')
        user = User(first_name=first_name, last_name=last_name, email=email, password=password)
        db.session.add(user)
        db.session.commit()
        return jsonify(user=user.to_json()), 200

处理这个问题的一种常见方法是将视图的逻辑分解成一个单独的内部函数(即不通过API公开),然后调用内部函数。然后视图将处理该请求,并根据需要调用内部函数。尽管在这种情况下这很简单(只是一个数据库查找),但可以这样做:

def get_user(email):
    return User.query.filter_by(email=email).first()

@API.route('/users/', methods=['POST'])
def new_user():
    user_json = json.loads(request.get_json())
    first_name = user_json.get('first_name')
    last_name = user_json.get('last_name')
    email = user_json.get('email')
    password = user_json.get('password')
    user = get_user(email)
    if not user:
        try:
           user = User(first_name=first_name, last_name=last_name, email=email, password=password)
           db.session.add(user)
           db.session.commit()
           return jsonify(user=user.to_json()), 200
       except:
           return jsonify(error=500), 500
    else:
       return jsonify(user=user.to_json()), 409

@API.route('/users/<string:email>', methods=['GET'])
def is_present(email):
    user = get_user(email)
    if user:
        print(user)
        return jsonify(user=user.to_json()), 200
    else:
        return jsonify(error=404), 404

使用这种方法的一个明显优点是完全避免了HTTP请求,从而产生了一个更高效、更不易出错(例如网络问题)的解决方案。


如上所述,作为从多个视图访问公共功能的一般处理方式,在创建新用户之前执行查找不能总是保证用户创建不会由于重复记录而失败。有一个竞争条件;在第一个请求检查之后,但在尝试创建操作之前,另一个请求可以创建用户。

假设您的用户表有一个主键,例如在(first_name, last_name, email)上,您可以简单地尝试创建一个新用户并处理由于重复而引发的任何异常。或者你可以看看^{}

相关问题 更多 >