Source code for pyllars.physionet_utils

"""
This module contains functions for working with datasets from physionet,
including MIMIC and the Computing in Cardiology Challenge 2012 datasets.

In the future, this module may be renamed and updated to also work with
the eICU dataset.

Please see the respective documentation for more details:

* MIMIC-III: https://mimic.physionet.org/about/mimic/
* CINC 2012: https://physionet.org/challenge/2012/
* eICU: https://eicu-crd.mit.edu/about/eicu/
"""

import logging
logger = logging.getLogger(__name__)

import datetime
import os

import dask
import joblib
import numpy as np
import pandas as pd
from dask import dataframe as dd

import pyllars.pandas_utils as pd_utils
import pyllars.utils as utils

import typing
from typing import Iterable, List

###
# MIMIC
###

def _fix_mimic_icd(icd):
    """ Add the decimal to the correct location for the ICD-9-CM diagnosis code

    From the mimic documentation
    (https://mimic.physionet.org/mimictables/diagnoses_icd/):

    > The code field for the ICD-9-CM Principal and Other Diagnosis Codes
    > is six characters in length, with the decimal point implied between
    > the third and fourth digit for all diagnosis codes other than the V
    > codes. The decimal is implied for V codes between the second and third
    > digit.

    In both of those cases, the decimal follows the third *character* of the
    code. ICD-9-CM E codes (external causes) have the form `Ennn.n`, so for
    them the decimal follows the fourth character.

    Parameters
    ----------
    icd : object
        The ICD code without its decimal point. It is converted to a string
        with `str` before processing.

    Returns
    -------
    fixed_icd : str
        The code with the decimal inserted. Codes which consist only of the
        category (or are shorter than that) are returned without a decimal.
    """
    icd = str(icd)

    # E codes have a four-character category ("E" and three digits); all
    # other diagnosis codes, including V codes, have three characters
    category_length = 4 if icd.upper().startswith("E") else 3

    if len(icd) <= category_length:
        # then we just have the major chapter; do not add a dangling "."
        return icd

    return "{}.{}".format(icd[:category_length], icd[category_length:])
def fix_mimic_icds(icds:Iterable[str]) -> List[str]:
    """ Insert the implied decimal point into each of the given ICD codes

    Every code is processed individually with `_fix_mimic_icd`. This is a
    pure-python, string-based operation, so it can be somewhat slow on large
    inputs. It may therefore make sense to filter the records first and then
    fix only the (possibly much smaller number of) remaining codes.

    Parameters
    ----------
    icds : typing.Iterable[str]
        ICDs from the various mimic ICD columns

    Returns
    -------
    fixed_icds : List[str]
        The ICD codes with decimals in the correct location, in the same
        order as the input
    """
    return list(map(_fix_mimic_icd, icds))
def get_admissions(mimic_base:str, **kwargs) -> pd.DataFrame:
    """ Read the ADMISSIONS table from `ADMISSIONS.csv.gz`

    The following columns are always parsed as date-times:

    * `ADMITTIME`
    * `DISCHTIME`
    * `DEATHTIME`
    * `EDREGTIME`
    * `EDOUTTIME`

    Parameters
    ----------
    mimic_base : str
        The path to the main MIMIC folder

    kwargs: <key>=<value> pairs
        Additional key words to pass to `read_df`

    Returns
    -------
    admissions : pandas.DataFrame
        The admissions table as a pandas data frame
    """
    admissions_path = os.path.join(mimic_base, "ADMISSIONS.csv.gz")

    timestamp_columns = [
        'ADMITTIME',
        'DISCHTIME',
        'DEATHTIME',
        'EDREGTIME',
        'EDOUTTIME',
    ]

    return pd_utils.read_df(
        admissions_path,
        parse_dates=timestamp_columns,
        **kwargs
    )
