Cohort

class corr_vars.core.cohort.Cohort(conn_args={}, password_file=None, database='db_hypercapnia_prepared', extraction_end_date='2025-01-02', obs_level='icu_stay', project_vars={}, merge_consecutive=True, load_default_vars=True, filters='')[source]

Bases: object

Class to build a cohort in the CORR database.

Parameters:
  • conn_args (dict) – Dictionary of [remote_hostname, username]

  • password_file (Union[str, bool, None]) – Path to the password file or True if your file is in ~/password.txt.

  • database (Literal['db_hypercapnia_prepared', 'db_corror_prepared']) – Default: “db_hypercapnia_prepared”

  • extraction_end_date (str) – End date for the extraction in the format “YYYY-MM-DD” (default: today).

  • obs_level (Literal['icu_stay', 'hospital_stay', 'procedure']) – Observation level (default: “icu_stay”).

  • project_vars (dict) – Dictionary with local variable definitions.

  • merge_consecutive (bool) – Whether to merge consecutive ICU stays (default: True). Does not apply to any other obs_level.

  • load_default_vars (bool) – Whether to load the default variables (default: True).

  • filters (str) – Initial filters (must be a valid SQL WHERE clause for the it_ishmed_fall table).

obs

Static data for each observation. Contains one row per observation (e.g., ICU stay) with columns for static variables like demographics and outcomes.

Example

>>> cohort.obs
patient_id  case_id icu_stay_id            icu_admission        icu_discharge sex   ... inhospital_death
0  P001         C001    C001_1       2023-01-01 08:30:00  2023-01-03 12:00:00   M   ...  False
1  P001         C001    C001_2       2023-01-03 14:20:00  2023-01-05 16:30:00   M   ...  False
2  P002         C002    C002_1       2023-01-02 09:15:00  2023-01-04 10:30:00   F   ...  False
3  P003         C003    C003_1       2023-01-04 11:45:00  2023-01-07 13:20:00   F   ...  True
...
Type:

pd.DataFrame

obsm

Dynamic data stored as dictionary of DataFrames. Each DataFrame contains time-series data for a variable with columns:

  • recordtime: Timestamp of the measurement

  • value: Value of the measurement

  • recordtime_end: End time (only for duration-based variables like therapies)

  • description: Additional information (e.g., medication names)

Example

>>> cohort.obsm["blood_sodium"]
   icu_stay_id          recordtime  value
0  C001_1      2023-01-01 09:30:00   138
1  C001_1      2023-01-02 10:15:00   141
2  C001_2      2023-01-03 15:00:00   137
3  C002_1      2023-01-02 10:00:00   142
4  C003_1      2023-01-04 12:30:00   139
...
Type:

dict of pd.DataFrame

variables

Dictionary of all variable objects in the cohort. This is used to keep track of variable metadata.

Type:

dict of Variable

Notes

  • For large cohorts, set load_default_vars=False to speed up the extraction. You can use pre-extracted cohorts as starting points and load them using Cohort.load().

  • Variables can be added using cohort.add_variable(). Static variables will be added to obs, dynamic variables to obsm.

Examples

Create a new cohort:

>>> cohort = Cohort(obs_level="icu_stay",
...                 database="db_hypercapnia_prepared",
...                 load_default_vars=False,
...                 password_file=True)

Access static data:

>>> cohort.obs["age_on_admission"]  # Get age for all patients
>>> cohort.obs.loc[cohort.obs["sex"] == "M"]  # Filter for male patients

Access time-series data:

>>> cohort.obsm["blood_sodium"]  # Get all blood sodium measurements
>>> # Get blood sodium measurements for a specific observation
>>> cohort.obsm["blood_sodium"].loc[
...     cohort.obsm["blood_sodium"][cohort.primary_key] == "12345"
... ]
debug_print()[source]

Print debug information about the cohort. Please use this if you are creating a GitHub issue.

Returns:

None

save(filename)[source]

Save the cohort to a pickle file.

Parameters:

filename (str) – Path to the pickle file.

Returns:

None

classmethod load(filename, conn_args={})[source]

