Skip to content

segger.data._utils

The _utils module provides core utility functions for data processing, filtering, and management in the Segger framework. This module contains essential functions for handling spatial transcriptomics data, including coordinate processing, data filtering, and settings management.

SpatialTranscriptomicsDataset

SpatialTranscriptomicsDataset(root, transform=None, pre_transform=None, pre_filter=None)

Bases: InMemoryDataset

A dataset class for handling SpatialTranscriptomics spatial transcriptomics data.

Attributes:

Name Type Description
root str

The root directory where the dataset is stored.

transform callable

A function/transform that takes in a Data object and returns a transformed version.

pre_transform callable

A function/transform that takes in a Data object and returns a transformed version.

pre_filter callable

A function that takes in a Data object and returns a boolean indicating whether to keep it.

Initialize the SpatialTranscriptomicsDataset.

Parameters:

Name Type Description Default
root str

Root directory where the dataset is stored.

required
transform callable

A function/transform that takes in a Data object and returns a transformed version. Defaults to None.

None
pre_transform callable

A function/transform that takes in a Data object and returns a transformed version. Defaults to None.

None
pre_filter callable

A function that takes in a Data object and returns a boolean indicating whether to keep it. Defaults to None.

None
Source code in src/segger/data/_utils.py
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
def __init__(
    self,
    root: str,
    transform: Callable = None,
    pre_transform: Callable = None,
    pre_filter: Callable = None,
):
    """Initialize the SpatialTranscriptomicsDataset.

    Args:
        root (str): Root directory where the dataset is stored.
        transform (callable, optional): A function/transform that takes in a Data object and returns a transformed version. Defaults to None.
        pre_transform (callable, optional): A function/transform that takes in a Data object and returns a transformed version. Defaults to None.
        pre_filter (callable, optional): A function that takes in a Data object and returns a boolean indicating whether to keep it. Defaults to None.
    """
    super().__init__(root, transform, pre_transform, pre_filter)

processed_file_names property

processed_file_names

Return a list of processed file names in the processed directory.

Returns:

Type Description
List[str]

List[str]: List of processed file names.

raw_file_names property

raw_file_names

Return a list of raw file names in the raw directory.

Returns:

Type Description
List[str]

List[str]: List of raw file names.

download

download()

Download the raw data. This method should be overridden if you need to download the data.

Source code in src/segger/data/_utils.py
957
958
959
def download(self) -> None:
    """Download the raw data. This method should be overridden if you need to download the data."""
    pass

get

get(idx)

Get a processed data object.

Parameters:

Name Type Description Default
idx int

Index of the data object to retrieve.

required

Returns:

Name Type Description
Data Data

The processed data object.

Source code in src/segger/data/_utils.py
973
974
975
976
977
978
979
980
981
982
983
984
985
986
def get(self, idx: int) -> Data:
    """Get a processed data object.

    Args:
        idx (int): Index of the data object to retrieve.

    Returns:
        Data: The processed data object.
    """
    data = torch.load(
        os.path.join(self.processed_dir, self.processed_file_names[idx])
    )
    data["tx"].x = data["tx"].x.to_dense()
    return data

len

len()

Return the number of processed files.

Returns:

Name Type Description
int int

Number of processed files.

Source code in src/segger/data/_utils.py
965
966
967
968
969
970
971
def len(self) -> int:
    """Return the number of processed files.

    Returns:
        int: Number of processed files.
    """
    return len(self.processed_file_names)

process

process()

Process the raw data and save it to the processed directory. This method should be overridden if you need to process the data.

Source code in src/segger/data/_utils.py
961
962
963
def process(self) -> None:
    """Process the raw data and save it to the processed directory. This method should be overridden if you need to process the data."""
    pass

add_transcript_ids

add_transcript_ids(transcripts_df, x_col, y_col, id_col='transcript_id', precision=1000)

Add unique transcript IDs to a DataFrame based on x,y coordinates.

Parameters:

Name Type Description Default
transcripts_df DataFrame

DataFrame containing transcript data with x,y coordinates.

required
x_col str

Name of the x-coordinate column.

required
y_col str

Name of the y-coordinate column.

required
id_col str

Name of the column to store the transcript IDs. Defaults to "transcript_id".

'transcript_id'
precision int

Precision multiplier for coordinate values to handle floating point precision. Defaults to 1000.

1000

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with added transcript_id column.

Source code in src/segger/data/_utils.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def add_transcript_ids(
    transcripts_df: pd.DataFrame,
    x_col: str,
    y_col: str,
    id_col: str = "transcript_id",
    precision: int = 1000,
) -> pd.DataFrame:
    """Add unique transcript IDs to a DataFrame based on x,y coordinates.

    Args:
        transcripts_df: DataFrame containing transcript data with x,y coordinates.
        x_col: Name of the x-coordinate column.
        y_col: Name of the y-coordinate column.
        id_col: Name of the column to store the transcript IDs. Defaults to "transcript_id".
        precision: Precision multiplier for coordinate values to handle floating point precision.
            Defaults to 1000.

    Returns:
        pd.DataFrame: DataFrame with added transcript_id column.
    """
    # Create coordinate strings with specified precision
    x_coords = np.round(transcripts_df[x_col] * precision).astype(int).astype(str)
    y_coords = np.round(transcripts_df[y_col] * precision).astype(int).astype(str)
    coords_str = x_coords + "_" + y_coords

    # Generate unique IDs using a deterministic hash function
    def hash_coords(s):
        """Generate a deterministic hash for coordinate strings.

        Args:
            s: Coordinate string to hash.

        Returns:
            int: 8-digit integer hash value.
        """
        # Use a fixed seed for reproducibility
        seed = 1996
        # Combine string with seed and take modulo to get an 8-digit integer
        return abs(hash(s + str(seed))) % 100000000

    tx_ids = np.array([hash_coords(s) for s in coords_str], dtype=np.int32)

    # Add IDs to DataFrame
    transcripts_df = transcripts_df.copy()
    transcripts_df[id_col] = tx_ids

    return transcripts_df

