diff --git a/wfdb/io/_coreio.py b/wfdb/io/_coreio.py
index 9b3a7876..033424ed 100644
--- a/wfdb/io/_coreio.py
+++ b/wfdb/io/_coreio.py
@@ -1,14 +1,16 @@
 import posixpath
+import os
 
 from wfdb.io import _url
 from wfdb.io.download import config
 
 
 def _open_file(
-    pn_dir,
     file_name,
     mode="r",
     *,
+    dir_name="",
+    pn_dir=None,
     buffering=-1,
     encoding=None,
     errors=None,
@@ -24,15 +26,18 @@ def _open_file(
 
     Parameters
     ----------
-    pn_dir : str or None
-        The PhysioNet database directory where the file is stored, or None
-        if file_name is a local path.
     file_name : str
         The name of the file, either as a local filesystem path (if
         `pn_dir` is None) or a URL path (if `pn_dir` is a string.)
     mode : str, optional
         The standard I/O mode for the file ("r" by default). If `pn_dir`
        is not None, this must be "r", "rt", or "rb".
+    dir_name : str or None
+        If passed, and pn_dir is None, the directory will be prepended
+        to the file_name.
+    pn_dir : str or None
+        The PhysioNet database directory where the file is stored, or None
+        if file_name is a local path.
     buffering : int, optional
         Buffering policy.
     encoding : str, optional
@@ -48,7 +53,7 @@ def _open_file(
     """
     if pn_dir is None:
         return open(
-            file_name,
+            os.path.join(dir_name, file_name),
             mode,
             buffering=buffering,
             encoding=encoding,
diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py
index 68ca57e4..a1e8246e 100644
--- a/wfdb/io/_signal.py
+++ b/wfdb/io/_signal.py
@@ -4,7 +4,9 @@
 
 import numpy as np
 
-from wfdb.io import download, _coreio, util
+from wfdb.io import download
+from wfdb.io import util
+from wfdb.io import _coreio
 
 
 MAX_I32 = 2147483647
@@ -1195,23 +1197,24 @@ def _rd_segment(
     signals = [None] * len(channels)
 
     for fn in w_file_name:
-        # Get the list of all signals contained in the dat file
-        datsignals = _rd_dat_signals(
-            file_name=fn,
-            dir_name=dir_name,
-            pn_dir=pn_dir,
-            fmt=w_fmt[fn],
-            n_sig=len(datchannel[fn]),
-            sig_len=sig_len,
-            byte_offset=w_byte_offset[fn],
-            samps_per_frame=w_samps_per_frame[fn],
-            skew=w_skew[fn],
-            init_value=w_init_value[fn],
-            sampfrom=sampfrom,
-            sampto=sampto,
-            no_file=no_file,
-            sig_data=sig_data,
-        )
+        with _coreio._open_file(
+            fn, "rb", pn_dir=pn_dir, dir_name=dir_name
+        ) as io:
+            # Get the list of all signals contained in the dat file
+            datsignals = _rd_dat_signals(
+                io,
+                fmt=w_fmt[fn],
+                n_sig=len(datchannel[fn]),
+                sig_len=sig_len,
+                byte_offset=w_byte_offset[fn],
+                samps_per_frame=w_samps_per_frame[fn],
+                skew=w_skew[fn],
+                init_value=w_init_value[fn],
+                sampfrom=sampfrom,
+                sampto=sampto,
+                no_file=no_file,
+                sig_data=sig_data,
+            )
 
         # Copy over the wanted signals
         for cn in range(len(out_dat_channel[fn])):
@@ -1221,9 +1224,7 @@ def _rd_segment(
 
 
 def _rd_dat_signals(
-    file_name,
-    dir_name,
-    pn_dir,
+    io,
     fmt,
     n_sig,
     sig_len,
@@ -1241,14 +1242,8 @@ def _rd_dat_signals(
 
     Parameters
     ----------
-    file_name : str
-        The name of the dat file.
-    dir_name : str
-        The full directory where the dat file(s) are located, if the dat
-        file(s) are local.
-    pn_dir : str
-        The PhysioNet directory where the dat file(s) are located, if
-        the dat file(s) are remote.
+    io : io
+        The io to read the dat file from.
     fmt : str
         The format of the dat file.
     n_sig : int
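Aside (not part of the diff): the reworked `_open_file` now takes the file name first and chooses a local or remote source through keyword arguments. A minimal sketch of the calling convention, with a purely illustrative record name, local directory, and PhysioNet database:

    from wfdb.io import _coreio

    # Local file: dir_name is joined onto file_name with os.path.join.
    with _coreio._open_file("100.dat", "rb", dir_name="/tmp/mitdb") as f:
        first_bytes = f.read(4)

    # Remote file: pn_dir selects the PhysioNet database; dir_name is ignored.
    with _coreio._open_file("100.dat", "rb", pn_dir="mitdb") as f:
        first_bytes = f.read(4)

In both cases the returned object supports read/seek and the context-manager protocol, which is what the reading helpers below rely on.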
@@ -1324,10 +1319,8 @@ def _rd_dat_signals(
     if no_file:
         data_to_read = sig_data
     elif fmt in COMPRESSED_FMTS:
-        data_to_read = _rd_compressed_file(
-            file_name=file_name,
-            dir_name=dir_name,
-            pn_dir=pn_dir,
+        data_to_read = _rd_compressed_stream(
+            io,
             fmt=fmt,
             sample_offset=byte_offset,
             n_sig=n_sig,
@@ -1336,9 +1329,7 @@ def _rd_dat_signals(
             end_frame=sampto,
         )
     else:
-        data_to_read = _rd_dat_file(
-            file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples
-        )
+        data_to_read = _rd_dat_stream(io, fmt, start_byte, n_read_samples)
 
     if extra_flat_samples:
         if fmt in UNALIGNED_FMTS:
@@ -1577,7 +1568,7 @@ def _required_byte_num(mode, fmt, n_samp):
     return int(n_bytes)
 
 
-def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
+def _rd_dat_stream(io, fmt, start_byte, n_samp):
     """
     Read data from a dat file, either local or remote, into a 1d numpy
     array.
@@ -1588,14 +1579,8 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
 
     Parameters
     ----------
-    file_name : str
-        The name of the dat file.
-    dir_name : str
-        The full directory where the dat file(s) are located, if the dat
-        file(s) are local.
-    pn_dir : str
-        The PhysioNet directory where the dat file(s) are located, if
-        the dat file(s) are remote.
+    io : io
+        The io to read the dat file from.
     fmt : str
         The format of the dat file.
     start_byte : int
@@ -1635,21 +1620,12 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
         element_count = n_samp
         byte_count = n_samp * BYTES_PER_SAMPLE[fmt]
-    # Local dat file
-    if pn_dir is None:
-        with open(os.path.join(dir_name, file_name), "rb") as fp:
-            fp.seek(start_byte)
-            sig_data = np.fromfile(
-                fp, dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count
-            )
-    # Stream dat file from Physionet
-    else:
-        dtype_in = np.dtype(DATA_LOAD_TYPES[fmt])
-        sig_data = download._stream_dat(
-            file_name, pn_dir, byte_count, start_byte, dtype_in
-        )
-
-    return sig_data
+    io.seek(start_byte)
+    return np.frombuffer(
+        io.read(byte_count),
+        dtype=np.dtype(DATA_LOAD_TYPES[fmt]),
+        count=element_count,
+    )
 
 
 def _blocks_to_samples(sig_data, n_samp, fmt):
     """
@@ -1770,10 +1746,8 @@ def _blocks_to_samples(sig_data, n_samp, fmt):
     return sig
 
 
-def _rd_compressed_file(
-    file_name,
-    dir_name,
-    pn_dir,
+def _rd_compressed_stream(
+    io,
     fmt,
     sample_offset,
     n_sig,
@@ -1783,16 +1757,10 @@ def _rd_compressed_file(
 ):
     """
     Read data from a compressed file into a 1D numpy array.
-
     Parameters
     ----------
-    file_name : str
-        The name of the signal file.
-    dir_name : str
-        The full directory where the signal file is located, if local.
-        This argument is ignored if `pn_dir` is not None.
-    pn_dir : str or None
-        The PhysioNet database directory where the signal file is located.
+    io : io
+        The io to read the dat file from.
     fmt : str
         The format code of the signal file.
     sample_offset : int
@@ -1806,7 +1774,6 @@
         The starting frame number to read.
     end_frame : int
         The ending frame number to read.
-
     Returns
     -------
     signal : ndarray
@@ -1814,13 +1781,11 @@
         array in the same order the samples would be stored in a binary
         signal file; `signal[(i*n_sig+j)*samps_per_frame[0]+k]` is sample
         number `i*samps_per_frame[0]+k` of signal `j`.
-
     Notes
     -----
     Converting the output array into "dat file order" here is inefficient,
     but necessary to match the behavior of _rd_dat_file. It would be better
     to reorganize _rd_dat_signals to make the reshaping unnecessary.
-
     """
     import soundfile
 
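Aside (not part of the diff): for the plain binary formats, `_rd_dat_stream` now reduces to seeking within an already-open file object and decoding the bytes with `np.frombuffer`. A self-contained sketch of that read path, assuming a made-up local dat file in format 16 (little-endian 16-bit samples):

    import numpy as np

    n_samp = 1000     # number of samples to read (illustrative)
    start_byte = 0    # byte offset of the first requested sample

    with open("100.dat", "rb") as f:      # hypothetical local dat file
        f.seek(start_byte)
        raw = f.read(n_samp * 2)          # format 16 stores 2 bytes per sample
        sig = np.frombuffer(raw, dtype="<i2", count=n_samp)

    # Unlike np.fromfile, np.frombuffer over a bytes object returns a
    # read-only view, so copy (e.g. sig.copy()) before modifying samples.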
- """ import soundfile @@ -1830,78 +1795,74 @@ def _rd_compressed_file( "sampling rate and samples per frame" ) - if pn_dir is None: - file_name = os.path.join(dir_name, file_name) - - with _coreio._open_file(pn_dir, file_name, "rb") as fp: - signature = fp.read(4) - if signature != b"fLaC": - raise ValueError(f"{fp.name} is not a FLAC file") - fp.seek(0) - - with soundfile.SoundFile(fp) as sf: - # Determine the actual resolution of the FLAC stream and the - # data type will use when reading it. Note that soundfile - # doesn't support int8. - if sf.subtype == "PCM_S8": - format_bits = 8 - read_dtype = "int16" - elif sf.subtype == "PCM_16": - format_bits = 16 - read_dtype = "int16" - elif sf.subtype == "PCM_24": - format_bits = 24 - read_dtype = "int32" - else: - raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})") + signature = io.read(4) + if signature != b"fLaC": + raise ValueError(f"{io.name} is not a FLAC file") + io.seek(0) + + with soundfile.SoundFile(io) as sf: + # Determine the actual resolution of the FLAC stream and the + # data type will use when reading it. Note that soundfile + # doesn't support int8. + if sf.subtype == "PCM_S8": + format_bits = 8 + read_dtype = "int16" + elif sf.subtype == "PCM_16": + format_bits = 16 + read_dtype = "int16" + elif sf.subtype == "PCM_24": + format_bits = 24 + read_dtype = "int32" + else: + raise ValueError(f"unknown subtype in {io.name} ({sf.subtype})") - max_bits = int(fmt) - 500 - if format_bits > max_bits: - raise ValueError( - f"wrong resolution in {fp.name} " - f"({format_bits}, expected <= {max_bits})" - ) + max_bits = int(fmt) - 500 + if format_bits > max_bits: + raise ValueError( + f"wrong resolution in {io.name} " + f"({format_bits}, expected <= {max_bits})" + ) - if sf.channels != n_sig: - raise ValueError( - f"wrong number of channels in {fp.name} " - f"({sf.channels}, expected {n_sig})" - ) + if sf.channels != n_sig: + raise ValueError( + f"wrong number of channels in {io.name} " + f"({sf.channels}, expected {n_sig})" + ) + + # Read the samples. + start_samp = start_frame * samps_per_frame[0] + end_samp = end_frame * samps_per_frame[0] + sf.seek(start_samp + sample_offset) + + # We could do this: + # sig_data = sf.read(end_samp - start_samp, dtype=read_dtype) + # However, sf.read fails for huge blocks (over 2**24 total + # samples) due to a bug in libsndfile: + # https://github.com/libsndfile/libsndfile/issues/431 + # So read the data in chunks instead. + n_samp = end_samp - start_samp + sig_data = np.empty((n_samp, n_sig), dtype=read_dtype) + CHUNK_SIZE = 1024 * 1024 + for chunk_start in range(0, n_samp, CHUNK_SIZE): + chunk_end = chunk_start + CHUNK_SIZE + chunk_data = sf.read(out=sig_data[chunk_start:chunk_end]) + samples_read = chunk_data.shape[0] + if samples_read != CHUNK_SIZE: + sig_data = sig_data[: chunk_start + samples_read] + break - # Read the samples. - start_samp = start_frame * samps_per_frame[0] - end_samp = end_frame * samps_per_frame[0] - sf.seek(start_samp + sample_offset) - - # We could do this: - # sig_data = sf.read(end_samp - start_samp, dtype=read_dtype) - # However, sf.read fails for huge blocks (over 2**24 total - # samples) due to a bug in libsndfile: - # https://github.com/libsndfile/libsndfile/issues/431 - # So read the data in chunks instead. 
-            n_samp = end_samp - start_samp
-            sig_data = np.empty((n_samp, n_sig), dtype=read_dtype)
-            CHUNK_SIZE = 1024 * 1024
-            for chunk_start in range(0, n_samp, CHUNK_SIZE):
-                chunk_end = chunk_start + CHUNK_SIZE
-                chunk_data = sf.read(out=sig_data[chunk_start:chunk_end])
-                samples_read = chunk_data.shape[0]
-                if samples_read != CHUNK_SIZE:
-                    sig_data = sig_data[: chunk_start + samples_read]
-                    break
-
-            # If we read an 8-bit stream as int16 or a 24-bit stream as
-            # int32, soundfile shifts each sample left by 8 bits. We
-            # want to undo this shift (and, in the case of 8-bit data,
-            # convert to an int8 array.)
-            if format_bits == 8:
-                # np.right_shift(sig_data, 8, dtype='int8') doesn't work.
-                # This seems wrong, but the numpy documentation is unclear.
-                sig_data2 = np.empty(sig_data.shape, dtype="int8")
-                sig_data = np.right_shift(sig_data, 8, out=sig_data2)
-            elif format_bits == 24:
-                # Shift 32-bit array in-place.
-                np.right_shift(sig_data, 8, out=sig_data)
+        # If we read an 8-bit stream as int16 or a 24-bit stream as
+        # int32, soundfile shifts each sample left by 8 bits. We
+        # want to undo this shift (and, in the case of 8-bit data,
+        # convert to an int8 array.)
+        if format_bits == 8:
+            # np.right_shift(sig_data, 8, dtype='int8') doesn't work.
+            # This seems wrong, but the numpy documentation is unclear.
+            sig_data2 = np.empty(sig_data.shape, dtype="int8")
+            sig_data = np.right_shift(sig_data, 8, out=sig_data2)
+        elif format_bits == 24:
+            # Shift 32-bit array in-place.
+            np.right_shift(sig_data, 8, out=sig_data)
 
     # Suppose we have 3 channels and 2 samples per frame. The array
     # returned by sf.read looks like this:
diff --git a/wfdb/io/annotation.py b/wfdb/io/annotation.py
index b398fa07..fc4284c2 100644
--- a/wfdb/io/annotation.py
+++ b/wfdb/io/annotation.py
@@ -8,6 +8,7 @@
 
 from wfdb.io import download
 from wfdb.io import _header
+from wfdb.io import _coreio
 from wfdb.io import record
 
 
@@ -1946,8 +1947,75 @@ def rdann(
         sampfrom, sampto, return_label_elements
     )
 
+    with _coreio._open_file(
+        record_name + "." + extension,
+        "rb",
+        pn_dir=pn_dir,
+    ) as fp:
+        annotation = _rdann(
+            fp,
+            sampfrom,
+            sampto,
+            shift_samps,
+            return_label_elements,
+            summarize_labels,
+        )
+
+    # Try to get fs from the header file if it is not contained in the
+    # annotation file
+    if annotation.fs is None:
+        try:
+            rec = record.rdheader(record_name, pn_dir)
+            annotation.fs = rec.fs
+        except:
+            pass
+
+    annotation.record_name = os.path.split(record_name)[1]
+    annotation.extension = extension
+
+    return annotation
+
+
+def _rdann(
+    io,
+    sampfrom=0,
+    sampto=None,
+    shift_samps=False,
+    return_label_elements=["symbol"],
+    summarize_labels=False,
+):
+    """
+    Read a WFDB annotation file and return an Annotation object.
+
+    Parameters
+    ----------
+    io : io
+        The io to read the annotation file from.
+    sampfrom : int, optional
+        The minimum sample number for annotations to be returned.
+    sampto : int, optional
+        The maximum sample number for annotations to be returned.
+    shift_samps : bool, optional
+        Specifies whether to return the sample indices relative to `sampfrom`
+        (True), or sample 0 (False).
+    return_label_elements : list, optional
+        The label elements that are to be returned from reading the annotation
+        file. A list with at least one of the following options: 'symbol',
+        'label_store', 'description'.
+    summarize_labels : bool, optional
+        If True, assign a summary table of the set of annotation labels
+        contained in the file to the 'contained_labels' attribute of the
+        returned object. This table will contain the columns:
+        ['label_store', 'symbol', 'description', 'n_occurrences'].
+
+    Returns
+    -------
+    annotation : Annotation
+        The Annotation object. Call help(wfdb.Annotation) for the attribute
+        descriptions.
+    """
     # Read the file in byte pairs
-    filebytes = load_byte_pairs(record_name, extension, pn_dir)
+    filebytes = np.frombuffer(io.read(), "
 str:
-    """
-    Stream the text of a remote header file.
-
-    Parameters
-    ----------
-    file_name : str
-        The name of the headerr file to be read.
-    pn_dir : str
-        The PhysioNet database directory from which to find the
-        required header file. eg. For file '100.hea' in
-        'http://physionet.org/content/mitdb', pn_dir='mitdb'.
-
-    Returns
-    -------
-    N/A : str
-        The text contained in the header file
-
-    """
-    # Full url of header location
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
-
-    # Get the content of the remote file
-    with _url.openurl(url, "rb") as f:
-        content = f.read()
-
-    return content.decode("iso-8859-1")
-
-
-def _stream_dat(file_name, pn_dir, byte_count, start_byte, dtype):
-    """
-    Stream data from a remote dat file into a 1d numpy array.
-
-    Parameters
-    ----------
-    file_name : str
-        The name of the dat file to be read.
-    pn_dir : str
-        The PhysioNet directory where the dat file is located.
-    byte_count : int
-        The number of bytes to be read.
-    start_byte : int
-        The starting byte number to read from.
-    dtype : str
-        The numpy dtype to load the data into.
-
-    Returns
-    -------
-    sig_data : ndarray
-        The data read from the dat file.
-
-    """
-    # Full url of dat file
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
-
-    # Get the content
-    with _url.openurl(url, "rb", buffering=0) as f:
-        f.seek(start_byte)
-        content = f.read(byte_count)
-
-    # Convert to numpy array
-    sig_data = np.fromstring(content, dtype=dtype)
-
-    return sig_data
-
-
-def _stream_annotation(file_name, pn_dir):
-    """
-    Stream an entire remote annotation file from Physionet.
-
-    Parameters
-    ----------
-    file_name : str
-        The name of the annotation file to be read.
-    pn_dir : str
-        The PhysioNet directory where the annotation file is located.
-
-    Returns
-    -------
-    ann_data : ndarray
-        The resulting data stream in numpy array format.
-
-    """
-    # Full url of annotation file
-    url = posixpath.join(config.db_index_url, pn_dir, file_name)
-
-    # Get the content
-    with _url.openurl(url, "rb") as f:
-        content = f.read()
-
-    # Convert to numpy array
-    ann_data = np.fromstring(content, dtype=np.dtype("