def get_diagnosis_icds(
        mimic_base:str,
        drop_incomplete_records:bool=False,
        fix_icds:bool=False) -> pd.DataFrame:
    """ Load the `DIAGNOSES_ICD` table

    Parameters
    ----------
    mimic_base : str
        The path to the main MIMIC folder

    drop_incomplete_records : bool
        Some of the ICD codes are missing. If this flag is `True`, then those
        records will be removed.

    fix_icds : bool
        Whether to add the decimal point in the correct position for the
        ICD codes. Missing codes are left as missing values.

    Returns
    -------
    diagnosis_icds : pandas.DataFrame
        The diagnosis ICDs table as a pandas data frame
    """
    diagnosis_icds = os.path.join(mimic_base, "DIAGNOSES_ICD.csv.gz")
    diagnosis_icds = pd_utils.read_df(diagnosis_icds)

    if fix_icds:
        msg = "[mimic_utils]: Adding decimals to ICD codes"
        logger.debug(msg)

        icd_codes = diagnosis_icds['ICD9_CODE']

        # Only touch the codes which are actually present. Passing a missing
        # value through the string-based fix would turn it into the text
        # "nan", and `dropna` below would no longer recognize the record as
        # incomplete.
        m_present = icd_codes.notna()
        fixed_icds = icd_codes.astype(object)
        fixed_icds[m_present] = fix_mimic_icds(icd_codes[m_present])

        diagnosis_icds['ICD9_CODE'] = fixed_icds

    if drop_incomplete_records:
        msg = "[mimic_utils]: removing incomplete diagnosis ICD records"
        logger.debug(msg)
        diagnosis_icds = diagnosis_icds.dropna()

    return diagnosis_icds
def get_followups(mimic_base:str) -> pd.DataFrame:
    """ Read the (constructed) FOLLOWUPS table from `FOLLOWUPS.jpkl.gz`

    The file is deserialized with `joblib.load`.

    Parameters
    ----------
    mimic_base : str
        The path to the main MIMIC folder

    Returns
    -------
    df_followups : pandas.DataFrame
        A data frame containing the followup information
    """
    followups_path = os.path.join(mimic_base, 'FOLLOWUPS.jpkl.gz')

    # NOTE: joblib files are pickle-based, so only load files from a trusted
    # source
    return joblib.load(followups_path)
def get_icu_stays(mimic_base, to_pandas=True, **kwargs):
    """ Read the ICUSTAYS table from `ICUSTAYS.csv.gz`

    The `INTIME` and `OUTTIME` columns are parsed as date-times.

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    icu_stays: pd.DataFrame or dd.DataFrame
        The ICU stays table as either a pandas or dask data frame, depending
        on the value of to_pandas
    """
    icu_stays_path = os.path.join(mimic_base, "ICUSTAYS.csv.gz")
    timestamp_columns = ['INTIME', 'OUTTIME']

    # choose the reader based on the requested type of data frame
    read_table = pd_utils.read_df if to_pandas else dd.read_csv

    return read_table(icu_stays_path, parse_dates=timestamp_columns, **kwargs)
def get_lab_events(mimic_base, to_pandas=True, drop_missing_admission=False,
        parse_dates=True, **kwargs):
    """ Read the LABEVENTS table from `LABEVENTS.csv.gz`

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    drop_missing_admission: bool
        About 20% of the lab events do not have an associated HADM_ID. If this
        flag is True, then those will be removed.

    parse_dates: bool
        Whether to directly parse `CHARTTIME` as a date. The main reason to
        skip this (when `parse_dates` is `False`) is if the `CHARTTIME` column
        is skipped (using the `usecols` parameter).

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    lab_events: pd.DataFrame or dd.DataFrame
        The lab events table as either a pandas or dask data frame, depending
        on the value of to_pandas
    """
    timestamp_columns = ['CHARTTIME'] if parse_dates else []
    lab_events_path = os.path.join(mimic_base, "LABEVENTS.csv.gz")

    # choose the reader based on the requested type of data frame
    read_table = pd_utils.read_df if to_pandas else dd.read_csv
    lab_events = read_table(
        lab_events_path,
        parse_dates=timestamp_columns,
        **kwargs
    )

    if not drop_missing_admission:
        return lab_events

    msg = ("[physionet.get_lab_events] removing lab events with no "
        "associated hospital admission")
    logger.debug(msg)

    return lab_events.dropna(subset=['HADM_ID'])