calculate_gene_celltype_abundance_embedding

calculate_gene_celltype_abundance_embedding(adata, celltype_column)

Calculate the cell type abundance embedding for each gene based on the fraction of cells in each cell type that express the gene (non-zero expression).

Parameters:

Name Type Description Default
adata AnnData

An AnnData object containing gene expression data and cell type information.

required
celltype_column str

The column name in adata.obs that contains the cell type information.

required

Returns:

Type Description
DataFrame

pd.DataFrame: A DataFrame where rows are genes and columns are cell types, with each value representing the fraction of cells in that cell type expressing the gene.

Example

adata = AnnData(...) # Load your scRNA-seq AnnData object celltype_column = 'celltype_major' abundance_df = calculate_gene_celltype_abundance_embedding(adata, celltype_column) abundance_df.head()

Source code in src/segger/data/_utils.py
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
def calculate_gene_celltype_abundance_embedding(
    adata: ad.AnnData, celltype_column: str
) -> pd.DataFrame:
    """Calculate the cell type abundance embedding for each gene based on the fraction of cells in each cell type
    that express the gene (non-zero expression).

    Parameters:
        adata (ad.AnnData): An AnnData object containing gene expression data and cell type information.
        celltype_column (str): The column name in `adata.obs` that contains the cell type information.

    Returns:
        pd.DataFrame: A DataFrame where rows are genes and columns are cell types, with each value representing
            the fraction of cells in that cell type expressing the gene.

    Example:
        >>> adata = AnnData(...)  # Load your scRNA-seq AnnData object
        >>> celltype_column = 'celltype_major'
        >>> abundance_df = calculate_gene_celltype_abundance_embedding(adata, celltype_column)
        >>> abundance_df.head()
    """
    # Extract expression data (cells x genes) and cell type information (cells)
    expression_data = adata.X.toarray() if hasattr(adata.X, "toarray") else adata.X
    cell_types = adata.obs[celltype_column].values
    # Create a binary matrix for gene expression (1 if non-zero, 0 otherwise)
    gene_expression_binary = (expression_data > 0).astype(int)
    # Convert the binary matrix to a DataFrame
    gene_expression_df = pd.DataFrame(
        gene_expression_binary, index=adata.obs_names, columns=adata.var_names
    )
    # Perform one-hot encoding on the cell types
    encoder = OneHotEncoder(sparse_output=False)
    cell_type_encoded = encoder.fit_transform(cell_types.reshape(-1, 1))
    # Calculate the fraction of cells expressing each gene per cell type
    cell_type_abundance_list = []
    for i in range(cell_type_encoded.shape[1]):
        # Extract cells of the current cell type
        cell_type_mask = cell_type_encoded[:, i] == 1
        # Calculate the abundance: sum of non-zero expressions in this cell type / total cells in this cell type
        abundance = gene_expression_df[cell_type_mask].mean(axis=0)
        cell_type_abundance_list.append(abundance)
    # Create a DataFrame for the cell type abundance with gene names as rows and cell types as columns
    cell_type_abundance_df = pd.DataFrame(
        cell_type_abundance_list, columns=adata.var_names, index=encoder.categories_[0]
    ).T
    return cell_type_abundance_df

compute_nuclear_transcripts

compute_nuclear_transcripts(polygons, transcripts, x_col, y_col, nuclear_column=None, nuclear_value=None)

Compute which transcripts are nuclear based on their coordinates and the nuclear polygons.

Parameters:

Name Type Description Default
polygons GeoSeries

The nuclear polygons.

required
transcripts DataFrame

The transcripts DataFrame.

required
x_col str

The x-coordinate column name.

required
y_col str

The y-coordinate column name.

required
nuclear_column str

The column name that indicates if a transcript is nuclear. Defaults to None.

None
nuclear_value str

The value in nuclear_column that indicates a nuclear transcript. Defaults to None.

None

Returns:

Type Description
Series

pd.Series: A boolean series indicating which transcripts are nuclear.

Source code in src/segger/data/_utils.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def compute_nuclear_transcripts(
    polygons: gpd.GeoSeries,
    transcripts: pd.DataFrame,
    x_col: str,
    y_col: str,
    nuclear_column: str = None,
    nuclear_value: str = None,
) -> pd.Series:
    """Compute which transcripts are nuclear based on their coordinates and the
    nuclear polygons.

    Args:
        polygons: The nuclear polygons.
        transcripts: The transcripts DataFrame.
        x_col: The x-coordinate column name.
        y_col: The y-coordinate column name.
        nuclear_column: The column name that indicates if a transcript is nuclear. Defaults to None.
        nuclear_value: The value in nuclear_column that indicates a nuclear transcript. Defaults to None.

    Returns:
        pd.Series: A boolean series indicating which transcripts are nuclear.
    """
    # If nuclear_column and nuclear_value are provided, use them
    if nuclear_column is not None and nuclear_value is not None:
        if nuclear_column in transcripts.columns:
            return transcripts[nuclear_column].eq(nuclear_value)

    # Otherwise compute based on coordinates
    points = gpd.GeoSeries(gpd.points_from_xy(transcripts[x_col], transcripts[y_col]))
    return points.apply(lambda p: any(p.within(poly) for poly in polygons))

compute_transcript_metrics

compute_transcript_metrics(df, qv_threshold=30, cell_id_col='cell_id')

Computes various metrics for a given dataframe of transcript data filtered by quality value threshold.

Parameters:

Name Type Description Default
df DataFrame

The dataframe containing transcript data.

required
qv_threshold float

The quality value threshold for filtering transcripts.

30
cell_id_col str

The name of the column representing the cell ID.