Load a cohort from a pickle file. If this file was saved by a different user, you need to pass your database credentials to the function.

Parameters:
  • filename (str) – Path to the pickle file.

  • conn_args (dict) – Database credentials [remote_hostname, username, password_file].

Returns:

A new Cohort object.

Return type:

Cohort

property adata

AnnotatedData representation of the cohort (NEW - Cached version)

Returns an AnnData object, which stores a data matrix together with annotations of observations, variables, and unstructured annotations.

Warning

This returns a copy of the data. Modifications to the returned object will not be reflected in the cohort. To modify the cohort through the Adata object, use Cohort._overwrite_from_adata()

to_adata()[source]

Convert the cohort to an AnnData object.

Returns:

An AnnData object.

Return type:

AnnData

to_csv(folder)[source]

Save the cohort to CSV files.

Parameters:

folder (str) – Path to the folder.

tableone(ignore_cols=[], groupby=None, filter=None, pval=False, **kwargs)[source]

Create a TableOne object for the cohort.

Parameters:
  • ignore_cols (list) – Columns to ignore.

  • groupby (Optional[str]) – Column to group by.

  • filter (Optional[str]) – Filter to apply to the data.

  • pval (bool) – Whether to calculate p-values.

  • **kwargs – Additional arguments to pass to TableOne.

Returns:

A TableOne object.

Return type:

TableOne

Examples

>>> tableone = cohort.tableone()
>>> print(tableone)
>>> tableone.to_csv("tableone.csv")
>>> tableone = cohort.tableone(groupby="sex", pval=False)
>>> print(tableone)
>>> tableone.to_csv("tableone_sex.csv")
load_default_vars()[source]

Load the default variables defined in vars.json. It is recommended to use this after filtering your cohort for eligibility to speed up the process.

Returns:

Variables are loaded into the cohort.

Return type:

None

property axiom_cache: dict

This is a deepcopy of cached native dynamic variables that are always stored at a hospital stay level (i.e. tmin=hospital_admission and tmax=hospital_discharge).

If you do not know what to do with this, you probably do not need it.

clear_axiom_cache()[source]

Delete the axiom cache. Can be useful to free up memory, or for debugging purposes.

Return type:

None

get_obsm_filtered(var_name, tmin, tmax)[source]

Filter a variable stored in obsm by tmin and tmax.

You may specify tmin and tmax as a tuple (e.g. (“hospital_admission”, “+1d”)), in which case it will be relative to the hospital admission time of the patient.

Parameters:
  • var_name (str) – Name of the variable to filter.

  • tmin (str) – Name of the column to use as tmin or tuple (see description).

  • tmax (str) – Name of the column to use as tmax or tuple (see description).

Return type:

DataFrame

Returns:

Filtered variable.

Examples

>>> var_data = cohort.get_obsm_filtered(
...    var_name="blood_sodium",
...    tmin=("hospital_admission", "+1d"),
...    tmax="hospital_discharge"
... )
add_variable(variable, save_as=None, tmin=None, tmax=None)[source]

Add a variable to the cohort.

You may specify tmin and tmax as a tuple (e.g. (“hospital_admission”, “+1d”)), in which case it will be relative to the hospital admission time of the patient.

Parameters:
  • variable (str | Variable) – Variable to add. Either a string with the variable name (from vars.json) or a Variable object.

  • save_as – Name of the column to save the variable as. Defaults to variable name.

  • tmin – Name of the column to use as tmin or tuple (see description).

  • tmax – Name of the column to use as tmax or tuple (see description).

Returns:

Variable is added to the cohort.

Return type:

None

Examples

>>> cohort.add_variable("blood_sodium")
>>> cohort.add_variable(
...    variable="anx_dx_covid_19",
...    tmin=("hospital_admission", "-1d"),
...    tmax=cohort.t_eligible
... )
>>> cohort.add_variable(
...    NativeStatic(
...        var_name="highest_hct_before_eligible",
...        select="!max value",
...        base_var='blood_hematokrit',
...        tmax=cohort.t_eligible
...    )
... )
>>> cohort.add_variable(
...    variable='any_med_glu',
...    save_as="glucose_prior_eligible",
...    tmin=(cohort.t_eligible, "-48h"),
...    tmax=cohort.t_eligible
... )
set_t_eligible(t_eligible, drop_ineligible=True)[source]