def get_lab_items(mimic_base, to_pandas=True, **kwargs):
    """ Read the D_LABITEMS table from `D_LABITEMS.csv.gz`

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    lab_items: pd.DataFrame or dd.DataFrame
        The lab items table as either a pandas or dask data frame, depending
        on the value of to_pandas
    """
    lab_items_path = os.path.join(mimic_base, "D_LABITEMS.csv.gz")

    if to_pandas:
        return pd_utils.read_df(lab_items_path, **kwargs)

    return dd.read_csv(lab_items_path, **kwargs)
def get_notes(mimic_base, to_pandas=True, **kwargs):
    """ Read the NOTEEVENTS table from `NOTEEVENTS.csv.gz`

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    note_events: pd.DataFrame or dd.DataFrame
        The notes table as either a pandas or dask data frame, depending on
        the value of to_pandas
    """
    note_events_path = os.path.join(mimic_base, "NOTEEVENTS.csv.gz")

    if to_pandas:
        return pd_utils.read_df(note_events_path, **kwargs)

    return dd.read_csv(note_events_path, **kwargs)
def get_patients(mimic_base, to_pandas=True, **kwargs):
    """ Read the PATIENTS table from `PATIENTS.csv.gz`

    The `DOB`, `DOD`, `DOD_HOSP` and `DOD_SSN` columns are parsed as
    date-times.

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    patients: pd.DataFrame or dd.DataFrame
        The patients table as either a pandas or dask data frame, depending on
        the value of to_pandas
    """
    patients_path = os.path.join(mimic_base, "PATIENTS.csv.gz")
    timestamp_columns = ["DOB", "DOD", "DOD_HOSP", "DOD_SSN"]

    # choose the reader based on the requested type of data frame
    read_table = pd_utils.read_df if to_pandas else dd.read_csv

    return read_table(patients_path, parse_dates=timestamp_columns, **kwargs)
def get_procedure_icds(mimic_base, to_pandas=True):
    """ Read the PROCEDURES_ICD table from `PROCEDURES_ICD.csv.gz`

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    Returns
    -------
    procedure_icds: pd.DataFrame or dd.DataFrame
        The procedure ICDs table as either a pandas or dask data frame,
        depending on the value of to_pandas
    """
    procedure_icds_path = os.path.join(mimic_base, "PROCEDURES_ICD.csv.gz")

    if to_pandas:
        return pd_utils.read_df(procedure_icds_path)

    return dd.read_csv(procedure_icds_path)
def get_transfers(mimic_base, to_pandas=True, **kwargs):
    """ Read the TRANSFERS table from `TRANSFERS.csv.gz`

    The `INTIME` and `OUTTIME` columns are parsed as date-times.

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    kwargs: key=value pairs
        Additional keywords to pass to the appropriate `read` function

    Returns
    -------
    transfers: pd.DataFrame or dd.DataFrame
        The transfers table as either a pandas or dask data frame,
        depending on the value of to_pandas
    """
    transfers_path = os.path.join(mimic_base, "TRANSFERS.csv.gz")
    timestamp_columns = ["INTIME", "OUTTIME"]

    # choose the reader based on the requested type of data frame
    read_table = pd_utils.read_df if to_pandas else dd.read_csv

    return read_table(transfers_path, parse_dates=timestamp_columns, **kwargs)
###
# Creating the FOLLOWUPS table
###