'cell_id'

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing various transcript metrics: - 'percent_assigned' (float): The percentage of assigned transcripts. - 'percent_cytoplasmic' (float): The percentage of cytoplasmic transcripts among assigned transcripts. - 'percent_nucleus' (float): The percentage of nucleus transcripts among assigned transcripts. - 'percent_non_assigned_cytoplasmic' (float): The percentage of non-assigned cytoplasmic transcripts. - 'gene_metrics' (pd.DataFrame): A dataframe containing gene-level metrics.

Source code in src/segger/data/_utils.py
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
def compute_transcript_metrics(
    df: pd.DataFrame, qv_threshold: float = 30, cell_id_col: str = "cell_id"
) -> Dict[str, Any]:
    """
    Computes various metrics for a given dataframe of transcript data filtered by quality value threshold.

    Parameters:
        df (pd.DataFrame): The dataframe containing transcript data.
        qv_threshold (float): The quality value threshold for filtering transcripts.
        cell_id_col (str): The name of the column representing the cell ID.

    Returns:
        Dict[str, Any]: A dictionary containing various transcript metrics:
            - 'percent_assigned' (float): The percentage of assigned transcripts.
            - 'percent_cytoplasmic' (float): The percentage of cytoplasmic transcripts among assigned transcripts.
            - 'percent_nucleus' (float): The percentage of nucleus transcripts among assigned transcripts.
            - 'percent_non_assigned_cytoplasmic' (float): The percentage of non-assigned cytoplasmic transcripts.
            - 'gene_metrics' (pd.DataFrame): A dataframe containing gene-level metrics.
    """
    df_filtered = df[df["qv"] > qv_threshold]
    total_transcripts = len(df_filtered)
    assigned_transcripts = df_filtered[df_filtered[cell_id_col] != -1]
    percent_assigned = len(assigned_transcripts) / (total_transcripts + 1) * 100
    cytoplasmic_transcripts = assigned_transcripts[
        assigned_transcripts["overlaps_nucleus"] != 1
    ]
    percent_cytoplasmic = (
        len(cytoplasmic_transcripts) / (len(assigned_transcripts) + 1) * 100
    )
    percent_nucleus = 100 - percent_cytoplasmic
    non_assigned_transcripts = df_filtered[df_filtered[cell_id_col] == -1]
    non_assigned_cytoplasmic = non_assigned_transcripts[
        non_assigned_transcripts["overlaps_nucleus"] != 1
    ]
    percent_non_assigned_cytoplasmic = (
        len(non_assigned_cytoplasmic) / (len(non_assigned_transcripts) + 1) * 100
    )
    gene_group_assigned = assigned_transcripts.groupby("feature_name")
    gene_group_all = df_filtered.groupby("feature_name")
    gene_percent_assigned = (
        gene_group_assigned.size() / (gene_group_all.size() + 1) * 100
    ).reset_index(names="percent_assigned")
    cytoplasmic_gene_group = cytoplasmic_transcripts.groupby("feature_name")
    gene_percent_cytoplasmic = (
        cytoplasmic_gene_group.size() / (len(cytoplasmic_transcripts) + 1) * 100
    ).reset_index(name="percent_cytoplasmic")
    gene_metrics = pd.merge(
        gene_percent_assigned, gene_percent_cytoplasmic, on="feature_name", how="outer"
    ).fillna(0)
    results = {
        "percent_assigned": percent_assigned,
        "percent_cytoplasmic": percent_cytoplasmic,
        "percent_nucleus": percent_nucleus,
        "percent_non_assigned_cytoplasmic": percent_non_assigned_cytoplasmic,
        "gene_metrics": gene_metrics,
    }
    return results

create_anndata

create_anndata(df, panel_df=None, min_transcripts=5, cell_id_col='cell_id', qv_threshold=30, min_cell_area=10.0, max_cell_area=1000.0)

Generates an AnnData object from a dataframe of segmented transcriptomics data.

Parameters:

Name Type Description Default
df DataFrame

The dataframe containing segmented transcriptomics data.

required
panel_df Optional[DataFrame]

The dataframe containing panel information.

None
min_transcripts int

The minimum number of transcripts required for a cell to be included.

5
cell_id_col str

The column name representing the cell ID in the input dataframe.

'cell_id'
qv_threshold float

The quality value threshold for filtering transcripts.

30
min_cell_area float

The minimum cell area to include a cell.

10.0
max_cell_area float

The maximum cell area to include a cell.

1000.0

Returns:

Type Description
AnnData

ad.AnnData: The generated AnnData object containing the transcriptomics data and metadata.

