Multithreading in Flask

Posted 2024-06-02 12:17:18


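I want to call generate_async_audio_service from a view and have it use a thread pool to asynchronously generate audio files for a list of words, then commit them to the database.

Even though I create a new polly and s3 instance every time, I keep getting a "working outside of application context" error.

How can I generate/upload multiple audio files at once?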

from flask import current_app
from multiprocessing.pool import ThreadPool
from Server.database import db
import boto3
import io
import uuid


def upload_audio_file_to_s3(file):
   app = current_app._get_current_object()
   with app.app_context():
      s3 = boto3.client(service_name='s3',
               aws_access_key_id=app.config.get('BOTO3_ACCESS_KEY'),
               aws_secret_access_key=app.config.get('BOTO3_SECRET_KEY'))
      extension = file.filename.rsplit('.', 1)[1].lower()
      file.filename = f"{uuid.uuid4().hex}.{extension}"
      s3.upload_fileobj(file,
         app.config.get('S3_BUCKET'),
         f"{app.config.get('UPLOADED_AUDIO_FOLDER')}/{file.filename}",
         ExtraArgs={"ACL": 'public-read', "ContentType": file.content_type})
      return file.filename

def generate_polly(voice_id, text):
   app = current_app._get_current_object()
   with app.app_context():
      polly_client = boto3.Session(
         aws_access_key_id=app.config.get('BOTO3_ACCESS_KEY'),                   
         aws_secret_access_key=app.config.get('BOTO3_SECRET_KEY'),
         region_name=app.config.get('AWS_REGION')).client('polly')
      response = polly_client.synthesize_speech(VoiceId=voice_id,
                     OutputFormat='mp3', Text=text)
      return response['AudioStream'].read()


def generate_polly_from_term(vocab_term, gender='m'):
   app = current_app._get_current_object()
   with app.app_context():
      audio = generate_polly('Celine', vocab_term.term)
      file = io.BytesIO(audio)
      file.filename = 'temp.mp3'
      file.content_type = 'mp3'
      return vocab_term.id, upload_audio_file_to_s3(file)

def generate_async_audio_service(terms):
   pool = ThreadPool(processes=12)
   results = pool.map(generate_polly_from_term, terms)
   # do something w/ results

1 Answer
User
#1 · Posted 2024-06-02 12:17:18

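This isn't necessarily a fleshed-out answer, but I'm putting it here rather than in the comments.

Celery is a task queue for Python. The reason to use it is that if requests reach Flask faster than the tasks they trigger can finish, some tasks will block and you won't get all of your results. The fix is to hand the work to another process. It goes like this: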

1) Client sends a request to Flask to process audio files

2) The files land in Flask to be processed, and Flask sends an asynchronous task to Celery.

3) Celery is notified of the task and stores its state in some sort of messaging system (RabbitMQ and Redis are the canonical examples)

4) Flask is now unburdened from that task and can receive more

5) Celery finishes the task, including the upload to your database

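Celery and Flask are then two separate Python processes that communicate with each other. This should cover what you were trying to do with multithreading. You can also retrieve the task's state through Flask if you want the client to verify that the task completed. The route in Flask would look something like: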

[The route code from the original answer is missing from this copy.]

where celery_app comes from another module, worker.py:

import os
from celery import Celery

env = os.environ

# RabbitMQ is the message broker here
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'amqp://0.0.0.0:5672/0')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'rpc://')

celery_app = Celery('tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

Your Celery process would then configure a worker like this:

import os
from celery import Celery

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'rpc://')

# Set celery_app with name 'tasks' using the above broker and backend
celery_app = Celery('tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

@celery_app.task(name='celery_worker.files')
def async_files(path):
    # Get file from path
    # Process
    # Upload to database
    # This is just if you want to return an actual result, you can fill this in with whatever
    return {'task_state': "FINISHED"}

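This is fairly basic, but it can serve as a starting point. I will say that some of Celery's behavior and setup isn't always the most intuitive, but this keeps your Flask app available to anyone who wants to send files to it, without blocking anything else.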
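If a broker is more than the job needs, the thread pool from the question can probably be kept. The error there most likely comes from calling `current_app._get_current_object()` inside the worker threads, which start without an application context. A sketch of the usual workaround is to resolve the app object in the calling thread and pass it to the workers explicitly. The function `process_term` and the config value are placeholders standing in for the Polly and S3 calls.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

from flask import Flask, current_app

app = Flask(__name__)
app.config['S3_BUCKET'] = 'example-bucket'  # placeholder value


def process_term(flask_app, term):
    # Worker threads have no application context of their own, so push one here.
    with flask_app.app_context():
        bucket = current_app.config['S3_BUCKET']
        # Placeholder for the real work (synthesize audio, upload, etc.).
        return term, f'{bucket}/{term}.mp3'


def generate_async_audio_service(terms):
    # Resolve the proxy in the calling thread, where a context is active.
    flask_app = current_app._get_current_object()
    with ThreadPool(processes=4) as pool:
        return pool.map(partial(process_term, flask_app), terms)


# Simulates being called from a view, where an app context already exists.
with app.app_context():
    results = generate_async_audio_service(['bonjour', 'merci'])
```

This still ties the work to the lifetime of the request, so long-running batches remain a better fit for the Celery setup above.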

Hope this helps.
