2024-06-02 22:43:07 发布
网友
有时我会在向MySQL数据库写入blob时遇到问题。为了在数据库中存储blob,应该将它加载到内存中,有时我的进程被OOM killer杀死。所以我认为流式传输可以解决我的问题,但是还没有发现任何关于使用SQLAlchemy将BLOB数据流式传输到MySQL的信息。在
我接受雷蒙德的评论,但我仍然需要解决方案。我找到了。如果您想将数据流传输到BLOB,可以使用以下奇怪的方法:
import sqlalchemy as sa from sqlalchemy import func from sqlalchemy.dialects.mysql import LONGBLOB from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session Base = declarative_base() def read_data_from_disk(compressed_report_file: str, stream_chunk: int): with open(compressed_report_file, 'rb') as compressed_report: compressed_report_data = compressed_report.read(stream_chunk) while compressed_report_data: yield compressed_report_data del compressed_report_data compressed_report_data = compressed_report.read(stream_chunk) class TestData(Base): __tablename__ = 'test_data' id = sa.Column(sa.BigInteger, nullable=False, autoincrement=True, primary_key=True) data = sa.Column(LONGBLOB, nullable=False) @classmethod def save_data(cls, session: Session, file_name: str, stream_chunk: int = 1024 ** 2): saved_data = cls(data=b'') session.add(saved_data) try: session.flush() saved_data_id = saved_data.id for file_data in read_data_from_disk(file_name, stream_chunk): session.query(cls).filter(cls.id == saved_data_id).update({ cls.data: func.concat(cls.data, file_data) }, synchronize_session=False) except Exception: session.rollback() raise return saved_data_id db = sa.create_engine('mysql://login:password@host:3306/database?charset=utf8') session = Session(bind=db) TestData.save_data(session=session, file_name='FatFile') session.commit()
我放入数据库文件:
数据库结果:
mysql> select id, OCTET_LENGTH(data) from test_data; + + + | id | OCTET_LENGTH(data) | + + + | 1 | 35229330 | + + + 1 row in set (0.00 sec)
我接受雷蒙德的评论,但我仍然需要解决方案。我找到了。如果您想将数据流传输到BLOB,可以使用以下奇怪的方法:
我放入数据库文件:
^{pr2}$数据库结果:
相关问题 更多 >
编程相关推荐