Cohort
- class corr_vars.core.cohort.Cohort(conn_args={}, password_file=None, database='db_hypercapnia_prepared', extraction_end_date='2025-01-02', obs_level='icu_stay', project_vars={}, merge_consecutive=True, load_default_vars=True, filters='')
Bases: object
Class to build a cohort in the CORR database.
- Parameters:
conn_args (dict) – Dictionary of [remote_hostname, username].
password_file (Union[str, bool, None]) – Path to the password file, or True if your file is at ~/password.txt.
database (Literal['db_hypercapnia_prepared', 'db_corror_prepared']) – Database to use (default: "db_hypercapnia_prepared").
extraction_end_date (str) – End date for the extraction in the format "YYYY-MM-DD" (default: today).
obs_level (Literal['icu_stay', 'hospital_stay', 'procedure']) – Observation level (default: "icu_stay").
project_vars (dict) – Dictionary with local variable definitions.
merge_consecutive (bool) – Whether to merge consecutive ICU stays (default: True). Does not apply to any other obs_level.
load_default_vars (bool) – Whether to load the default variables (default: True).
filters (str) – Initial filters (must be a valid SQL WHERE clause for the it_ishmed_fall table).
- obs
Static data for each observation. Contains one row per observation (e.g., ICU stay) with columns for static variables like demographics and outcomes.
Example
>>> cohort.obs
   patient_id  case_id  icu_stay_id        icu_admission        icu_discharge  sex  ...  inhospital_death
0        P001     C001       C001_1  2023-01-01 08:30:00  2023-01-03 12:00:00    M  ...             False
1        P001     C001       C001_2  2023-01-03 14:20:00  2023-01-05 16:30:00    M  ...             False
2        P002     C002       C002_1  2023-01-02 09:15:00  2023-01-04 10:30:00    F  ...             False
3        P003     C003       C003_1  2023-01-04 11:45:00  2023-01-07 13:20:00    F  ...              True
...
- Type:
pd.DataFrame
- obsm
Dynamic data stored as dictionary of DataFrames. Each DataFrame contains time-series data for a variable with columns:
recordtime: Timestamp of the measurement
value: Value of the measurement
recordtime_end: End time (only for duration-based variables like therapies)
description: Additional information (e.g., medication names)
Example
>>> cohort.obsm["blood_sodium"]
   icu_stay_id           recordtime  value
0       C001_1  2023-01-01 09:30:00    138
1       C001_1  2023-01-02 10:15:00    141
2       C001_2  2023-01-03 15:00:00    137
3       C002_1  2023-01-02 10:00:00    142
4       C003_1  2023-01-04 12:30:00    139
...
- Type:
dict of pd.DataFrame
- variables
Dictionary of all variable objects in the cohort. This is used to keep track of variable metadata.
- Type:
dict of Variable
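The relationship between obs and obsm can be pictured with plain Python structures. The following is a minimal sketch, not the library's implementation: it uses lists of dictionaries in place of pandas DataFrames, reuses a few of the example rows shown above, and introduces a hypothetical helper, measurements_for, that looks up the rows of one variable for one observation.

```python
from datetime import datetime

# Stand-in for cohort.obs: one row per observation (here, one per ICU stay).
obs = [
    {"icu_stay_id": "C001_1", "patient_id": "P001", "sex": "M"},
    {"icu_stay_id": "C001_2", "patient_id": "P001", "sex": "M"},
    {"icu_stay_id": "C002_1", "patient_id": "P002", "sex": "F"},
]

# Stand-in for cohort.obsm: one time-series table per variable name.
obsm = {
    "blood_sodium": [
        {"icu_stay_id": "C001_1", "recordtime": datetime(2023, 1, 1, 9, 30), "value": 138},
        {"icu_stay_id": "C001_1", "recordtime": datetime(2023, 1, 2, 10, 15), "value": 141},
        {"icu_stay_id": "C001_2", "recordtime": datetime(2023, 1, 3, 15, 0), "value": 137},
    ]
}


def measurements_for(var_name, stay_id, primary_key="icu_stay_id"):
    """Hypothetical helper: all rows of one variable for one observation."""
    return [row for row in obsm[var_name] if row[primary_key] == stay_id]


values = [row["value"] for row in measurements_for("blood_sodium", "C001_1")]
print(values)  # [138, 141]
```

The primary key column (icu_stay_id in this sketch) is what ties each time-series row back to its static row in obs.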
Notes
For large cohorts, set load_default_vars=False to speed up the extraction. You can use pre-extracted cohorts as starting points and load them using Cohort.load().
Variables can be added using cohort.add_variable(). Static variables will be added to obs, dynamic variables to obsm.
Examples
Create a new cohort:
>>> cohort = Cohort(obs_level="icu_stay",
...                 database="db_hypercapnia_prepared",
...                 load_default_vars=False,
...                 password_file=True)
Access static data:
>>> cohort.obs["age_on_admission"]  # Get age for all patients
>>> cohort.obs.loc[cohort.obs["sex"] == "M"]  # Filter for male patients
Access time-series data:
>>> cohort.obsm["blood_sodium"]  # Get all blood sodium measurements
>>> # Get blood sodium measurements for a specific observation
>>> cohort.obsm["blood_sodium"].loc[
...     cohort.obsm["blood_sodium"][cohort.primary_key] == "12345"
... ]
- debug_print()
Print debug information about the cohort. Please use this if you are creating a GitHub issue.
- Returns:
None
- save(filename)
Save the cohort to a pickle file.
- Parameters:
filename (str) – Path to the pickle file.
- Returns:
None
- classmethod load(filename, conn_args={})
Load a cohort from a pickle file. If this file was saved by a different user, you need to pass your database credentials to the function.
- Parameters:
filename (str) – Path to the pickle file.
conn_args (dict) – Database credentials [remote_hostname, username, password_file].
- Returns:
A new Cohort object.
- Return type:
Cohort
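The save and load pair follows the usual pickle round-trip pattern. Below is a minimal sketch using a hypothetical stand-in class (MiniCohort is not part of corr_vars). As a simplifying assumption it pickles a plain dictionary of attributes rather than the object itself, and it does not model how the real class handles database credentials on load.

```python
import os
import pickle
import tempfile


class MiniCohort:
    """Hypothetical stand-in for a cohort; holds only plain data."""

    def __init__(self, obs_level, obs):
        self.obs_level = obs_level
        self.obs = obs

    def save(self, filename):
        # Serialize the plain attributes to a pickle file.
        with open(filename, "wb") as fh:
            pickle.dump({"obs_level": self.obs_level, "obs": self.obs}, fh)

    @classmethod
    def load(cls, filename):
        # Read the attributes back and build a new object from them.
        with open(filename, "rb") as fh:
            state = pickle.load(fh)
        return cls(**state)


original = MiniCohort("icu_stay", [{"icu_stay_id": "C001_1", "sex": "M"}])
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "my_cohort.corr")
    original.save(path)
    restored = MiniCohort.load(path)

print(restored.obs_level, restored.obs)
```

Unpickling can execute arbitrary code, so pickle files should only be loaded from sources you trust.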
- property adata
AnnData representation of the cohort (cached).
Returns an AnnData object, which stores a data matrix together with annotations of observations, variables, and unstructured annotations.
Warning
This returns a copy of the data. Modifications to the returned object will not be reflected in the cohort. To modify the cohort through the AnnData object, use Cohort._overwrite_from_adata().
- to_adata()
Convert the cohort to an AnnData object.
- Returns:
An AnnData object.
- Return type:
AnnData
- to_csv(folder)
Save the cohort to CSV files.
- Parameters:
folder (
str) – Path to the folder.
- tableone(ignore_cols=[], groupby=None, filter=None, pval=False, **kwargs)
Create a TableOne object for the cohort.
- Parameters:
ignore_cols (list) – Columns to ignore.
groupby (Optional[str]) – Column to group by.
filter (Optional[str]) – Filter to apply to the data.
pval (bool) – Whether to calculate p-values.
**kwargs – Additional arguments to pass to TableOne.
- Returns:
A TableOne object.
- Return type:
TableOne
Examples
>>> tableone = cohort.tableone()
>>> print(tableone)
>>> tableone.to_csv("tableone.csv")
>>> tableone = cohort.tableone(groupby="sex", pval=False)
>>> print(tableone)
>>> tableone.to_csv("tableone_sex.csv")
- load_default_vars()
Load the default variables defined in vars.json. It is recommended to use this after filtering your cohort for eligibility to speed up the process.
- Returns:
Variables are loaded into the cohort.
- Return type:
None
- property axiom_cache: dict
A deep copy of the cached native dynamic variables, which are always stored at hospital-stay level (i.e. tmin=hospital_admission and tmax=hospital_discharge).
If you do not know what to do with this, you probably do not need it.
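One practical consequence of a property that returns a deep copy is that editing the returned object leaves the stored data untouched. The sketch below illustrates this with the standard library only. Holder and its snapshot property are hypothetical names, and whether the real property builds a fresh copy on every access is an assumption.

```python
import copy


class Holder:
    """Hypothetical class whose property hands out a deep copy of its data."""

    def __init__(self):
        self._cache = {"blood_sodium": [138, 141]}

    @property
    def snapshot(self):
        # Each access returns an independent deep copy of the cache.
        return copy.deepcopy(self._cache)


holder = Holder()
view = holder.snapshot
view["blood_sodium"].append(999)  # Changes only the copy.

print(holder.snapshot["blood_sodium"])  # [138, 141]
```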
- clear_axiom_cache()
Delete the axiom cache. Can be useful to free up memory, or for debugging purposes.
- Return type:
None
- get_obsm_filtered(var_name, tmin, tmax)
Filter a variable stored in obsm by tmin and tmax.
You may specify tmin and tmax as a tuple of a time column and an offset (e.g. ("hospital_admission", "+1d")), in which case the bound is computed relative to that column (here, the patient's hospital admission time).
- Parameters:
var_name (str) – Name of the variable to filter.
tmin (str) – Name of the column to use as tmin or tuple (see description).
tmax (str) – Name of the column to use as tmax or tuple (see description).
- Return type:
DataFrame
- Returns:
Filtered variable.
Examples
>>> var_data = cohort.get_obsm_filtered(
...     var_name="blood_sodium",
...     tmin=("hospital_admission", "+1d"),
...     tmax="hospital_discharge"
... )
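To make the tuple form of tmin and tmax concrete, here is a minimal sketch of how a bound such as ("hospital_admission", "+1d") could be resolved to a timestamp and used to filter a time series. This is not the library's code: resolve_time and filter_series are hypothetical helpers, only the "d" and "h" units seen in this page are handled, and treating both bounds as inclusive is an assumption.

```python
import re
from datetime import datetime, timedelta

_UNITS = {"d": "days", "h": "hours"}


def resolve_time(bound, obs_row):
    """Turn a column name or a (column, offset) tuple into a timestamp."""
    if isinstance(bound, tuple):
        column, offset = bound
        match = re.fullmatch(r"([+-])(\d+)([dh])", offset)
        if match is None:
            raise ValueError(f"Unsupported offset: {offset!r}")
        sign, amount, unit = match.groups()
        delta = timedelta(**{_UNITS[unit]: int(amount)})
        return obs_row[column] + delta if sign == "+" else obs_row[column] - delta
    return obs_row[bound]


def filter_series(rows, obs_row, tmin, tmax):
    """Keep rows whose recordtime lies in [tmin, tmax] (inclusive by assumption)."""
    lower, upper = resolve_time(tmin, obs_row), resolve_time(tmax, obs_row)
    return [r for r in rows if lower <= r["recordtime"] <= upper]


stay = {
    "hospital_admission": datetime(2023, 1, 1, 8, 0),
    "hospital_discharge": datetime(2023, 1, 5, 12, 0),
}
sodium = [
    {"recordtime": datetime(2023, 1, 1, 9, 30), "value": 138},
    {"recordtime": datetime(2023, 1, 2, 10, 15), "value": 141},
    {"recordtime": datetime(2023, 1, 6, 7, 0), "value": 140},
]

kept = filter_series(sodium, stay, ("hospital_admission", "+1d"), "hospital_discharge")
print([r["value"] for r in kept])  # [141]
```

Only the second measurement survives: the first falls within the first day after admission, and the third lies after discharge.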
- add_variable(variable, save_as=None, tmin=None, tmax=None)
Add a variable to the cohort.
You may specify tmin and tmax as a tuple of a time column and an offset (e.g. ("hospital_admission", "+1d")), in which case the bound is computed relative to that column (here, the patient's hospital admission time).
- Parameters:
variable (str | Variable) – Variable to add. Either a string with the variable name (from vars.json) or a Variable object.
save_as – Name of the column to save the variable as. Defaults to variable name.
tmin – Name of the column to use as tmin or tuple (see description).
tmax – Name of the column to use as tmax or tuple (see description).
- Returns:
Variable is added to the cohort.
- Return type:
None
Examples
>>> cohort.add_variable("blood_sodium")
>>> cohort.add_variable(
...     variable="any_dx_covid_19",
...     tmin=("hospital_admission", "-1d"),
...     tmax=cohort.t_eligible
... )
>>> cohort.add_variable(
...     NativeStatic(
...         var_name="highest_hct_before_eligible",
...         select="!max value",
...         base_var='blood_hematokrit',
...         tmax=cohort.t_eligible
...     )
... )
>>> cohort.add_variable(
...     variable='any_med_glu',
...     save_as="glucose_prior_eligible",
...     tmin=(cohort.t_eligible, "-48h"),
...     tmax=cohort.t_eligible
... )
- set_t_eligible(t_eligible, drop_ineligible=True)
Set the time anchor for eligibility. This can be referenced as cohort.t_eligible throughout the process and is required to add inclusion or exclusion criteria.
- Parameters:
t_eligible (str) – Name of the column to use as t_eligible.
drop_ineligible (bool) – Whether to drop ineligible patients. Defaults to True.
- Returns:
t_eligible is set.
- Return type:
None
Examples
>>> # Add a suitable time-anchor variable
>>> cohort.add_variable(NativeStatic(
...     var_name="spo2_lt_90",
...     base_var="spo2",
...     select="!first recordtime",
...     where="value < 90",
... ))
>>> # Set the time anchor for eligibility
>>> cohort.set_t_eligible("spo2_lt_90")
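The idea behind this anchor can be sketched without the library: for each observation, take the first timestamp at which a condition holds, then keep only observations that have such a timestamp. The code below is an illustration under assumptions, not the actual implementation. The data are made up, first_recordtime is a hypothetical helper, and reading drop_ineligible=True as "drop observations without an anchor" is an interpretation of the parameter description.

```python
from datetime import datetime

# Made-up SpO2 series per ICU stay: (recordtime, value) pairs.
spo2 = {
    "C001_1": [
        (datetime(2023, 1, 1, 9, 0), 96),
        (datetime(2023, 1, 1, 13, 0), 88),
        (datetime(2023, 1, 2, 7, 0), 85),
    ],
    "C002_1": [
        (datetime(2023, 1, 2, 10, 0), 97),
    ],
}


def first_recordtime(series, predicate):
    """Earliest timestamp whose value satisfies the predicate, else None."""
    times = [t for t, value in series if predicate(value)]
    return min(times) if times else None


# Anchor per observation: first time SpO2 dropped below 90.
t_eligible = {
    stay_id: first_recordtime(series, lambda v: v < 90)
    for stay_id, series in spo2.items()
}

# Assumed meaning of drop_ineligible=True: drop observations without an anchor.
eligible = {stay_id: t for stay_id, t in t_eligible.items() if t is not None}
print(eligible)
```

In this sketch, stay C002_1 never meets the condition, so it has no anchor and is removed from the eligible set.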
- set_t_outcome(t_outcome)
Set the time anchor for outcome. This can be referenced as cohort.t_outcome throughout the process and is recommended to specify for your study.
- Parameters:
t_outcome (str) – Name of the column to use as t_outcome.
- Returns:
t_outcome is set.
- Return type:
None
Examples
>>> cohort.set_t_outcome("hospital_discharge")
- include(*args, **kwargs)
Add an inclusion criterion to the cohort. It is recommended to use Cohort.add_inclusion() and add all of your inclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.
Warning
You must call Cohort.add_inclusion() before calling Cohort.include() to ensure that the inclusion criteria are properly tracked.
- Parameters:
variable (str | Variable)
operation (str)
label (str)
operations_done (str)
[Optional – tmin, tmax]
- Returns:
Criterion is added to the cohort.
- Return type:
None
Examples
>>> cohort.include(
...     variable="age_on_admission",
...     operation=">= 18",
...     label="Adult",
...     operations_done="Include only adult patients"
... )
- exclude(*args, **kwargs)
Add an exclusion criterion to the cohort. It is recommended to use Cohort.add_exclusion() and add all of your exclusion criteria at once. However, if you need to specify criteria at a later stage, you can use this method.
Warning
You must call Cohort.add_exclusion() before calling Cohort.exclude() to ensure that the exclusion criteria are properly tracked.
- Parameters:
variable (str | Variable)
operation (str)
label (str)
operations_done (str)
[Optional – tmin, tmax]
- Returns:
Criterion is added to the cohort.
- Return type:
None
Examples
>>> cohort.exclude(
...     variable="elix_total",
...     operation="> 20",
...     operations_done="Exclude patients with high Elixhauser score"
... )
- add_inclusion(inclusion_list=[])
Add inclusion criteria to the cohort.
- Parameters:
inclusion_list (list) –
List of inclusion criteria. Each criterion is a dictionary containing:
variable (str | Variable): Variable to use for inclusion
operation (str): Operation to apply (e.g., "> 5", "== True")
label (str): Short label for the inclusion step
operations_done (str): Detailed description of what this inclusion does
tmin (str, optional): Start time for variable extraction
tmax (str, optional): End time for variable extraction
- Returns:
CohortTracker object, can be used to plot inclusion chart
- Return type:
ct (CohortTracker)
Note
By default, all inclusion criteria are applied from tmin=cohort.tmin to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. However, in some cases you might want to set custom time bounds.
Examples
>>> ct = cohort.add_inclusion([
...     {
...         "variable": "age_on_admission",
...         "operation": ">= 18",
...         "label": "Adult patients",
...         "operations_done": "Excluded patients under 18 years old"
...     }
... ])
>>> ct.plot_flowchart()
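The bookkeeping behind a criteria list can be sketched in a few lines: apply each criterion in order and record how many observations remain after each step, which is the information a flowchart needs. This is an illustrative sketch only. parse_operation and apply_inclusion are hypothetical helpers, the parser handles only numeric comparisons (not forms such as "== True"), and the real CohortTracker may record different details.

```python
import operator
import re

_OPS = {
    ">=": operator.ge, "<=": operator.le, "==": operator.eq,
    "!=": operator.ne, ">": operator.gt, "<": operator.lt,
}


def parse_operation(operation):
    """Split a string such as '>= 18' into a comparison function and a number."""
    match = re.fullmatch(r"\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*", operation)
    if match is None:
        raise ValueError(f"Unsupported operation: {operation!r}")
    symbol, number = match.groups()
    return _OPS[symbol], float(number)


def apply_inclusion(rows, criteria):
    """Apply criteria in order and record how many rows survive each step."""
    steps = [("Initial cohort", len(rows))]
    for criterion in criteria:
        compare, threshold = parse_operation(criterion["operation"])
        rows = [r for r in rows if compare(r[criterion["variable"]], threshold)]
        steps.append((criterion["label"], len(rows)))
    return rows, steps


patients = [
    {"id": "P001", "age_on_admission": 54, "icu_length_of_stay": 3.5},
    {"id": "P002", "age_on_admission": 16, "icu_length_of_stay": 4.0},
    {"id": "P003", "age_on_admission": 71, "icu_length_of_stay": 1.0},
]
criteria = [
    {"variable": "age_on_admission", "operation": ">= 18", "label": "Adult patients"},
    {"variable": "icu_length_of_stay", "operation": "> 2", "label": "ICU stay > 2 days"},
]

included, steps = apply_inclusion(patients, criteria)
print(steps)
```

Because each step filters the output of the previous one, the order of the list determines the counts reported at each stage, though not the final set.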
- add_exclusion(exclusion_list=[])
Add exclusion criteria to the cohort.
- Parameters:
exclusion_list (list) –
List of exclusion criteria. Each criterion is a dictionary containing:
variable (str | Variable): Variable to use for exclusion
operation (str): Operation to apply (e.g., "> 5", "== True")
label (str): Short label for the exclusion step
operations_done (str): Detailed description of what this exclusion does
tmin (str, optional): Start time for variable extraction
tmax (str, optional): End time for variable extraction
- Returns:
CohortTracker object, can be used to plot exclusion chart
- Return type:
ct (CohortTracker)
Note
By default, all exclusion criteria are applied from tmin=cohort.tmin to tmax=cohort.t_eligible. This is recommended to avoid introducing immortal time bias. However, in some cases you might want to set custom time bounds.
Examples
>>> ct = cohort.add_exclusion([
...     {
...         "variable": "any_rrt_icu",
...         "operation": "true",
...         "label": "No RRT",
...         "operations_done": "Excluded RRT before hypernatremia"
...     },
...     {
...         "variable": "any_dx_tbi",
...         "operation": "true",
...         "label": "No TBI",
...         "operations_done": "Excluded TBI before hypernatremia"
...     },
...     {
...         "variable": NativeStatic(
...             var_name="sodium_count",
...             select="!count value",
...             base_var="blood_sodium"),
...         "operation": "< 1",
...         "label": "Final cohort",
...         "operations_done": "Excluded cases with less than 1 sodium measurement after hypernatremia",
...         "tmin": cohort.t_eligible,
...         "tmax": "hospital_discharge"
...     }
... ])
>>> ct.plot_flowchart()  # Plot the exclusion flowchart
- add_variable_definition(var_name, var_dict)
Add or update a local variable definition.
- Parameters:
var_name (str) – Name of the variable.
var_dict (dict) – Dictionary containing the variable definition. Can be partial; missing fields are inherited from the global definition.
- Return type:
None
Examples
>>> # Add completely new variable
>>> cohort.add_variable_definition("my_new_var", {
...     "type": "native_dynamic",
...     "table": "it_ishmed_labor",
...     "where": "c_katalog_leistungtext LIKE '%new%'",
...     "value_dtype": "DOUBLE",
...     "cleaning": {"value": {"low": 100, "high": 150}}
... })
>>> # Partially override existing variable
>>> cohort.add_variable_definition("blood_sodium", {
...     "where": "c_katalog_leistungtext LIKE '%custom_sodium%'"
... })
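The inheritance of missing fields from the global definition can be pictured as a dictionary merge in which local fields take precedence. The sketch below is one plausible reading, not the library's code: resolve_definition is a hypothetical helper, the global field values are placeholders, and the merge is shallow, so a nested field such as "cleaning" would be replaced as a whole rather than merged key by key.

```python
# Placeholder global definitions; the field values are illustrative only.
GLOBAL_DEFINITIONS = {
    "blood_sodium": {
        "type": "native_dynamic",
        "table": "example_table",
        "where": "example_column LIKE '%sodium%'",
        "value_dtype": "DOUBLE",
    }
}

# Local (project-level) override that changes a single field.
local_definitions = {
    "blood_sodium": {"where": "c_katalog_leistungtext LIKE '%custom_sodium%'"}
}


def resolve_definition(var_name, local_defs, global_defs):
    """Start from the global definition, then let local fields win."""
    merged = dict(global_defs.get(var_name, {}))
    merged.update(local_defs.get(var_name, {}))
    return merged


resolved = resolve_definition("blood_sodium", local_definitions, GLOBAL_DEFINITIONS)
print(resolved["where"])  # Taken from the local override.
print(resolved["table"])  # Inherited from the global definition.
```

Copying the global entry before updating it keeps the global definitions unchanged, so other projects that rely on them are not affected by a local override.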
Cohort Workflow
Initialization
cohort = Cohort(
    obs_level="icu_stay",  # One of: "hospital_stay", "icu_stay", "procedure"
    database="db_hypercapnia_prepared",
    merge_consecutive=True,  # Optional, defaults to True. Only for obs_level="icu_stay"
    load_default_vars=False  # Optional, defaults to True
)
Adding Variables
# Use pre-defined variables
cohort.add_variable("pf_ratio")
# Create a custom variable on the fly
cohort.add_variable(
    NativeStatic(
        var_name="median_sodium_before_hn",
        select="!median value",
        base_var="blood_sodium",
        tmin="hospital_admission",
        tmax=cohort.t_eligible
    )
)
# Perform manual operations on the dataframe
import numpy as np  # Required for np.where below
cohort.obs['idx_hypernatremia_was_on_admission'] = (
    cohort.obs['first_sodium_recordtime'] == cohort.obs['first_severe_hypernatremia_recordtime']
)
cohort.obs['hn_origin'] = np.where(
    cohort.obs['idx_hypernatremia_was_on_admission'],
    'community_acquired',
    'hospital_acquired'
)
Inclusion/Exclusion
# Add multiple inclusion criteria
cohort.add_inclusion([
    {
        "variable": "age",
        "operation": ">= 18",
        "label": "Adult patients"
    },
    {
        "variable": "icu_length_of_stay",
        "operation": "> 2",
        "label": "ICU stay > 2 days"
    }
])
# Add exclusion criteria
cohort.add_exclusion([
    {
        "variable": "any_dx_covid_19",
        "operation": "== True",
        "label": "Exclude COVID-19 patients"
    }
])
# Visualize the exclusion criteria
cohort.exclude_ct.plot_flowchart()
Exploration
# Display the cohort dataframe
print(cohort.obs)
# Create a TableOne object
tableone = cohort.tableone()
print(tableone)
Data Export
# Save to file
cohort.save("my_cohort.corr")
# Load from file
cohort = Cohort.load("my_cohort.corr")
# Export to CSV
cohort.to_csv("path/to/output_folder")
# Convert to AnnData
adata = cohort.to_adata()