def _get_followups(g):
    """ Build the followup rows for the admissions of a single subject

    Parameters
    ----------
    g: pd.DataFrame
        The admissions of one subject. The columns `SUBJECT_ID`, `HADM_ID`,
        `ADMITTIME` and `DISCHTIME` are used.

    Returns
    -------
    df_followups: pd.DataFrame
        One row for each admission, ordered by `ADMITTIME`, with the columns:

        * HADM_ID
        * FOLLOWUP_HADM_ID: the `HADM_ID` of the next admission, or -1 for
          the last admission
        * FOLLOWUP_TIME: the admit time of the next admission minus the
          discharge time of this admission
        * SUBJECT_ID: taken from the first row of `g`
    """
    subject_id = g.iloc[0]['SUBJECT_ID']
    ordered = g.sort_values('ADMITTIME')

    # the followup of each admission is simply the next admission in time;
    # the last admission has no followup, which is marked with -1
    next_hadm_ids = ordered['HADM_ID'].shift(-1).fillna(-1).astype(int)
    time_to_next = ordered['ADMITTIME'].shift(-1) - ordered['DISCHTIME']

    return ordered[['HADM_ID']].copy().assign(
        FOLLOWUP_HADM_ID=next_hadm_ids,
        FOLLOWUP_TIME=time_to_next,
        SUBJECT_ID=subject_id
    )
def create_followups_table(mimic_base, progress_bar=True):
    """ Create the FOLLOWUPS table, based on the admissions

    In particular, the table has the following columns:

    * HADM_ID
    * FOLLOWUP_HADM_ID
    * FOLLOWUP_TIME: the difference between the discharge time of the
      first admission and the admit time of the second admission
    * SUBJECT_ID

    Parameters
    ----------
    mimic_base: path-like
        The path to the main MIMIC folder

    progress_bar: bool
        Whether to show a progress bar for creating the table

    Returns
    -------
    df_followups: pd.DataFrame
        The data frame constructed as described above. Currently, there is
        no need to create this table more than once. It can just be written
        to disk and loaded using `get_followups` after the initial creation.
    """
    # `get_admissions` lives in this module, and the path parameter of this
    # function is `mimic_base`
    df_admissions = get_admissions(mimic_base)

    # the followups are determined independently for each subject
    g_admissions = df_admissions.groupby('SUBJECT_ID')
    all_followup_dfs = pd_utils.apply_groups(
        g_admissions,
        _get_followups,
        progress_bar=progress_bar
    )

    df_followups = pd.concat(all_followup_dfs)
    return df_followups
### # waveform database ###
def parse_rdsamp_datetime(fname, version=2):
    """ Extract the identifying information from the filename of the MIMIC-III
    header (*.hea) files

    In this project, we refer to each of these files as an "episode".

    Parameters
    ----------
    fname: string
        The name of the file. It should be of the form:

            version 1:
                /path/to/my/s09870-2111-11-04-12-36.hea
                /path/to/my/s09870-2111-11-04-12-36n.hea

            version 2:
                /path/to/my/p000020-2183-04-28-17-47.hea
                /path/to/my/p000020-2183-04-28-17-47n.hea

    version: int
        The version of the filename format, either 1 or 2. It determines how
        many characters of the base name make up the subject identifier.

    Returns
    -------
    episode_timestamp: dict
        A dictionary containing the time stamp and subject id for this
        episode. Specifically, it includes the following keys:

        * SUBJECT_ID: the patient identifier
        * EPISODE_ID: the identifier for this episode
        * EPISODE_BEGIN_TIME: the beginning time for this episode

    Raises
    ------
    ValueError
        If `version` is neither 1 nor 2
    """
    if version not in (1, 2):
        msg = "[parse_rdsamp_datetime] unknown version: {}".format(version)
        raise ValueError(msg)

    # position just past the last character of the subject id
    subject_end = 6 if version == 1 else 7

    episode_id = utils.get_basename(fname)

    # skip the leading letter of the base name
    subject_id = int(episode_id[1:subject_end])

    # the time stamp follows the separator after the subject id; in case it
    # still has the "n" at the end, remove it
    timestamp = episode_id[subject_end+1:].replace("n", "")
    begin_time = datetime.datetime.strptime(timestamp, "%Y-%m-%d-%H-%M")

    return {
        "SUBJECT_ID": subject_id,
        "EPISODE_BEGIN_TIME": begin_time,
        "EPISODE_ID": episode_id
    }