Set the time anchor for eligibility. This can be referenced as cohort.t_eligible throughout the process and is required to add inclusion or exclusion criteria.

Parameters:
  • t_eligible (str) – Name of the column to use as t_eligible.

  • drop_ineligible (bool) – Whether to drop ineligible patients. Defaults to True.

Returns:

t_eligible is set.

Return type:

None

Examples

>>> # Add a suitable time-anchor variable
>>> cohort.add_variable(NativeStatic(
...    var_name="spo2_lt_90",
...    base_var="spo2",
...    select="!first recordtime",
...    where="value < 90",
... ))
>>> # Set the time anchor for eligibility
>>> cohort.set_t_eligible("spo2_lt_90")
set_t_outcome(t_outcome)[source]

Set the time anchor for outcome. This can be referenced as cohort.t_outcome throughout the process and is recommended to specify for your study.

Parameters:

t_outcome (str) – Name of the column to use as t_outcome.

Returns:

t_outcome is set.

Return type:

None

Examples

>>> cohort.set_t_outcome("hospital_discharge")
include(*args, **kwargs)[source]

Add an inclusion criterion to the cohort. It is recommended to use Cohort.add_inclusion() and add all of your inclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.

Warning

You must call Cohort.add_inclusion() before calling Cohort.include() to ensure that the inclusion criteria are properly tracked.

Parameters:
  • variable (str | Variable)

  • operation (str)

  • label (str)

  • operations_done (str)

  • [Optional – tmin, tmax]

Returns:

Criterion is added to the cohort.

Return type:

None

Examples

>>> cohort.include(
...    variable="age_on_admission",
...    operation=">= 18",
...    label="Adult",
...    operations_done="Include only adult patients"
... )
exclude(*args, **kwargs)[source]

Add an exclusion criterion to the cohort. It is recommended to use Cohort.add_exclusion() and add all of your exclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.

Warning

You must call Cohort.add_exclusion() before calling Cohort.exclude() to ensure that the exclusion criteria are properly tracked.

Parameters:
  • variable (str | Variable)

  • operation (str)

  • label (str)

  • operations_done (str)

  • [Optional – tmin, tmax]

Returns:

Criterion is added to the cohort.

Return type:

None

Examples

>>> cohort.exclude(
...    variable="elix_total",
...    operation="> 20",
...    operations_done="Exclude patients with high Elixhauser score"
... )
add_inclusion(inclusion_list=[])[source]

Add an inclusion criteria to the cohort.

Parameters:

inclusion_list (list) –

List of inclusion criteria. Must include a dictionary with keys:
  • variable (str | Variable): Variable to use for exclusion

  • operation (str): Operation to apply (e.g., “> 5”, “== True”)

  • label (str): Short label for the exclusion step

  • operations_done (str): Detailed description of what this exclusion does

  • tmin (str, optional): Start time for variable extraction

  • tmax (str, optional): End time for variable extraction

Returns:

CohortTracker object, can be used to plot inclusion chart

Return type:

ct (CohortTracker)

Note

Per default, all inclusion criteria are applied from tmin=cohort.tmin to tmax=cohort.t_eligible. This is recommended to avoid introducing immortality biases. However, in some cases you might want to set custom time bounds.

Examples

>>> ct = cohort.add_inclusion([
...    {
...        "variable": "age_on_admission",
...        "operation": ">= 18",
...        "label": "Adult patients",
...        "operations_done": "Excluded patients under 18 years old"
...    }
...  ])
>>> ct.plot_flowchart()
add_exclusion(exclusion_list=[])[source]

Add an exclusion criteria to the cohort.

Parameters:

exclusion_list (list) –

List of exclusion criteria. Each criterion is a dictionary containing:

  • variable (str | Variable): Variable to use for exclusion

  • operation (str): Operation to apply (e.g., “> 5”, “== True”)

  • label (str): Short label for the exclusion step

  • operations_done (str): Detailed description of what this exclusion does

  • tmin (str, optional): Start time for variable extraction

  • tmax (str, optional): End time for variable extraction

