How to split a large 30GB bz2 file into multiple smaller bz2 files and add a header to each

Posted 2024-09-19 23:40:46


I have a huge bz2-format file (30GB) with no header row. I can easily split it into 500M-sized pieces with the following pipeline:

bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip2 > $FILE.csv.bz2' - splitted_file-

But I cannot add the header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp'] that each split bz2 file should contain.

More importantly, I don't want to split the file into 500M pieces; I want to split the bz2 file by day, based on the timestamp values in the data (e.g. splitted_file_2021-01-01.csv.bz2, splitted_file_2021-01-02.csv.bz2).

The data is tab-separated text like the following (no header; one needs to be added):

19252547212 1   3041    2   1   74.18   1.8504  2021-05-01 00:00:00
19252547213 1   5055    2   1   0       0       2021-05-01 00:00:00
19252547214 1   5073    1   1   53.81   0.1836  2021-05-01 00:00:00
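
In other words, the date to split on comes from the last tab-separated field of each row. A minimal sketch of extracting it, using the first sample row above:

# one sample row from the data above (tab-separated, no header yet)
row = b"19252547212\t1\t3041\t2\t1\t74.18\t1.8504\t2021-05-01 00:00:00\n"

# the timestamp is the last of the eight fields (index 7);
# its date part is what each daily file should be keyed on
date = row.rstrip(b"\n").split(b"\t")[7].split(b" ")[0]
print(date.decode())  # 2021-05-01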

Tags: file, csv, data, logging, format, timestamp, gps
1 Answer
User
#1 · Posted 2024-09-19 23:40:46

You can use the bz2 package to open BZ2-encoded files and treat them as regular file objects. There is a small performance benefit to reading and writing in binary. Assuming your data is ASCII or UTF-8, and that no tabs need to be escaped inside the data, you can read the file line by line, opening and writing an output file whenever a new timestamp shows up:

import bz2
import os

outfile = None
date = b""

with bz2.open("file") as fileobj:
    for line in fileobj:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            new_file = f"splitted_file_{new_date.decode('utf-8')}.csv.bz2"
            exists = os.path.exists(new_file)
            if outfile:
                outfile.close()
            outfile = bz2.open(new_file, "ab")
            # write the header only when the file is first created
            if not exists:
                outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        outfile.write(line)
if outfile:
    outfile.close()
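
As a quick sanity check, you can read one of the resulting files back with the same bz2 module. This sketch assumes the sample data above produced a file named splitted_file_2021-05-01.csv.bz2:

import bz2

# print the header and the first data row of one output file
with bz2.open("splitted_file_2021-05-01.csv.bz2") as check:
    print(check.readline())  # header: b'a\tb\tc\td\te\tf\ttimestamp\n'
    print(check.readline())  # first data row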

You can speed this up with pipes: hand decompression and compression to separate bzip2 processes, which will run in parallel on different cores. Instead of a shell pipeline, you can create the pipes and files in the script itself. Assuming bzip2 exists on the system, you can do the following. I have added the tqdm module to print progress:

#!/usr/bin/env python3

import subprocess as subp
from pathlib import Path
import sys
import tqdm

# TODO: Better command line
try:
    in_file_name = Path(sys.argv[1])
except IndexError:
    print("usage: unbzcsv.py filename")
    exit(1)

# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
date = b""
bzwriter = None
bzfile = None

# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name],
        stdin=subp.DEVNULL, stdout=subp.PIPE)

# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)

# read lines and fan out to files
try:
    for line in progress:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
            if bzwriter is not None:
                bzwriter.stdin.close()
                bzwriter.wait()
                bzwriter = None
                bzfile.close()
            print("\nwriting", out_file_name)
            progress.refresh()
            bzfile = open(out_file_name, "wb")
            bzwriter = subp.Popen(["bzip2", "--compress"],
                    stdin=subp.PIPE, stdout=bzfile)
            # write the header at the top of each new day's file
            bzwriter.stdin.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        bzwriter.stdin.write(line)
finally:
    bzreader.terminate() # in case of error
    if bzwriter:
        bzwriter.stdin.close()
        bzwriter.wait()
        bzfile.close()
    bzreader.wait()
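
Assuming the script above is saved as unbzcsv.py and tqdm is installed (pip install tqdm), you would run it as python3 unbzcsv.py logging.abc_gps.bz2. Based on the format string built at the top, the daily outputs land next to the input as logging-2021-05-01.abc_gps.bz2, logging-2021-05-02.abc_gps.bz2, and so on.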