###
# CinC 2012: https://physionet.org/challenge/2012/
#
# "cinc_2012"
###

# the field which identifies a record
CinC_2012_BOOKKEEPING_FIELDS = {'HADM_ID'}

# the names of the descriptor parameters as they appear in the record files
_CinC_2012_DESCRIPTOR_FIELDS = {
    'RecordID',
    'Age',
    'Gender',
    'Height',
    'ICUType',
    'Weight',
}

# the names of the descriptor fields as exposed by this module
CinC_2012_DESCRIPTOR_FIELDS = {
    'AGE',
    'GENDER',
    'HEIGHT',
    'ICU_TYPE',
    'WEIGHT',
}

# a two-way mapping between the gender codes and their names
CinC_2012_GENDER_MAP = {
    # code -> name
    0: 'female',
    1: 'male',
    2: np.nan,
    None: np.nan,
    np.nan: np.nan,

    # name -> code
    'female': 0,
    'FEMALE': 0,
    'male': 1,
    'MALE': 1,
}

# a two-way mapping between the ICU type codes and their names
CinC_2012_ICU_TYPE_MAP = {
    # code -> name
    1: "coronary_care_unit",
    2: "cardiac_surgery_recovery_unit",
    3: "medical_icu",
    4: "surgical_icu",
    None: np.nan,
    np.nan: np.nan,

    # name -> code
    "coronary_care_unit": 1,
    "cardiac_surgery_recovery_unit": 2,
    "medical_icu": 3,
    "surgical_icu": 4,
}

# the names of the time series measurements
CinC_2012_TIME_SERIES_MEASUREMENTS = [
    'ALP', 'ALT', 'AST', 'Albumin',
    'BUN', 'Bilirubin',
    'Creatinine',
    'DiasABP',
    'FiO2',
    'GCS', 'Glucose',
    'HCO3', 'HCT', 'HR',
    'K',
    'Lactate',
    'MAP', 'MechVent', 'Mg',
    'NIDiasABP', 'NIMAP', 'NISysABP', 'Na',
    'PaCO2', 'PaO2', 'Platelets',
    'SaO2', 'SysABP',
    'Temp',
    'Urine',
    'WBC',
    'pH',
]

