Skip to content

QTMpandas module

QTMPandas

Source code in vgridpandas/qtmpandas.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
@pd.api.extensions.register_dataframe_accessor("qtm")
class QTMPandas:
    """DataFrame accessor, registered as ``df.qtm``, for working with QTM cell ids."""

    def __init__(self, df: DataFrame):
        # The frame the accessor was invoked on; methods never modify it in
        # place, they return new frames built from it.
        self._df = df

    def latlon2qtm(
        self,
        resolution: int,
        lat_col: str = "lat",
        lon_col: str = "lon",
        set_index: bool = False,
    ) -> AnyDataFrame:
        """Adds qtm ID to (Geo)DataFrame.

        pd.DataFrame: uses `lat_col` and `lon_col` (default `lat` and `lon`)
        gpd.GeoDataFrame: uses `geometry`

        Assumes coordinates in epsg=4326.

        Args:
            resolution: QTM resolution passed to ``latlon_to_qtm`` for every row.
            lat_col: Latitude column name (ignored for a GeoDataFrame).
            lon_col: Longitude column name (ignored for a GeoDataFrame).
            set_index: If True, the returned frame is indexed by the id column.

        Returns:
            A new frame with the id column ``QTM_COL`` and a companion
            ``f"{QTM_COL}_res"`` column holding ``resolution``.
        """
        if isinstance(self._df, gpd.GeoDataFrame):
            lons = self._df.geometry.x
            lats = self._df.geometry.y
        else:
            lons = self._df[lon_col]
            lats = self._df[lat_col]

        qtm_ids = [latlon_to_qtm(lat, lon, resolution) for lat, lon in zip(lats, lons)]

        qtm_col = QTM_COL
        assign_arg = {qtm_col: qtm_ids, f"{qtm_col}_res": resolution}
        df = self._df.assign(**assign_arg)
        if set_index:
            return df.set_index(qtm_col)
        return df

    def qtm2geo(self, qtm_col: str = None) -> GeoDataFrame:
        """Add geometry with QTM geometry to the DataFrame.

        Args:
            qtm_col: Name of the column holding the ids; ``None`` selects the
                default ``QTM_COL`` column.

        Returns:
            The result of ``dggs_ids_to_geodataframe`` called with the frame,
            its id column and the ``qtm_to_geo`` converter.

        Raises:
            ValueError: If the selected id column is not in the frame.
        """
        column = QTM_COL if qtm_col is None else qtm_col
        if column not in self._df.columns:
            raise ValueError(f"Column '{column}' not found in DataFrame")
        return dggs_ids_to_geodataframe(self._df, self._df[column], qtm_to_geo)

    def polyfill(
        self,
        resolution: int,
        predicate: str = None,
        compact: bool = False,
        explode: bool = False,
    ) -> AnyDataFrame:
        """
        Fill geometries with QTM cell ids at the target resolution.

        When ``compact=True``, ids may span multiple resolutions after compaction
        (same as ``vector2qtm``). Use ``explode=True`` before ``qtm2geo`` for one
        cell geometry per row.

        Args:
            resolution: Target QTM resolution.
            predicate: Forwarded to ``polyfill_row`` for each geometry.
            compact: Forwarded to ``polyfill_row`` for each geometry.
            explode: If False, ``QTM_COL`` holds one list of ids per row. If
                True, the lists are exploded and joined back on the index, so
                each input row is repeated once per id.

        Returns:
            A new frame with the ``QTM_COL`` column. An existing ``QTM_COL``
            column is replaced in both modes.
        """
        result = self._df.geometry.apply(
            lambda geom: polyfill_row(geom, resolution, predicate, compact)
        )

        if not explode:
            assign_args = {QTM_COL: result}
            return self._df.assign(**assign_args)

        exploded = result.explode().to_frame(QTM_COL)
        base = self._df
        if QTM_COL in base.columns:
            # join() raises on overlapping column names; drop the stale ids so
            # this path overwrites them just like the assign() path above.
            base = base.drop(columns=QTM_COL)
        return base.join(exploded)

    def qtmbin(
        self,
        resolution: int,
        stats: str = "count",
        numeric_col: str = None,
        category_col: str = None,
        lat_col: str = "lat",
        lon_col: str = "lon",
    ) -> GeoDataFrame:
        """Bin points into qtm cells and compute statistics.

        Rows are tagged with ``latlon2qtm``, aggregated with ``aggregate_bin``
        using ``stats``, ``numeric_col`` and ``category_col``, and the
        aggregated frame is converted with ``qtm2geo``.
        """
        qtm_col = QTM_COL
        df = self.latlon2qtm(resolution, lat_col, lon_col)
        result = aggregate_bin(df, qtm_col, stats, numeric_col, category_col)
        return result.qtm.qtm2geo(qtm_col=qtm_col)

