用python加载MITBIH正常窦性心律数据库

2024-06-17 15:46:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试用python加载MIT-BIH正常窦性心律数据库(nsrdb)。我查看了本教程:

https://medium.com/@roszcz/machine-learning-for-medicine-qrs-detection-in-a-single-channel-ecg-signal-part-1-data-set-be36f70bbd38

但它是用来加载MIT-BIH心律失常数据库的。以下代码来自密数据库.pydatasets文件夹中的文件如下所示。如何调整get_records()函数以下载其他physoninet数据库以及nsrdb?在

物理网数据库链接: https://www.physionet.org/physiobank/database/nsrdb/

import os
import h5py
import wfdb as wf
import numpy as np
import pandas as pd
from glob import glob
from scipy import signal as ss
from utils import download as ud
from matplotlib import pyplot as plt

def get_records():
    """ Get paths for data in data/mit/ directory """
    # Download if doesn't exist
    if not os.path.isdir('data/mitdb'):
        print 'Downloading the mitdb ecg database, please wait'
        ud.download_mitdb()
        print 'Download finished'

    # There are 3 files for each record
    # *.atr is one of them
    paths = glob('data/mitdb/*.atr')

    # Get rid of the extension
    paths = [path[:-4] for path in paths]
    paths.sort()

    return paths

def good_types():
    """ Of annotations """
    # www.physionet.org/physiobank/annotations.shtml
    good = ['N', 'L', 'R', 'B', 'A',
            'a', 'J', 'S', 'V', 'r',
            'F', 'e', 'j', 'n', 'E',
            '/', 'f', 'Q', '?']

    return good

def beat_annotations(annotation):
    """ Get rid of non-beat markers """
    # Declare beat types
    good = good_types()
    ids = np.in1d(annotation.anntype, good)

    # We want to know only the positions
    beats = annotation.annsamp[ids]

    return beats

def convert_input(channel, annotation):
    """ Into output """
    # Remove non-beat annotations
    beats = beat_annotations(annotation)

    # Create dirac-comb signal
    dirac = np.zeros_like(channel)
    dirac[beats] = 1.0

    # Use hamming window as a bell-curve filter
    width = 36
    filter = ss.hamming(width)
    gauss = np.convolve(filter, dirac, mode = 'same')

    return dirac, gauss

def good_annotations():
    """ Const function with good annotations """
    # For now it seems those are most popular
    good_annotations = [1, 2, 3, 4,
                        5, 6, 7, 8,
                        9, 10, 11, 12,
                        13, 16, 31, 38]

    return good_annotations

def make_dataset(records, width, savepath):
    """ Inside an array """
    # Prepare containers
    signals, labels = [], []

    # Iterate files
    for path in records:
        print 'Processing file:', path
        record = wf.rdsamp(path)
        annotations = wf.rdann(path, 'atr')

        # Extract pure signals
        data = record.p_signals

        # Convert each channel into labeled fragments
        signal, label = convert_data(data, annotations, width)

        # Cumulate
        signals.append(signal)
        labels.append(label)

    # Convert to one huge numpy.array
    signals = np.vstack(signals)
    labels = np.vstack(labels)

    # Write to disk
    np.save(savepath, {'signals' : signals,
                       'labels'  : labels })

def convert_data(data, annotations, width):
    """ Into a batch """
    # Prepare containers
    signals, labels = [], []

    # Convert both channels
    for it in range(2):
        channel = data[:, it]
        dirac, gauss = convert_input(channel,
                                     annotations)
        # Merge labels
        label = np.vstack([dirac, gauss])

        # Prepare the moving window
        sta = 0
        end = width
        stride = width
        while end <= len(channel):
            # Chop out the fragments
            s_frag = channel[sta : end]
            l_frag = label[:, sta : end]

            # Cumulate
            signals.append(s_frag)
            labels.append(l_frag)

            # Go forth
            sta += stride
            end += stride

    # Turn into arrays
    signals = np.array(signals)
    labels = np.array(labels)

    return signals, labels