Source code in src/segger/data/_utils.py
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
def create_anndata(
    df: pd.DataFrame,
    panel_df: Optional[pd.DataFrame] = None,
    min_transcripts: int = 5,
    cell_id_col: str = "cell_id",
    qv_threshold: float = 30,
    min_cell_area: float = 10.0,
    max_cell_area: float = 1000.0,
) -> ad.AnnData:
    """Generates an AnnData object from a dataframe of segmented transcriptomics data.

    Parameters:
        df (pd.DataFrame): The dataframe containing segmented transcriptomics data.
        panel_df (Optional[pd.DataFrame]): The dataframe containing panel information.
        min_transcripts (int): The minimum number of transcripts required for a cell to be included.
        cell_id_col (str): The column name representing the cell ID in the input dataframe.
        qv_threshold (float): The quality value threshold for filtering transcripts.
        min_cell_area (float): The minimum cell area to include a cell.
        max_cell_area (float): The maximum cell area to include a cell.

    Returns:
        ad.AnnData: The generated AnnData object containing the transcriptomics data and metadata.
    """
    # Filter out unassigned cells
    df_filtered = df[df[cell_id_col].astype(str) != "UNASSIGNED"]

    # Create pivot table for gene expression counts per cell
    pivot_df = df_filtered.rename(
        columns={cell_id_col: "cell", "feature_name": "gene"}
    )[["cell", "gene"]].pivot_table(
        index="cell", columns="gene", aggfunc="size", fill_value=0
    )
    pivot_df = pivot_df[pivot_df.sum(axis=1) >= min_transcripts]

    # Summarize cell metrics
    cell_summary = []
    for cell_id, cell_data in df_filtered.groupby(cell_id_col):
        if len(cell_data) < min_transcripts:
            continue
        cell_convex_hull = ConvexHull(
            cell_data[["x_location", "y_location"]], qhull_options="QJ"
        )
        cell_area = cell_convex_hull.area
        if cell_area < min_cell_area or cell_area > max_cell_area:
            continue
        cell_summary.append(
            {
                "cell": cell_id,
                "cell_centroid_x": cell_data["x_location"].mean(),
                "cell_centroid_y": cell_data["y_location"].mean(),
                "cell_area": cell_area,
            }
        )
    cell_summary = pd.DataFrame(cell_summary).set_index("cell")

    # Add genes from panel_df (if provided) to the pivot table
    if panel_df is not None:
        panel_df = panel_df.sort_values("gene")
        genes = panel_df["gene"].values
        for gene in genes:
            if gene not in pivot_df:
                pivot_df[gene] = 0
        pivot_df = pivot_df[genes.tolist()]

    # Create var DataFrame
    if panel_df is None:
        var_df = pd.DataFrame(
            [
                {"gene": gene, "feature_types": "Gene Expression", "genome": "Unknown"}
                for gene in np.unique(pivot_df.columns.values)
            ]
        ).set_index("gene")
    else:
        var_df = panel_df[["gene", "ensembl"]].rename(columns={"ensembl": "gene_ids"})
        var_df["feature_types"] = "Gene Expression"
        var_df["genome"] = "Unknown"
        var_df = var_df.set_index("gene")

    # Compute total assigned and unassigned transcript counts for each gene
    assigned_counts = df_filtered.groupby("feature_name")["feature_name"].count()
    unassigned_counts = (
        df[df[cell_id_col].astype(str) == "UNASSIGNED"]
        .groupby("feature_name")["feature_name"]
        .count()
    )
    var_df["total_assigned"] = var_df.index.map(assigned_counts).fillna(0).astype(int)
    var_df["total_unassigned"] = (
        var_df.index.map(unassigned_counts).fillna(0).astype(int)
    )

    # Filter cells and create the AnnData object
    cells = list(set(pivot_df.index) & set(cell_summary.index))
    pivot_df = pivot_df.loc[cells, :]
    cell_summary = cell_summary.loc[cells, :]
    adata = ad.AnnData(pivot_df.values)
    adata.var = var_df
    adata.obs["transcripts"] = pivot_df.sum(axis=1).values
    adata.obs["unique_transcripts"] = (pivot_df > 0).sum(axis=1).values
    adata.obs_names = pivot_df.index.values.tolist()
    adata.obs = pd.merge(
        adata.obs,
        cell_summary.loc[adata.obs_names, :],
        left_index=True,
        right_index=True,
    )

    return adata

ensure_transcript_ids

ensure_transcript_ids(parquet_path, x_col, y_col, id_col='transcript_id', precision=1000)

Ensure that a parquet file has transcript IDs by adding them if missing.

Parameters:

Name Type Description Default
parquet_path PathLike

Path to the parquet file.

required
x_col str

Name of the x-coordinate column.

required
y_col str

Name of the y-coordinate column.

required
id_col str

Name of the column to store the transcript IDs. Defaults to "transcript_id".

'transcript_id'
precision int

Precision multiplier for coordinate values to handle floating point precision. Defaults to 1000.

1000
Source code in src/segger/data/_utils.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def ensure_transcript_ids(
    parquet_path: os.PathLike,
    x_col: str,
    y_col: str,
    id_col: str = "transcript_id",
    precision: int = 1000,
) -> None:
    """Ensure that a parquet file has transcript IDs by adding them if missing.

    Args:
        parquet_path: Path to the parquet file.
        x_col: Name of the x-coordinate column.
        y_col: Name of the y-coordinate column.
        id_col: Name of the column to store the transcript IDs. Defaults to "transcript_id".
        precision: Precision multiplier for coordinate values to handle floating point precision.
            Defaults to 1000.
    """
    # First check metadata to see if column exists
    metadata = pq.read_metadata(parquet_path)
    schema_idx = dict(map(reversed, enumerate(metadata.schema.names)))

    # Only proceed if the column doesn't exist
    if id_col not in schema_idx:
        # Read the parquet file
        df = pd.read_parquet(parquet_path)

        # Add transcript IDs
        df = add_transcript_ids(df, x_col, y_col, id_col, precision)

        # Convert DataFrame to Arrow table
        table = pa.Table.from_pandas(df)

        # Write back to parquet
        pq.write_table(
            table,
            parquet_path,
            version="2.6",  # Use latest stable version
            write_statistics=True,  # Ensure statistics are written
            compression="snappy",  # Use snappy compression for better performance
        )

filter_boundaries

filter_boundaries(boundaries, inset, outset, x, y, label)

Filter boundary polygons based on their overlap with specified inset and outset regions.

Parameters:

Name Type Description Default
boundaries DataFrame

A DataFrame containing the boundary data with x and y coordinates and identifiers.

required
inset Polygon

A polygon representing the inner region to filter the boundaries.

required
outset Polygon

A polygon representing the outer region to filter the boundaries.

required
x str

The name of the column representing the x-coordinate.

required
y str

The name of the column representing the y-coordinate.

required
label str

The name of the column representing the cell or nucleus label.

required

Returns:

Type Description

pd.DataFrame: A DataFrame containing the filtered boundary polygons.

Note

The function determines overlaps of boundary polygons with the specified inset and outset regions. It creates boolean masks for overlaps with the top, left, right, and bottom sides of the outset region, as well as the center region defined by the inset polygon. The filtering logic includes polygons that: - Are completely within the center region. - Overlap with the center and the left side, but not the bottom side. - Overlap with the center and the top side, but not the right side.