# the columns of the outcomes table, other than the bookkeeping field
CinC_2012_OUTCOME_FIELDS = {
    'ADMISSION_ELAPSED_TIME',
    'EXPIRED',
    'SAPS-I',
    'SOFA',
    'SURVIVAL',
}
def get_cinc_2012_outcomes(cinc_2012_base, to_pandas=True):
    """ Read and clean up the Outcomes-a.txt file

    N.B. This file is assumed to be named "Outcomes-a.txt" and located
    directly in the cinc_2012_base directory

    Parameters
    ----------
    cinc_2012_base: path-like
        The path to the main folder for this CinC challenge

    to_pandas: bool
        Whether to read the table as a pandas (True) or dask (False) data frame

    Returns
    -------
    outcomes_df: pd.DataFrame or dd.DataFrame
        The "Outcomes-a" table as either a pandas or dask data frame,
        depending on the value of to_pandas. It contains the following columns:

        * HADM_ID: string, a key into the record table
        * SAPS-I: integer, the SAPS-I score
        * SOFA: integer, the SOFA score
        * ADMISSION_ELAPSED_TIME: pd.timedelta, time in the hospital, in days
        * SURVIVAL: time between ICU admission and observed death.
            If the patient survived (or the death was not recorded), then the
            value is np.nan.
        * EXPIRED: bool, whether the patient died in the hospital
    """
    outcomes_path = os.path.join(cinc_2012_base, "Outcomes-a.txt")

    # choose the reader based on the requested type of data frame
    read_table = pd_utils.read_df if to_pandas else dd.read_csv
    outcomes_df = read_table(outcomes_path)

    # use names which match the MIMIC tables
    outcomes_df = outcomes_df.rename(columns={
        'RecordID': 'HADM_ID',
        'Length_of_stay': 'ADMISSION_ELAPSED_TIME',
        'Survival': 'SURVIVAL',
        'In-hospital_death': 'EXPIRED'
    })

    # the in-hospital death flag becomes a boolean
    outcomes_df['EXPIRED'] = (outcomes_df['EXPIRED'] == 1)

    # a survival of -1 is treated as missing; the remaining values are a
    # number of days
    # NOTE(review): `pd.to_timedelta` is applied directly to the column, also
    # when it comes from a dask data frame -- confirm this works as intended
    # for `to_pandas=False`
    outcomes_df['SURVIVAL'] = outcomes_df['SURVIVAL'].replace(-1, np.nan)
    outcomes_df['SURVIVAL'] = pd.to_timedelta(
        outcomes_df['SURVIVAL'], unit='d'
    )

    # the length of the stay is also a number of days
    outcomes_df['ADMISSION_ELAPSED_TIME'] = pd.to_timedelta(
        outcomes_df['ADMISSION_ELAPSED_TIME'], unit='d'
    )

    return outcomes_df
def _get_cinc_2012_record_descriptor(record_file_df):
    """ Given the record file data frame, extract the descriptor information

    The descriptors are the rows whose `Parameter` is one of the descriptor
    fields and whose `Time` is "00:00". See the documentation
    (https://physionet.org/challenge/2012/, "General descriptors") for more
    details.

    Parameters
    ----------
    record_file_df: pd.DataFrame
        The contents of a record file, with the columns `Time`, `Parameter`
        and `Value`

    Returns
    -------
    record_descriptor: dict
        A dictionary with the following keys:

        * HADM_ID: int, the record id (0 if the record has no `RecordID`)
        * ICU_TYPE: the name of the ICU type, or np.nan if it is missing
        * GENDER: the name of the gender, or np.nan if it is missing
        * AGE, HEIGHT, WEIGHT: the values from the record (None if the
          record does not contain the respective row)
    """
    # first, only look for the specified fields
    m_icu_fields = record_file_df['Parameter'].isin(_CinC_2012_DESCRIPTOR_FIELDS)

    # and at time "00:00"
    m_time = record_file_df['Time'] == "00:00"

    m_descriptors = m_icu_fields & m_time
    record_descriptor = record_file_df[m_descriptors]

    record_descriptor = pd_utils.dataframe_to_dict(
        record_descriptor, "Parameter", "Value"
    )

    # A NaN which comes out of the data frame is generally not the same
    # object as `np.nan`, so it cannot be used to find the `np.nan` key in
    # the maps. Likewise, an absent row results in `None`. So normalize all
    # missing values to keys which the maps are guaranteed to find.
    gender = record_descriptor.get('Gender')
    if pd.isna(gender):
        gender = 2

    icu_type = record_descriptor.get('ICUType')
    if pd.isna(icu_type):
        icu_type = None

    # now, fix the data types
    record_descriptor = {
        "HADM_ID": int(record_descriptor.get('RecordID', 0)),
        "ICU_TYPE": CinC_2012_ICU_TYPE_MAP[icu_type],
        "GENDER": CinC_2012_GENDER_MAP[gender],
        "AGE": record_descriptor.get('Age'),
        "HEIGHT": record_descriptor.get('Height'),
        "WEIGHT": record_descriptor.get('Weight')
    }

    return record_descriptor