def create_datasets():
    """ Training, validation, test """
    # Prepare paths
    records = get_records()

    # Shuffle up determinitically
    np.random.seed(666)
    np.random.shuffle(records)

    # Define the data
    width = 200

    # Make training
    make_dataset(records[:30], width, 'data/training')

    # ... validation ...
    make_dataset(records[30 : 39], width, 'data/validation')

    # ... and test
    make_dataset(records[39 : 48], width, 'data/test')

Tags: pathimportdatalabelsdefasnpchannel
2条回答

函数ud.download_mitdb()的源代码here

摘录:(免责声明:我找不到git repo中附带的许可证。我想原著作者可以在这里引用以供参考。如果没有,请删除以下代码)

import os
import urllib2
import requests
from tqdm import tqdm
from bs4 import BeautifulSoup as BSoup

def download_mitdb():
    """ All """
    extensions = ['atr', 'dat', 'hea']
    the_path = 'https://www.physionet.org/physiobank/database/mitdb/'

    # Save to proper data/ directory
    savedir = 'data/mitdb'
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # With this format
    savename = savedir + '/{}.{}'

    # Find all interesting files on that site:
    soup = BSoup(urllib2.urlopen(the_path).read())

    # Find all links pointing to .dat files
    hrefs = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        # Download datafiles with markers given
        if href[-4::] == '.dat':
            hrefs.append(href[:-4])

    # Path to the file on the internet
    down_path = the_path + '{}.{}'

    for data_id in hrefs:
        for ext in extensions:
            webpath = down_path.format(data_id, ext)
            datafile = urllib2.urlopen(webpath)

            # Save locally
            filepath = savename.format(data_id, ext)
            with open(filepath, 'wb') as out:
                out.write(datafile.read())

    print 'Downloaded {} data files'.format(len(hrefs))

def download_qt():
    """ All """
    extensions = ['atr', 'dat', 'hea',
                  'man', 'q1c', 'q2c',
                  'qt1', 'qt2', 'pu', 'pu0', 'pu1']
    the_path = 'https://www.physionet.org/physiobank/database/qtdb/'

    # Save to proper data/ directory
    savedir = 'data/qt'
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # With this format
    savename = savedir + '/{}.{}'

    # Find all interesting files on that site:
    soup = BSoup(urllib2.urlopen(the_path).read())

    # Find all links pointing to .dat files
    hrefs = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        # Download datafiles with markers given
        if href[-4::] == '.dat':
            hrefs.append(href[:-4])

    # Path to the file on the internet
    down_path = the_path + '{}.{}'

    for data_id in hrefs:
        for ext in extensions:
            webpath = down_path.format(data_id, ext)
            try:
                datafile = urllib2.urlopen(webpath)

                # Save locally
                filepath = savename.format(data_id, ext)
                with open(filepath, 'wb') as out:
                    out.write(datafile.read())

            # Assuming that 404 (Not Found)
            # is the only one possible http error
            except urllib2.HTTPError:
                print 'Not available:', webpath

    print 'Downloaded {} data files'.format(len(hrefs))


if __name__ == '__main__':
    download_mitdb()

{cd2>就像你看到的一样,硬线是编码的。 您可以调整副本以从其他URL获取数据,也可以将其作为参数传入,默认为该URL

如果有人还在想: 有一个python包装器用于加载WFDB Software Project,用于从physoninet加载波形数据(从本地和远程加载)。在

下面是一个如何使用它的示例:https://github.com/MIT-LCP/wfdb-python/blob/master/demo.ipynb

要加载示例,可以在安装wfdb后运行以下行:

import wfdb
record = wfdb.rdrecord('16265', pb_dir='nsrdb/')
wfdb.plot_wfdb(record=record, title='Record 16265 from Physionet NSRDB') 
print(record.__dict__)

用于加载整个数据库使用

^{pr2}$

相关问题 更多 >