Source code in src/segger/data/_utils.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def filter_boundaries(
    boundaries: pd.DataFrame,
    inset: shapely.Polygon,
    outset: shapely.Polygon,
    x: str,
    y: str,
    label: str,
):
    """Filter boundary polygons based on their overlap with specified inset and
    outset regions.

    Args:
        boundaries: A DataFrame containing the boundary data with x and y coordinates and
            identifiers.
        inset: A polygon representing the inner region to filter the boundaries.
        outset: A polygon representing the outer region to filter the boundaries.
        x: The name of the column representing the x-coordinate.
        y: The name of the column representing the y-coordinate.
        label: The name of the column representing the cell or nucleus label.

    Returns:
        pd.DataFrame: A DataFrame containing the filtered boundary polygons.

    Note:
        The function determines overlaps of boundary polygons with the specified
        inset and outset regions. It creates boolean masks for overlaps with the
        top, left, right, and bottom sides of the outset region, as well as the
        center region defined by the inset polygon. The filtering logic includes
        polygons that:
        - Are completely within the center region.
        - Overlap with the center and the left side, but not the bottom side.
        - Overlap with the center and the top side, but not the right side.
    """

    # Determine overlaps of boundary polygons
    def in_region(region):
        """Check if boundaries are within a specified region.

        Args:
            region: Shapely polygon defining the region to check.

        Returns:
            pd.Series: Boolean mask indicating which boundaries are in the region.
        """
        in_x = boundaries[x].between(region.bounds[0], region.bounds[2])
        in_y = boundaries[y].between(region.bounds[1], region.bounds[3])
        return in_x & in_y

    x1, y1, x4, y4 = outset.bounds
    x2, y2, x3, y3 = inset.bounds
    boundaries["top"] = in_region(shapely.box(x1, y1, x4, y2))
    boundaries["left"] = in_region(shapely.box(x1, y1, x2, y4))
    boundaries["right"] = in_region(shapely.box(x3, y1, x4, y4))
    boundaries["bottom"] = in_region(shapely.box(x1, y3, x4, y4))
    boundaries["center"] = in_region(inset)

    # Filter boundary polygons
    # Include overlaps with top and left, not bottom and right
    gb = boundaries.groupby(label, sort=False)
    total = gb["center"].transform("size")
    in_top = gb["top"].transform("sum")
    in_left = gb["left"].transform("sum")
    in_right = gb["right"].transform("sum")
    in_bottom = gb["bottom"].transform("sum")
    in_center = gb["center"].transform("sum")
    keep = in_center == total
    keep |= (in_center > 0) & (in_left > 0) & (in_bottom == 0)
    keep |= (in_center > 0) & (in_top > 0) & (in_right == 0)
    inset_boundaries = boundaries.loc[keep]
    return inset_boundaries

filter_transcripts

filter_transcripts(transcripts_df, label=None, filter_substrings=None, qv_column=None, min_qv=None)

Filter transcripts based on quality value and remove unwanted transcripts.

Parameters:

Name Type Description Default
transcripts_df DataFrame

The dataframe containing transcript data.

required
label Optional[str]

The label of transcript features. Defaults to None.

None
filter_substrings Optional[List[str]]

The list of feature substrings to remove. Defaults to None.

None
qv_column Optional[str]

The name of the column representing the quality value. Defaults to None.

None
min_qv Optional[float]

The minimum quality value threshold for filtering transcripts. Defaults to None.

None

Returns:

Type Description
DataFrame

pd.DataFrame: The filtered dataframe.

Source code in src/segger/data/_utils.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def filter_transcripts(
    transcripts_df: pd.DataFrame,
    label: Optional[str] = None,
    filter_substrings: Optional[List[str]] = None,
    qv_column: Optional[str] = None,
    min_qv: Optional[float] = None,
) -> pd.DataFrame:
    """Filter transcripts based on quality value and remove unwanted transcripts.

    Args:
        transcripts_df: The dataframe containing transcript data.
        label: The label of transcript features. Defaults to None.
        filter_substrings: The list of feature substrings to remove. Defaults to None.
        qv_column: The name of the column representing the quality value. Defaults to None.
        min_qv: The minimum quality value threshold for filtering transcripts. Defaults to None.

    Returns:
        pd.DataFrame: The filtered dataframe.
    """
    mask = pd.Series(True, index=transcripts_df.index)
    if filter_substrings is not None and label is not None:
        mask &= ~transcripts_df[label].str.startswith(tuple(filter_substrings))
    if min_qv is not None and qv_column is not None:
        mask &= transcripts_df[qv_column].ge(min_qv)
    return transcripts_df[mask]

find_markers

find_markers(adata, cell_type_column, pos_percentile=5, neg_percentile=10, percentage=50)

Identify positive and negative marker genes for each cell type in an AnnData object.

Positive markers are top-ranked genes that are expressed in at least percentage percent of cells in the given cell type.

Parameters:

Name Type Description Default
adata AnnData

Annotated data object containing gene expression data and cell type annotations.

required
cell_type_column str

Name of the column in adata.obs specifying cell type identity for each cell.

required
pos_percentile float

Percentile threshold for selecting top highly expressed genes as positive markers. Defaults to 5.

5
neg_percentile float

Percentile threshold for selecting lowest expressed genes as negative markers. Defaults to 10.

10
percentage float

Minimum percent of cells (0-100) in a cell type expressing a gene for it to be a marker. Defaults to 50.

50

Returns:

Name Type Description
dict Dict[str, Dict[str, List[str]]]

Dictionary mapping cell type names to: { 'positive': [list of positive marker gene names], 'negative': [list of negative marker gene names] }