latlon2qtm(resolution, lat_col='lat', lon_col='lon', set_index=False)

Adds qtm ID to (Geo)DataFrame.

pd.DataFrame: uses lat_col and lon_col (default lat and lon). gpd.GeoDataFrame: uses geometry.

Assumes coordinates in epsg=4326.

Source code in vgridpandas/qtmpandas.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def latlon2qtm(
    self,
    resolution: int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    set_index: bool = False,
) -> AnyDataFrame:
    """Tag every row of the (Geo)DataFrame with its QTM cell id.

    A ``gpd.GeoDataFrame`` takes coordinates from its ``geometry`` column;
    a plain ``pd.DataFrame`` takes them from `lat_col` and `lon_col`
    (default `lat` and `lon`).

    Assumes coordinates in epsg=4326.

    Returns a new frame carrying the id column ``QTM_COL`` plus a
    ``f"{QTM_COL}_res"`` column set to ``resolution``; with
    ``set_index=True`` the id column becomes the index.
    """
    frame = self._df
    if isinstance(frame, gpd.GeoDataFrame):
        points = frame.geometry
        lons, lats = points.x, points.y
    else:
        lons, lats = frame[lon_col], frame[lat_col]

    cell_ids = []
    for lat, lon in zip(lats, lons):
        cell_ids.append(latlon_to_qtm(lat, lon, resolution))

    tagged = frame.assign(**{QTM_COL: cell_ids, f"{QTM_COL}_res": resolution})
    return tagged.set_index(QTM_COL) if set_index else tagged

polyfill(resolution, predicate=None, compact=False, explode=False)

Fill geometries with QTM cell ids at the target resolution.

When compact=True, ids may span multiple resolutions after compaction (same as vector2qtm). Use explode=True before qtm2geo for one cell geometry per row.

Source code in vgridpandas/qtmpandas.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def polyfill(
    self,
    resolution: int,
    predicate: str = None,
    compact: bool = False,
    explode: bool = False,
) -> AnyDataFrame:
    """
    Fill geometries with QTM cell ids at the target resolution.

    When ``compact=True``, ids may span multiple resolutions after compaction
    (same as ``vector2qtm``). Use ``explode=True`` before ``qtm2geo`` for one
    cell geometry per row.

    Args:
        resolution: Target QTM resolution.
        predicate: Forwarded to ``polyfill_row`` for each geometry.
        compact: Forwarded to ``polyfill_row`` for each geometry.
        explode: If False, ``QTM_COL`` holds one list of ids per row. If
            True, the lists are exploded and joined back on the index, so
            each input row is repeated once per id.

    Returns:
        A new frame with the ``QTM_COL`` column. An existing ``QTM_COL``
        column is replaced in both modes.
    """
    result = self._df.geometry.apply(
        lambda geom: polyfill_row(geom, resolution, predicate, compact)
    )

    if not explode:
        assign_args = {QTM_COL: result}
        return self._df.assign(**assign_args)

    exploded = result.explode().to_frame(QTM_COL)
    base = self._df
    if QTM_COL in base.columns:
        # join() raises on overlapping column names; drop the stale ids so
        # this path overwrites them just like the assign() path above.
        base = base.drop(columns=QTM_COL)
    return base.join(exploded)

qtm2geo(qtm_col=None)

Add QTM cell geometry to the DataFrame.

Source code in vgridpandas/qtmpandas.py
171
172
173
174
175
176
177
178
179
180
181
def qtm2geo(self, qtm_col: str = None) -> GeoDataFrame:
    """Attach QTM cell geometry to the DataFrame.

    The ids are read from ``qtm_col`` when given, otherwise from the default
    ``QTM_COL`` column. A ``ValueError`` is raised if that column is missing.
    """
    # Resolve the column once so both cases share one lookup and one error path.
    column = QTM_COL if qtm_col is None else qtm_col
    if column not in self._df.columns:
        raise ValueError(f"Column '{column}' not found in DataFrame")
    cell_ids = self._df[column]
    return dggs_ids_to_geodataframe(self._df, cell_ids, qtm_to_geo)

qtmbin(resolution, stats='count', numeric_col=None, category_col=None, lat_col='lat', lon_col='lon')

Bin points into qtm cells and compute statistics.

Source code in vgridpandas/qtmpandas.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def qtmbin(
    self,
    resolution: int,
    stats: str = "count",
    numeric_col: str = None,
    category_col: str = None,
    lat_col: str = "lat",
    lon_col: str = "lon",
) -> GeoDataFrame:
    """Bin points into qtm cells and compute statistics.

    Three steps: tag each row with its cell id via ``latlon2qtm``, aggregate
    per cell with ``aggregate_bin``, then convert the aggregated frame with
    ``qtm2geo``.
    """
    tagged = self.latlon2qtm(resolution, lat_col=lat_col, lon_col=lon_col)
    binned = aggregate_bin(tagged, QTM_COL, stats, numeric_col, category_col)
    # The aggregated frame is a new object, so go through its own accessor.
    return binned.qtm.qtm2geo(qtm_col=QTM_COL)