def get_cinc_2012_record(cinc_2012_base, record_id, wide=True):
    """ Load the record file for the given id.

    N.B. This file is assumed to be named "<record_id>.txt" and located in the
    "<cinc_2012_base>/set-a" directory.

    Parameters
    ----------
    cinc_2012_base: path-like
        The path to the main folder for this CinC challenge

    record_id: string-like
        The identifier for this record, e.g., "132539"

    wide: bool
        Whether to return a "long" or "wide" data frame

    N.B. According to the specification
    (https://physionet.org/challenge/2012/, "General descriptors"), six
    descriptors are recorded only when the patients are admitted to the ICU
    and are included only once at the beginning of the record.

    Returns
    -------
    record_descriptors: dictionary
        The six descriptors:

        * HADM_ID: int, the record id. We call it "HADM_ID" to
          keep the nomenclature consistent with the MIMIC data
        * ICU_TYPE: string ["coronary_care_unit",
          "cardiac_surgery_recovery_unit", "medical_icu", "surgical_icu"]
        * GENDER: string ['female', 'male']
        * AGE: float (or np.nan for missing)
        * WEIGHT: float (or np.nan for missing)
        * HEIGHT: float (or np.nan for missing)

    observations: pd.DataFrame
        The remaining time series entries for this record. This is returned as
        either a "long" or "wide" data frame with columns:

        * HADM_ID: int (added for easy joining, etc.)
        * ELAPSED_TIME: timedelta64[ns]
        * MEASUREMENT: the name of the measurement
        * VALUE: the value of the measurement

        For a wide data frame, there is instead one column for each
        measurement. The wide data frame is created with `pd.pivot_table`, so
        multiple values of a measurement at the same time are aggregated
        (using the mean, by default).

    Raises
    ------
    ValueError
        If the record does not contain any observations besides the
        descriptors
    """
    record_file = "{}.txt".format(record_id)
    record_file = os.path.join(cinc_2012_base, "set-a", record_file)

    observations = pd.read_csv(record_file)

    # from documentation:
    #
    # "A value of -1 indicates missing or unknown data"
    observations = observations.replace(-1, np.nan)

    # first, get the descriptor
    descriptor = _get_cinc_2012_record_descriptor(observations)

    # we do not want to expose this function, but we know the elapsed times
    # are of the form "HH:MM"
    def parse_timedelta(hhmm):
        hours, minutes = hhmm.split(":")
        delta = datetime.timedelta(hours=int(hours), minutes=int(minutes))
        return delta

    # and now the observations

    # first, discard the descriptor fields. Take an explicit copy so the
    # column assignment below operates on an independent data frame rather
    # than on a slice of the original one.
    m_d = observations['Parameter'].isin(_CinC_2012_DESCRIPTOR_FIELDS)
    observations = observations[~m_d].copy()

    # It is possible we actually do not have any observations besides the
    # descriptors
    if observations.empty:
        msg = "Did not have any observations for record: {}".format(record_id)
        raise ValueError(msg)

    observations['Time'] = observations['Time'].apply(parse_timedelta)

    # rename the columns to match the MIMIC columns a bit better
    new_columns = {
        'Time': 'ELAPSED_TIME',
        'Parameter': 'MEASUREMENT',
        'Value': 'VALUE'
    }
    observations = observations.rename(columns=new_columns)

    # pivot, if needed
    if wide:
        observations = pd.pivot_table(
            observations,
            values='VALUE',
            index='ELAPSED_TIME',
            columns='MEASUREMENT',
            fill_value=np.nan
        )

        # turn ELAPSED_TIME back into a normal column and remove the
        # "MEASUREMENT" label from the columns
        observations = observations.reset_index()
        observations.columns.name = None

    # either way, add the episode id
    observations['HADM_ID'] = descriptor['HADM_ID']

    # and return the descriptor and observations
    return descriptor, observations