Source code in src/segger/data/_utils.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
def find_markers(
    adata: ad.AnnData,
    cell_type_column: str,
    pos_percentile: float = 5,
    neg_percentile: float = 10,
    percentage: float = 50,
) -> Dict[str, Dict[str, List[str]]]:
    """Identify positive and negative marker genes for each cell type in an AnnData object.

    Positive markers are top-ranked genes that are expressed in at least
    `percentage` percent of cells in the given cell type.

    Args:
        adata: Annotated data object containing gene expression data and cell type annotations.
        cell_type_column: Name of the column in `adata.obs` specifying cell type identity for each cell.
        pos_percentile: Percentile threshold for selecting top highly expressed genes as positive markers. Defaults to 5.
        neg_percentile: Percentile threshold for selecting lowest expressed genes as negative markers. Defaults to 10.
        percentage: Minimum percent of cells (0-100) in a cell type expressing a gene for it to be a marker. Defaults to 50.

    Returns:
        dict: Dictionary mapping cell type names to:
            {
                'positive': [list of positive marker gene names],
                'negative': [list of negative marker gene names]
            }
    """
    markers = {}
    sc.tl.rank_genes_groups(adata, groupby=cell_type_column)
    genes = np.array(adata.var_names)
    n_genes = adata.shape[1]

    # Work with a dense matrix for expression fraction calculation
    # (convert sparse to dense if needed)
    if not isinstance(adata.X, np.ndarray):
        expr_matrix = adata.X.toarray()
    else:
        expr_matrix = adata.X

    for cell_type in adata.obs[cell_type_column].unique():
        mask = (adata.obs[cell_type_column] == cell_type).values
        gene_names = np.array(adata.uns['rank_genes_groups']['names'][cell_type])

        n_pos = max(1, int(n_genes * pos_percentile // 100))
        n_neg = max(1, int(n_genes * neg_percentile // 100))

        # Calculate percent of cells in this cell type expressing each gene
        expr_frac = (expr_matrix[mask] > 0).mean(axis=0) * 100  # as percent

        # Filter positive markers by expression fraction
        pos_indices = []
        for idx in range(n_pos):
            gene = gene_names[idx]
            gene_idx = np.where(genes == gene)[0][0]
            if expr_frac[gene_idx] >= percentage:
                pos_indices.append(idx)
        positive_markers = list(gene_names[pos_indices])

        # Negative markers are the lowest-ranked
        negative_markers = list(gene_names[-n_neg:])

        markers[cell_type] = {
            "positive": positive_markers,
            "negative": negative_markers
        }
    return markers

find_mutually_exclusive_genes

find_mutually_exclusive_genes(adata, markers, cell_type_column)

Identify mutually exclusive genes based on expression criteria.

Parameters:

Name Type Description Default
adata AnnData

Annotated data object containing gene expression data.

required
markers Dict[str, Dict[str, List[str]]]

Dictionary where keys are cell types and values are dictionaries containing: 'positive': list of top x% highly expressed genes 'negative': list of top x% lowly expressed genes.

required
cell_type_column str

Column name in adata.obs that specifies cell types.

required

Returns:

Name Type Description
list List[Tuple[str, str]]

List of mutually exclusive gene pairs.

Source code in src/segger/data/_utils.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
def find_mutually_exclusive_genes(
    adata: ad.AnnData, markers: Dict[str, Dict[str, List[str]]], cell_type_column: str
) -> List[Tuple[str, str]]:
    """Identify mutually exclusive genes based on expression criteria.

    Args:
        adata: Annotated data object containing gene expression data.
        markers: Dictionary where keys are cell types and values are dictionaries containing:
            'positive': list of top x% highly expressed genes
            'negative': list of top x% lowly expressed genes.
        cell_type_column: Column name in `adata.obs` that specifies cell types.

    Returns:
        list: List of mutually exclusive gene pairs.
    """
    exclusive_genes = {}
    all_exclusive = []
    gene_expression = adata.to_df()
    for cell_type, marker_sets in markers.items():
        positive_markers = marker_sets["positive"]
        exclusive_genes[cell_type] = []
        for gene in positive_markers:
            gene_expr = adata[:, gene].X
            cell_type_mask = adata.obs[cell_type_column] == cell_type
            non_cell_type_mask = ~cell_type_mask
            if (gene_expr[cell_type_mask] > 0).mean() > 0.2 and (
                gene_expr[non_cell_type_mask] > 0
            ).mean() < 0.05:
                exclusive_genes[cell_type].append(gene)
                all_exclusive.append(gene)
    unique_genes = list(
        {
            gene
            for i in exclusive_genes.keys()
            for gene in exclusive_genes[i]
            if gene in all_exclusive
        }
    )
    filtered_exclusive_genes = {
        i: [gene for gene in exclusive_genes[i] if gene in unique_genes]
        for i in exclusive_genes.keys()
    }
    mutually_exclusive_gene_pairs = [
        tuple(sorted((gene1, gene2)))
        for key1, key2 in combinations(filtered_exclusive_genes.keys(), 2)
        if key1 != key2
        for gene1 in filtered_exclusive_genes[key1]
        for gene2 in filtered_exclusive_genes[key2]
    ]
    return set(mutually_exclusive_gene_pairs)

format_time

format_time(elapsed)

Format elapsed time to hâ“‚s.

Parameters:

elapsed : float Elapsed time in seconds.

Returns:

str Formatted time in hâ“‚s.

Source code in src/segger/data/_utils.py
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
def format_time(elapsed: float) -> str:
    """
    Format elapsed time to h:m:s.

    Parameters:
    ----------
    elapsed : float
        Elapsed time in seconds.

    Returns:
    -------
    str
        Formatted time in h:m:s.
    """
    return str(timedelta(seconds=int(elapsed)))

get_edge_index

get_edge_index(coords_1, coords_2, k=5, dist=10, method='kd_tree', workers=1)

Computes edge indices using KD-Tree.

Parameters:

Name Type Description Default
coords_1 ndarray

First set of coordinates.

required
coords_2 ndarray

Second set of coordinates.

required
k int

Number of nearest neighbors.

5
dist int

Distance threshold.

10
method str

The method to use. Only 'kd_tree' is supported now.

'kd_tree'

Returns:

Type Description
Tensor

torch.Tensor: Edge indices.

Source code in src/segger/data/_utils.py
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
def get_edge_index(
    coords_1: np.ndarray,
    coords_2: np.ndarray,
    k: int = 5,
    dist: int = 10,
    method: str = "kd_tree",
    workers: int = 1,
) -> torch.Tensor:
    """Computes edge indices using KD-Tree.

    Parameters:
        coords_1 (np.ndarray): First set of coordinates.
        coords_2 (np.ndarray): Second set of coordinates.
        k (int, optional): Number of nearest neighbors.
        dist (int, optional): Distance threshold.
        method (str, optional): The method to use. Only 'kd_tree' is supported now.

    Returns:
        torch.Tensor: Edge indices.
    """
    if method == "kd_tree":
        return get_edge_index_kdtree(
            coords_1, coords_2, k=k, dist=dist, workers=workers
        )
    # elif method == "cuda":
    #     return get_edge_index_cuda(coords_1, coords_2, k=k, dist=dist)
    else:
        msg = f"Unknown method {method}. The only supported method is 'kd_tree' now."
        raise ValueError(msg)

get_edge_index_kdtree

get_edge_index_kdtree(coords_1, coords_2, k=5, dist=10, workers=1)

Computes edge indices using KDTree.

Parameters:

Name Type Description Default
coords_1 ndarray

First set of coordinates.

required
coords_2 ndarray

Second set of coordinates.

required
k int

Number of nearest neighbors.

5
dist int

Distance threshold.

10

Returns:

Type Description
Tensor

torch.Tensor: Edge indices.

Source code in src/segger/data/_utils.py
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
def get_edge_index_kdtree(
    coords_1: np.ndarray,
    coords_2: np.ndarray,
    k: int = 5,
    dist: int = 10,
    workers: int = 1,
) -> torch.Tensor:
    """Computes edge indices using KDTree.

    Parameters:
        coords_1 (np.ndarray): First set of coordinates.
        coords_2 (np.ndarray): Second set of coordinates.
        k (int, optional): Number of nearest neighbors.
        dist (int, optional): Distance threshold.

    Returns:
        torch.Tensor: Edge indices.
    """
    if isinstance(coords_1, torch.Tensor):
        coords_1 = coords_1.cpu().numpy()
    if isinstance(coords_2, torch.Tensor):
        coords_2 = coords_2.cpu().numpy()
    tree = cKDTree(coords_1)
    d_kdtree, idx_out = tree.query(
        coords_2, k=k, distance_upper_bound=dist, workers=workers
    )
    valid_mask = d_kdtree < dist
    edges = []

    for idx, valid in enumerate(valid_mask):
        valid_indices = idx_out[idx][valid]
        if valid_indices.size > 0:
            edges.append(
                np.vstack((np.full(valid_indices.shape, idx), valid_indices)).T
            )

    edge_index = torch.tensor(np.vstack(edges), dtype=torch.long).contiguous()
    return edge_index

get_polygons_from_xy

get_polygons_from_xy(boundaries, x, y, label, scale_factor=1.0)

Convert boundary coordinates from a DataFrame to a GeoSeries of polygons.

Parameters:

Name Type Description Default
boundaries DataFrame

A DataFrame containing the boundary data with x and y coordinates and identifiers.

required
x str

The name of the column representing the x-coordinate.

required
y str

The name of the column representing the y-coordinate.

required
label str

The name of the column representing the cell or nucleus label.

required
scale_factor float

A ratio to scale the polygons. A value of 1.0 means no change, greater than 1.0 expands the polygons, and less than 1.0 shrinks the polygons. Defaults to 1.0.

1.0

Returns:

Type Description
GeoSeries

gpd.GeoSeries: A GeoSeries containing the polygons created from the boundary coordinates.

Source code in src/segger/data/_utils.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def get_polygons_from_xy(
    boundaries: pd.DataFrame,
    x: str,
    y: str,
    label: str,
    scale_factor: float = 1.0,
) -> gpd.GeoSeries:
    """Convert boundary coordinates from a DataFrame to a GeoSeries of polygons.

    Args:
        boundaries: A DataFrame containing the boundary data with x and y coordinates
            and identifiers.
        x: The name of the column representing the x-coordinate.
        y: The name of the column representing the y-coordinate.
        label: The name of the column representing the cell or nucleus label.
        scale_factor: A ratio to scale the polygons. A value of 1.0 means no change,
            greater than 1.0 expands the polygons, and less than 1.0 shrinks the polygons.
            Defaults to 1.0.

    Returns:
        gpd.GeoSeries: A GeoSeries containing the polygons created from the boundary
            coordinates.
    """
    # Polygon offsets in coords
    ids = boundaries[label].values
    splits = np.where(ids[:-1] != ids[1:])[0] + 1
    geometry_offset = np.hstack([0, splits, len(ids)])
    part_offset = np.arange(len(np.unique(ids)) + 1)

    # Convert to GeoSeries of polygons
    polygons = shapely.from_ragged_array(
        shapely.GeometryType.POLYGON,
        coords=boundaries[[x, y]].values.copy(order="C"),
        offsets=(geometry_offset, part_offset),
    )
    gs = gpd.GeoSeries(polygons, index=np.unique(ids))

    # print(gs)

    if scale_factor != 1.0:
        # Scale polygons around their centroid
        gs = gpd.GeoSeries(
            [
                scale(geom, xfact=scale_factor, yfact=scale_factor, origin='centroid')
                for geom in gs
            ],
            index=gs.index,
        )
        # print(gs)

    return gs

get_xy_extents

get_xy_extents(filepath, x, y)

Get the bounding box of the x and y coordinates from a Parquet file.

Parameters

filepath : str The path to the Parquet file. x : str The name of the column representing the x-coordinate. y : str The name of the column representing the y-coordinate.

Returns

shapely.Polygon A polygon representing the bounding box of the x and y coordinates.

Source code in src/segger/data/_utils.py
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
def get_xy_extents(
    filepath,
    x: str,
    y: str,
) -> Tuple[int]:
    """
    Get the bounding box of the x and y coordinates from a Parquet file.

    Parameters
    ----------
    filepath : str
        The path to the Parquet file.
    x : str
        The name of the column representing the x-coordinate.
    y : str
        The name of the column representing the y-coordinate.

    Returns
    -------
    shapely.Polygon
        A polygon representing the bounding box of the x and y coordinates.
    """
    # Get index of columns of parquet file
    metadata = pq.read_metadata(filepath)
    schema_idx = dict(map(reversed, enumerate(metadata.schema.names)))

    # Find min and max values across all row groups
    x_max = -1
    x_min = sys.maxsize
    y_max = -1
    y_min = sys.maxsize
    for i in range(metadata.num_row_groups):
        group = metadata.row_group(i)
        x_min = min(x_min, group.column(schema_idx[x]).statistics.min)
        x_max = max(x_max, group.column(schema_idx[x]).statistics.max)
        y_min = min(y_min, group.column(schema_idx[y]).statistics.min)
        y_max = max(y_max, group.column(schema_idx[y]).statistics.max)
    return x_min, y_min, x_max, y_max

load_settings

load_settings(sample_type)

Load a matching YAML file from the _settings/ directory and convert its contents into a SimpleNamespace.

Parameters:

Name Type Description Default
sample_type str

Name of the sample type to load (case-insensitive).

required

Returns:

Name Type Description
SimpleNamespace SimpleNamespace

The settings loaded from the YAML file as a SimpleNamespace.

Raises:

Type Description
FileNotFoundError

If sample_type does not match any filenames.

Source code in src/segger/data/_utils.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def load_settings(sample_type: str) -> SimpleNamespace:
    """Load a matching YAML file from the _settings/ directory and convert its
    contents into a SimpleNamespace.

    Args:
        sample_type: Name of the sample type to load (case-insensitive).

    Returns:
        SimpleNamespace: The settings loaded from the YAML file as a SimpleNamespace.

    Raises:
        FileNotFoundError: If `sample_type` does not match any filenames.
    """
    settings_dir = Path(__file__).parent.resolve() / "_settings"
    # Get a list of YAML filenames (without extensions) in the _settings dir
    filenames = [file.stem for file in settings_dir.glob("*.yaml")]
    # Convert sample_type to lowercase and check if it matches any filename
    sample_type = sample_type.lower()
    if sample_type not in filenames:
        msg = (
            f"Sample type '{sample_type}' not found in settings. "
            f"Available options: {', '.join(filenames)}"
        )
        raise FileNotFoundError(msg)
    # Load the matching YAML file
    yaml_file_path = settings_dir / f"{sample_type}.yaml"
    with yaml_file_path.open("r") as file:
        data = yaml.safe_load(file)

    # Convert the YAML data into a SimpleNamespace recursively
    return _dict_to_namespace(data)

read_parquet_region

read_parquet_region(filepath, x, y, bounds=None, extra_columns=[], extra_filters=[], row_group_chunksize=None)

Read a region from a Parquet file based on x and y coordinates and optional filters.

Parameters:

Name Type Description Default
filepath

The path to the Parquet file.

required
x str

The name of the column representing the x-coordinate.

required
y str

The name of the column representing the y-coordinate.

required
bounds Polygon

A polygon representing the bounding box to filter the data. If None, no bounding box filter is applied. Defaults to None.

None
extra_columns list[str]

A list of additional columns to include in the output DataFrame. Defaults to [].

[]
extra_filters list[str]

A list of additional filters to apply to the data. Defaults to [].

[]
row_group_chunksize Optional[int]

Chunk size for row group processing. Defaults to None.

None

Returns:

Type Description

pd.DataFrame: A DataFrame containing the filtered data from the Parquet file.

Source code in src/segger/data/_utils.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def read_parquet_region(
    filepath,
    x: str,
    y: str,
    bounds: shapely.Polygon = None,
    extra_columns: list[str] = [],
    extra_filters: list[str] = [],
    row_group_chunksize: Optional[int] = None,
):
    """Read a region from a Parquet file based on x and y coordinates and optional
    filters.

    Args:
        filepath: The path to the Parquet file.
        x: The name of the column representing the x-coordinate.
        y: The name of the column representing the y-coordinate.
        bounds: A polygon representing the bounding box to filter the data. If None,
            no bounding box filter is applied. Defaults to None.
        extra_columns: A list of additional columns to include in the output DataFrame. Defaults to [].
        extra_filters: A list of additional filters to apply to the data. Defaults to [].
        row_group_chunksize: Chunk size for row group processing. Defaults to None.

    Returns:
        pd.DataFrame: A DataFrame containing the filtered data from the Parquet file.
    """
    # Check backend and load dependencies if not already loaded

    # Find bounds of full file if not supplied
    if bounds is None:
        bounds = get_xy_extents(filepath, x, y)

    # Load pre-filtered data from Parquet file
    filters = [
        [
            (x, ">", bounds.bounds[0]),
            (y, ">", bounds.bounds[1]),
            (x, "<", bounds.bounds[2]),
            (y, "<", bounds.bounds[3]),
        ]
        + extra_filters
    ]

    columns = list({x, y} | set(extra_columns))

    # Check if 'Geometry', 'geometry', 'polygon', or 'Polygon' is in the columns
    if any(col in columns for col in ['Geometry', 'geometry', 'polygon', 'Polygon']):
        import geopandas as gpd
        # If geometry columns are present, read with geopandas
        region = gpd.read_parquet(
            filepath,
            filters=filters,
            columns=columns,
        )
    else:
        # Otherwise, read with pandas
        region = pd.read_parquet(
            filepath,
            filters=filters,
            columns=columns,
        )
    return region