poly2qtm(geometry, resolution, predicate=None, compact=False)

Convert polygon or line geometries to QTM cells.

Mirrors polygon2qtm / polyline2qtm in vgrid: walk the facet tree at the target resolution, then optionally compact the id set (same as vector2qtm).

Source code in vgridpandas/qtmpandas.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def poly2qtm(
    geometry: Union[MultiPolyOrPoly, MultiLineOrLine],
    resolution: int,
    predicate: str = None,
    compact: bool = False,
) -> List[str]:
    """
    Convert polygon or line geometries to QTM cells.

    Follows ``polygon2qtm`` / ``polyline2qtm`` in vgrid: walk the facet tree
    down to the target resolution, keeping only facets that intersect the
    geometry, then optionally compact the id set (same as ``vector2qtm``).

    At resolution 1 every base facet that intersects the geometry is
    reported (not only the first one found), and the same line/predicate
    test used at deeper levels is applied.

    Args:
        geometry: Polygon, MultiPolygon, LineString or MultiLineString.
            Multi-part geometries are processed part by part.
        resolution: Target resolution; passed through
            ``validate_qtm_resolution`` first.
        predicate: Forwarded to ``check_predicate`` for polygon parts at the
            target level. Not consulted for line geometries.
        compact: If True and any ids were found, return
            ``qtm_compact(ids)`` instead of the raw id list.

    Returns:
        List of QTM ids; an empty list for unsupported geometry types. Ids
        may repeat when several parts of a multi-geometry share a cell.
    """
    resolution = validate_qtm_resolution(resolution)
    if isinstance(geometry, (Polygon, LineString)):
        parts = [geometry]
    elif isinstance(geometry, (MultiPolygon, MultiLineString)):
        parts = list(geometry.geoms)
    else:
        return []

    is_line = isinstance(geometry, (LineString, MultiLineString))
    last_level = resolution - 1
    qtm_ids = []

    for part in parts:
        # Level 0: the base facets, with ids "1", "2", ... in INITIAL_FACETS order.
        frontier = [(str(i + 1), facet) for i, facet in enumerate(INITIAL_FACETS)]

        if last_level == 0:
            # The base facets are the target cells: test each one directly.
            for cell_id, facet in frontier:
                cell = Polygon(constructGeometry(facet))
                if cell.intersects(part) and (
                    is_line or check_predicate(cell, part, predicate)
                ):
                    qtm_ids.append(cell_id)
            continue

        for lvl in range(1, resolution):
            next_frontier = []
            for parent_id, parent_facet in frontier:
                for j, subfacet in enumerate(divideFacet(parent_facet)):
                    cell = Polygon(constructGeometry(subfacet))
                    # Prune: only intersecting facets are subdivided further.
                    if not cell.intersects(part):
                        continue
                    child_id = parent_id + str(j)
                    next_frontier.append((child_id, subfacet))
                    if lvl == last_level and (
                        is_line or check_predicate(cell, part, predicate)
                    ):
                        qtm_ids.append(child_id)
            frontier = next_frontier

    if compact and qtm_ids:
        return qtm_compact(qtm_ids)
    return qtm_ids

polyfill_row(geometry, resolution, predicate=None, compact=False)

Return cell ids covering a single row geometry.

Source code in vgridpandas/qtmpandas.py
123
124
125
126
127
128
129
130
131
132
133
def polyfill_row(geometry, resolution, predicate=None, compact=False) -> list:
    """Return cell ids covering a single row geometry.

    Polygon and MultiPolygon geometries are filled with ``poly2qtm`` using
    the given ``predicate`` and ``compact`` settings. LineString and
    MultiLineString geometries always use ``predicate="intersect"`` and
    ``compact=False``, whatever the caller passed.

    Returns:
        The unique ids, in the order ``poly2qtm`` produced them, so repeated
        runs give identically ordered lists.

    Raises:
        TypeError: If ``geometry`` is not one of the four supported types.
    """
    if isinstance(geometry, (Polygon, MultiPolygon)):
        cells = poly2qtm(geometry, resolution, predicate, compact)
    elif isinstance(geometry, (LineString, MultiLineString)):
        cells = poly2qtm(geometry, resolution, predicate="intersect", compact=False)
    else:
        raise TypeError(f"Unknown type {type(geometry)}")
    # dict.fromkeys removes duplicates while keeping first-seen order; a set
    # would order the ids by string hash, which differs between runs.
    return list(dict.fromkeys(cells))