Returns:

CohortTracker object, can be used to plot exclusion chart

Return type:

ct (CohortTracker)

Note

Per default, all exclusion criteria are applied from tmin=cohort.tmin to tmax=cohort.t_eligible. This is recommended to avoid introducing immortality biases. However, in some cases you might want to set custom time bounds.

Examples

>>> ct = cohort.add_exclusion([
...    {
...        "variable": "any_rrt_icu",
...        "operation": "true",
...        "label": "No RRT",
...        "operations_done": "Excluded RRT before hypernatremia"
...    },
...    {
...        "variable": "any_dx_tbi",
...        "operation": "true",
...        "label": "No TBI",
...        "operations_done": "Excluded TBI before hypernatremia"
...    },
...    {
...        "variable": NativeStatic(
...            var_name="sodium_count",
...            select="!count value",
...            base_var="blood_sodium"),
...        "operation": "< 1",
...        "label": "Final cohort",
...        "operations_done": "Excluded cases with less than 1 sodium measurement after hypernatremia",
...        "tmin": cohort.t_eligible,
...        "tmax": "hospital_discharge"
...    }
...  ])
>>> ct.plot_flowchart() # Plot the exclusion flowchart
add_variable_definition(var_name, var_dict)[source]

Add or update a local variable definition.

Parameters:
  • var_name (str) – Name of the variable

  • var_dict (dict) – Dictionary containing variable definition. Can be partial - missing fields will be inherited from global definition.

Return type:

None

Examples

>>> # Add completely new variable
>>> cohort.add_variable_definition("my_new_var", {
...     "type": "native_dynamic",
...     "table": "it_ishmed_labor",
...     "where": "c_katalog_leistungtext LIKE '%new%'",
...     "value_dtype": "DOUBLE",
...     "cleaning": {"value": {"low": 100, "high": 150}}
... })
>>> # Partially override existing variable
>>> cohort.add_variable_definition("blood_sodium", {
...     "where": "c_katalog_leistungtext LIKE '%custom_sodium%'"
... })

Cohort Workflow

Initialization

cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    database="db_hypercapnia_prepared",
    merge_consecutive=True,  # Optional, defaults to True. Only for obs_level="icu_stay"
    load_default_vars=False  # Optional, defaults to True
)

Adding Variables

# Use pre-defined variables
cohort.add_variable("pf_ratio")

# Create a custom variable on the fly
cohort.add_variable(
    NativeStatic(
        var_name="median_sodium_before_hn",
        select="!median value",
        base_var="blood_sodium",
        tmin="hospital_admission",
        tmax=cohort.t_eligible
    )
)

# Perform manual operations on the dataframe
cohort.obs['idx_hypernatremia_was_on_admission'] = (
    cohort.obs['first_sodium_recordtime'] == cohort.obs['first_severe_hypernatremia_recordtime']
)

cohort.obs['hn_origin'] = np.where(
    cohort.obs['idx_hypernatremia_was_on_admission'],
    'community_acquired',
    'hospital_acquired'
)

Inclusion/Exclusion

# Add multiple inclusion criteria
cohort.add_inclusion([
    {
        "variable": "age",
        "operation": ">= 18",
        "label": "Adult patients"
    },
    {
        "variable": "icu_length_of_stay",
        "operation": "> 2",
        "label": "ICU stay > 2 days"
    }
])

# Add exclusion criteria
cohort.add_exclusion([
    {
        "variable": "any_dx_covid_19",
        "operation": "== True",
        "label": "Exclude COVID-19 patients"
    }
])

# Visualize the exclusion criteria
cohort.exclude_ct.plot_flowchart()

Exploration

# Display the cohort dataframe
print(cohort.obs)


# Create a TableOne object
tableone = cohort.tableone()
print(tableone)

Data Export

# Save to file
cohort.save("my_cohort.corr")

# Load from file
cohort = Cohort.load("my_cohort.corr")

# Export to CSV
cohort.to_csv("path/to/output_folder")

# Convert to AnnData
adata = cohort.to_adata()