Utils API

obia.utils.image

apply_clahe(image)

Applies CLAHE (Contrast Limited Adaptive Histogram Equalization) to an image for improving contrast in both grayscale and color images.

CLAHE works by dividing the image into small tiles and enhancing each tile separately. It is particularly useful for improving the visibility of features in images with varying lighting conditions or shadows.

:param image: The input image to which CLAHE should be applied. This can be either a grayscale or a color image. If the image has three dimensions, it is assumed to be in the BGR color format, and CLAHE is applied to each channel separately.
:return: The image with applied CLAHE, having enhanced contrast compared to the original input.

Source code in obia/utils/image.py
def apply_clahe(image):
    """
    Applies CLAHE (Contrast Limited Adaptive Histogram Equalization) to an
    image for improving contrast in both grayscale and color images.

    CLAHE works by dividing the image into small tiles and enhancing each
    tile separately. It is particularly useful for improving the visibility
    of features in images with varying lighting conditions or shadows.

    :param image: The input image to which CLAHE should be applied. This can
                  be either a grayscale or a color image. If the image has
                  three dimensions, it is assumed to be in the BGR color
                  format where CLAHE is applied to each channel separately.
    :return: The image with applied CLAHE, having enhanced contrast compared
             to the original input.
    """
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

    if image.ndim == 3:
        channels = cv2.split(image)
        clahe_channels = [clahe.apply(ch) for ch in channels]
        image_clahe = cv2.merge(clahe_channels)
    else:
        image_clahe = clahe.apply(image)

    return image_clahe
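For intuition, the tiling idea behind CLAHE can be sketched in plain NumPy: equalize each tile's histogram independently. This is a simplified illustration only (the helper `tilewise_equalize` is hypothetical; real CLAHE additionally clips each tile's histogram at the clip limit and bilinearly interpolates between neighbouring tile mappings):

```python
import numpy as np

def tilewise_equalize(img, tiles=(2, 2)):
    """Equalize each tile's histogram independently (illustration only;
    assumes a uint8 image whose shape is divisible by the tile grid)."""
    out = np.empty_like(img)
    th, tw = img.shape[0] // tiles[0], img.shape[1] // tiles[1]
    for ty in range(tiles[0]):
        for tx in range(tiles[1]):
            sl = (slice(ty * th, (ty + 1) * th), slice(tx * tw, (tx + 1) * tw))
            tile = img[sl]
            hist = np.bincount(tile.ravel(), minlength=256)
            cdf = hist.cumsum()
            # map each intensity through the tile's own CDF
            out[sl] = (cdf * 255 // cdf[-1]).astype(np.uint8)[tile]
    return out

# a simple horizontal gradient image
img = np.tile(np.arange(256, dtype=np.uint8), 256).reshape(256, 256)
out = tilewise_equalize(img)
```

Unlike this sketch, `cv2.createCLAHE` avoids visible tile seams by interpolating between the per-tile mappings.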

apply_histogram_equalization(image)

Apply histogram equalization to the input image. Histogram equalization enhances the contrast of an image by effectively spreading out the most frequent intensity values. This function supports both grayscale and RGB images, converting them to grayscale before applying the equalization process. The equalized grayscale image is then stacked into a 3-channel image.

:param image: The image to be equalized. It can be a 2D array for grayscale or a 3D array for RGB images.
:type image: numpy.ndarray

:return: A new image where the histogram has been equalized. The returned image will be a 3-channel RGB image irrespective of whether the input was grayscale or RGB.
:rtype: numpy.ndarray

Source code in obia/utils/image.py
def apply_histogram_equalization(image):
    """
    Apply histogram equalization to the input image. Histogram equalization
    enhances the contrast of an image by effectively spreading out the most
    frequent intensity values. This function supports both grayscale and
    RGB images, converting them to grayscale before applying the equalization
    process. The equalized grayscale image is then stacked into a 3-channel
    image.

    :param image:
        The image to be equalized. It can be a 2D array for grayscale or a
        3D array for RGB images.
    :type image: numpy.ndarray

    :return:
        A new image where the histogram has been equalized. The returned
        image will be a 3-channel RGB image irrespective of whether the
        input was grayscale or RGB.
    :rtype: numpy.ndarray
    """
    if image.ndim == 3:
        image_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    else:
        image_gray = image

    equalized_img = cv2.equalizeHist(image_gray)

    return np.stack((equalized_img,) * 3, axis=-1)
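What `cv2.equalizeHist` does can be approximated in plain NumPy via the histogram CDF; the sketch below (with a hypothetical helper `equalize_gray`) also shows the final grayscale-to-3-channel stacking step used by this function:

```python
import numpy as np

def equalize_gray(gray):
    """CDF-based global histogram equalization (approximates
    cv2.equalizeHist; assumes a non-constant uint8 image)."""
    hist = np.bincount(gray.ravel(), minlength=256)
    cdf = hist.cumsum()
    cdf_min = cdf[cdf > 0][0]                      # first occupied bin
    lut = (cdf - cdf_min) * 255 / (cdf[-1] - cdf_min)
    lut = np.clip(np.round(lut), 0, 255).astype(np.uint8)
    return lut[gray]

gray = np.arange(256, dtype=np.uint8).reshape(16, 16)   # already uniform
eq = equalize_gray(gray)
rgb3 = np.stack((eq,) * 3, axis=-1)                     # gray -> 3 channels
```

A uniform histogram is a fixed point of equalization, so `eq` equals `gray` here.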

rescale_to_8bit(image, min=2, max=98)

Rescales an image to an 8-bit unsigned integer representation by stretching and clamping its intensity values between specified percentile ranges. This transformation enhances the dynamic range of the image and is useful for various image processing tasks.

:param image: A NumPy array representing the input image to be rescaled. This image should have intensity values that need adjustment.
:param min: The lower percentile cutoff for rescaling, with a default value of 2, indicating that the intensity values below this percentile will be clipped.
:param max: The upper percentile cutoff for rescaling, with a default value of 98, indicating that the intensity values above this percentile will be clipped.
:return: A NumPy array of the same shape as the input image, with the intensity values rescaled to the range [0, 255] as 8-bit unsigned integers.

Source code in obia/utils/image.py
def rescale_to_8bit(image, min=2, max=98):
    """
    Rescales an image to an 8-bit unsigned integer representation by stretching
    and clamping its intensity values between specified percentile ranges. This
    transformation enhances the dynamic range of the image and is useful for
    various image processing tasks.

    :param image: A NumPy array representing the input image to be rescaled.
                  This image should have intensity values that need adjustment.
    :param min: The lower percentile cutoff for rescaling, with a default
                value of 2, indicating that the intensity values below this
                percentile will be clipped.
    :param max: The upper percentile cutoff for rescaling, with a default
                value of 98, indicating that the intensity values above this
                percentile will be clipped.
    :return: A NumPy array of the same shape as the input image, with the
             intensity values rescaled to the range [0, 255] as 8-bit
             unsigned integers.
    """
    p_min, p_max = np.percentile(image, (min, max))

    if p_min == p_max:
        return np.zeros(image.shape, dtype=np.uint8)

    scaled_image = 255 * (image - p_min) / (p_max - p_min)

    scaled_image = np.clip(scaled_image, 0, 255)

    return scaled_image.astype(np.uint8)
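A quick way to see the percentile stretch in action; the helper below inlines the same logic (the name `stretch_to_8bit` and the input values are illustrative):

```python
import numpy as np

def stretch_to_8bit(image, p_lo=2, p_hi=98):
    """Same percentile stretch as rescale_to_8bit, inlined for illustration."""
    lo, hi = np.percentile(image, (p_lo, p_hi))
    if lo == hi:
        return np.zeros(image.shape, dtype=np.uint8)
    return np.clip(255 * (image - lo) / (hi - lo), 0, 255).astype(np.uint8)

# e.g. 16-bit-range reflectance values
img = np.linspace(0, 10000, 10000).reshape(100, 100)
out = stretch_to_8bit(img)
```

Values below the 2nd percentile clip to 0 and values above the 98th clip to 255, which is what makes the stretch robust to outlier pixels.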

rgb_to_gray(rgb)

ITU-R 601 grayscale (expects float32 array in [0,1]).

Source code in obia/utils/image.py
def rgb_to_gray(rgb):
    """ITU-R 601 grayscale (expects float32 array in [0,1])."""
    coeffs = np.array([0.299, 0.587, 0.114], dtype=np.float32)
    return (rgb * coeffs).sum(axis=-1)
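A minimal check of the weighting, assuming a float32 RGB array in [0, 1]: the coefficients broadcast over the last axis, so a pure-green image maps to 0.587:

```python
import numpy as np

coeffs = np.array([0.299, 0.587, 0.114], dtype=np.float32)
rgb = np.zeros((2, 2, 3), dtype=np.float32)
rgb[..., 1] = 1.0                      # pure green, values in [0, 1]
gray = (rgb * coeffs).sum(axis=-1)     # broadcasts over the last axis
```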

variance_of_laplacian(gray, win)

Local variance of the 3×3 Laplacian, window = win×win pixels.

Source code in obia/utils/image.py
def variance_of_laplacian(gray, win):
    """Local variance of the 3×3 Laplacian, window = win×win pixels."""
    lap = cv2.Laplacian(gray, cv2.CV_32F, ksize=3)
    mean   = uniform_filter(lap,        size=win)
    mean2  = uniform_filter(lap * lap,  size=win)
    return mean2 - mean ** 2           # σ² = E[X²] − E[X]²
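The window statistics rely on the identity σ² = E[X²] − E[X]²; `uniform_filter` simply computes the two mean terms per window. With a single window spanning the whole array, the identity can be checked in plain NumPy:

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(64, 64)).astype(np.float32)

# sigma^2 = E[X^2] - E[X]^2, here over one window covering the whole array
var_identity = (x * x).mean() - x.mean() ** 2
```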

obia.utils.tiling

create_tiled_segments(input_raster, output_dir, input_mask=None, method='slic', tile_size=200, buffer=30, crown_radius=5, **kwargs)

:param input_raster: Path to the input raster file.
:param output_dir: Directory where output files will be saved.
:param input_mask: Optional path to an input mask file for masking specific regions.
:param method: Segmentation method to be used, defaults to "slic". Currently, only the 'slic' method is supported.
:param tile_size: Size of the tiles into which the raster is divided, default is 200.
:param buffer: Buffer size for tile overlap, default is 30.
:param crown_radius: Radius used to compute the number of segments/crowns, default is 5.
:param kwargs: Additional keyword arguments passed to the segmentation function.
:return: None

Source code in obia/utils/tiling.py
def create_tiled_segments(input_raster, output_dir, input_mask=None,
                          method="slic", tile_size=200, buffer=30, crown_radius=5,
                          **kwargs):
    """
    :param input_raster: Path to the input raster file.
    :param output_dir: Directory where output files will be saved.
    :param input_mask: Optional path to an input mask file for masking specific regions.
    :param method: Segmentation method to be used, defaults to "slic". Currently, only the 'slic' method is supported.
    :param tile_size: Size of the tiles into which the raster is divided, default is 200.
    :param buffer: Buffer size for tile overlap, default is 30.
    :param crown_radius: Radius used to compute the number of segments/crowns, default is 5.
    :param kwargs: Additional keyword arguments passed to the segmentation function.
    :return: None
    """
    if method != "slic":
        raise ValueError("Currently, only the 'slic' method is supported for segmentation.")
    # buffer = buffer * 2
    gdal = _require_gdal()
    dataset = gdal.Open(input_raster)
    if not dataset:
        raise ValueError(f"Unable to open {input_raster}")

    mask_dataset = None
    # todo: mask should be optional
    if input_mask is not None:
        mask_dataset = gdal.Open(input_mask)
        if not mask_dataset:
            raise ValueError(f"Unable to open {input_mask}")

    # todo: dont mask segments that intersect with outer bbox
    outer_bbox = get_raster_bbox(dataset)

    width = dataset.RasterXSize
    height = dataset.RasterYSize

    os.makedirs(output_dir, exist_ok=True)

    tile_index = 0

    all_black_segments = gpd.GeoDataFrame(columns=["geometry"])
    all_white_segments = gpd.GeoDataFrame(columns=["geometry"])

    for j in range(0, height, tile_size):
        for i in range(0, width, tile_size):
            is_white_tile = (i // tile_size + j // tile_size) % 2 != 0

            if is_white_tile:
                continue

            i_offset = i
            j_offset = j

            w = min(tile_size, width - i_offset)
            h = min(tile_size, height - j_offset)

            if w == 0 or h == 0:
                continue

            image = _create_tile(dataset, i_offset, j_offset, w, h)

            mask = None

            if mask_dataset:
                mask = _create_tile(mask_dataset, i_offset, j_offset, w, h, binary_mask=True)

            n_segments = kwargs.get("n_segments", None)
            if n_segments is None:
                geo_transform = dataset.GetGeoTransform()
                pixel_width = geo_transform[1]
                pixel_height = abs(geo_transform[5])
                pixel_area = pixel_width * pixel_height
                crown_area = math.pi * (crown_radius ** 2)
                tree_area = mask.sum() * pixel_area
                n_crowns = round(tree_area / crown_area)
                n_segments = n_crowns
            try:
                segmented_image = create_segments(
                    image=image,
                    mask=mask,
                    n_segments=n_segments,
                    method="slic",
                    **kwargs
                )

                # bbox = segmented_image.total_bounds
                # tile_boundary = box(bbox[0], bbox[1], bbox[2], bbox[3])
                # segmented_image_filtered = segmented_image[~segmented_image.intersects(tile_boundary.boundary)]
                all_black_segments = pd.concat([all_black_segments, segmented_image], ignore_index=True)
            except ValueError:
                print(f"empty tile: ({i}, {j})")

            tile_index += 1
        tile_index += 1

    tile_index = 0
    for j in range(0, height, tile_size):
        for i in range(0, width, tile_size):
            is_white_tile = (i // tile_size + j // tile_size) % 2 != 0

            if is_white_tile:
                # ---- horizontal offsets & width ----
                i_offset = max(0, i - buffer)  # shift left by buffer unless we hit 0
                right_edge = min(width, i + tile_size + buffer)  # shift right by buffer unless we hit width
                w = right_edge - i_offset  # final window width

                # ---- vertical offsets & height ----
                j_offset = max(0, j - buffer)
                bottom_edge = min(height, j + tile_size + buffer)
                h = bottom_edge - j_offset

                w = max(0, min(w, width - i_offset))
                h = max(0, min(h, height - j_offset))
                if w == 0 or h == 0:
                    continue

                # create tile mask and image
                image = _create_tile(dataset, i_offset, j_offset, w, h)
                mask = None
                if mask_dataset:
                    mask = _create_tile(mask_dataset, i_offset, j_offset, w, h, binary_mask=True)

                tile_transform = image.transform
                left, top = tile_transform * (0, 0)
                right, bottom = tile_transform * (w, h)
                bbox = (left, bottom, right, top)

                tile_polygon = box(*bbox)

                corner_length = buffer / 2
                minx, miny, maxx, maxy = tile_polygon.bounds
                bottom_left_square = Polygon([
                    (minx, miny),
                    (minx + corner_length, miny),
                    (minx + corner_length, miny + corner_length),
                    (minx, miny + corner_length)
                ])
                bottom_right_square = Polygon([
                    (maxx - corner_length, miny),
                    (maxx, miny),
                    (maxx, miny + corner_length),
                    (maxx - corner_length, miny + corner_length)
                ])
                tile_polygon = tile_polygon.difference(bottom_left_square).difference(bottom_right_square)

                intersecting_black_segments = all_black_segments[
                    all_black_segments.within(tile_polygon) | all_black_segments.overlaps(tile_polygon)
                    ]
                intersecting_white_segments = all_white_segments[
                    all_white_segments.within(tile_polygon) | all_white_segments.overlaps(tile_polygon)
                    ]

                if not intersecting_black_segments.empty or not intersecting_white_segments.empty:
                    overlapping_black_segments_for_mask = intersecting_black_segments[
                        intersecting_black_segments.overlaps(tile_polygon)
                    ]
                    overlapping_white_segments_for_mask = intersecting_white_segments[
                        intersecting_white_segments.overlaps(tile_polygon)
                    ]

                    overlapping_black_segments_for_delete = intersecting_black_segments[
                        intersecting_black_segments.within(tile_polygon)
                    ]
                    overlapping_white_segments_for_delete = intersecting_white_segments[
                        intersecting_white_segments.within(tile_polygon)
                    ]

                    indices_to_delete_black = overlapping_black_segments_for_delete.index
                    all_black_segments = all_black_segments.drop(indices_to_delete_black)

                    indices_to_delete_white = overlapping_white_segments_for_delete.index
                    all_white_segments = all_white_segments.drop(indices_to_delete_white)

                    combined_geometries = []

                    if not overlapping_white_segments_for_mask.empty:
                        white_geometries = [
                            (segment.geometry, 1) for _, segment in overlapping_white_segments_for_mask.iterrows()
                        ]
                        combined_geometries.extend(white_geometries)
                    if not overlapping_black_segments_for_mask.empty:
                        black_geometries = [
                            (segment.geometry, 1) for _, segment in overlapping_black_segments_for_mask.iterrows()
                        ]
                        combined_geometries.extend(black_geometries)
                    corner_geometries = [(bottom_left_square, 1), (bottom_right_square, 1)]
                    combined_geometries.extend(corner_geometries)

                    mask_rasterized = rasterize(
                        combined_geometries,
                        out_shape=(image.img_data.shape[0], image.img_data.shape[1]),
                        transform=image.transform,
                        fill=0,
                        default_value=1,
                        dtype=rasterio.uint8
                    )

                    if mask is not None:
                        mask[mask_rasterized == 1] = 0
                    else:
                        mask = mask_rasterized
                else:
                    print(f"No overlapping black segments found for tile ({i}, {j}).")

                n_segments = kwargs.get("n_segments", None)
                if n_segments is None:
                    geo_transform = dataset.GetGeoTransform()
                    pixel_width = geo_transform[1]
                    pixel_height = abs(geo_transform[5])
                    pixel_area = pixel_width * pixel_height
                    crown_area = math.pi * (crown_radius ** 2)
                    tree_area = mask.sum() * pixel_area
                    n_crowns = round(tree_area / crown_area)
                    n_segments = n_crowns
                try:
                    segmented_image = create_segments(
                        image=image,
                        mask=mask,
                        n_segments=n_segments,
                        method="slic",
                        **kwargs
                    )
                    all_white_segments = pd.concat([all_white_segments, segmented_image], ignore_index=True)
                except ValueError:
                    print(f"empty tile: ({i}, {j}).")

            tile_index += 1
        tile_index += 1

    all_segments = pd.concat([all_black_segments, all_white_segments], ignore_index=True)
    all_segments['segment_id'] = range(1, len(all_segments) + 1)
    all_segments.to_file(os.path.join(output_dir, "segments.gpkg"), driver="GPKG")
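When `n_segments` is not supplied, it is derived from the mask as the masked tree area divided by an idealized circular crown area. The arithmetic, with illustrative numbers (the 25 cm pixel size and masked pixel count below are assumptions, not values from the library; 5 m is the function's default `crown_radius`):

```python
import math

pixel_width = pixel_height = 0.25        # assumed 25 cm ground sample distance
pixel_area = pixel_width * pixel_height  # m^2 per pixel
crown_radius = 5.0                       # metres (the default)
crown_area = math.pi * crown_radius ** 2

tree_pixels = 400_000                    # hypothetical mask.sum() for one tile
tree_area = tree_pixels * pixel_area     # 25,000 m^2 of canopy
n_segments = round(tree_area / crown_area)
```

So a tile with 25,000 m² of canopy at a 5 m crown radius is seeded with roughly 318 SLIC segments.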

get_raster_bbox(dataset)

:param dataset: Input dataset from which to retrieve the bounding box. Must have GetGeoTransform, RasterXSize, and RasterYSize methods.
:return: A tuple representing the bounding box (min_x, min_y, max_x, max_y).

Source code in obia/utils/tiling.py
def get_raster_bbox(dataset):
    """
    :param dataset: Input dataset from which to retrieve the bounding box. Must have GetGeoTransform, RasterXSize, and RasterYSize methods.
    :return: A tuple representing the bounding box (min_x, min_y, max_x, max_y).
    """
    transform = dataset.GetGeoTransform()

    width = dataset.RasterXSize
    height = dataset.RasterYSize

    min_x = transform[0]
    max_y = transform[3]
    max_x = min_x + width * transform[1]
    min_y = max_y + height * transform[5]

    return (min_x, min_y, max_x, max_y)
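The same arithmetic can be exercised with a stub exposing the GDAL dataset interface (all values below are made up):

```python
# Stub with the attributes/methods get_raster_bbox expects of a GDAL dataset
class FakeDataset:
    RasterXSize, RasterYSize = 1000, 800

    @staticmethod
    def GetGeoTransform():
        # (origin_x, pixel_width, 0, origin_y, 0, -pixel_height)
        return (500000.0, 0.5, 0.0, 4200000.0, 0.0, -0.5)

t = FakeDataset.GetGeoTransform()
min_x, max_y = t[0], t[3]
max_x = min_x + FakeDataset.RasterXSize * t[1]   # 500000 + 1000 * 0.5
min_y = max_y + FakeDataset.RasterYSize * t[5]   # 4200000 - 800 * 0.5
bbox = (min_x, min_y, max_x, max_y)
```

Note that `transform[5]` is negative for north-up rasters, which is why the minimum y comes from *adding* `height * transform[5]` to the origin.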

obia.utils.training

generate_tiles(bounds, step, tile_size)

Generator that yields bounding boxes (minx, miny, maxx, maxy) by stepping through the full raster bounds in increments of 'step', producing tiles of size 'tile_size'.

Source code in obia/utils/training.py
def generate_tiles(bounds, step, tile_size):
    """
    Generator that yields bounding boxes (minx, miny, maxx, maxy)
    by stepping through the full raster bounds in increments of 'step',
    producing tiles of size 'tile_size'.
    """
    minx, miny, maxx, maxy = bounds
    y = miny
    while y < maxy:
        x = minx
        tile_top = y + tile_size
        while x < maxx:
            tile_right = x + tile_size
            cur_maxx = min(tile_right, maxx)
            cur_maxy = min(tile_top, maxy)
            yield (x, y, cur_maxx, cur_maxy)
            x += step
        y += step
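Because the generator advances by `step` while emitting tiles of size `tile_size`, choosing `step < tile_size` produces overlapping tiles. A self-contained copy of the generator shows this on a small, made-up extent:

```python
def generate_tiles(bounds, step, tile_size):
    """Yield (minx, miny, maxx, maxy) tiles, clamped to the raster bounds."""
    minx, miny, maxx, maxy = bounds
    y = miny
    while y < maxy:
        x = minx
        tile_top = y + tile_size
        while x < maxx:
            tile_right = x + tile_size
            yield (x, y, min(tile_right, maxx), min(tile_top, maxy))
            x += step
        y += step

# 250 x 250 extent, 150-unit tiles, 100-unit step => 50 units of overlap
tiles = list(generate_tiles((0.0, 0.0, 250.0, 250.0), step=100.0, tile_size=150.0))
```

This yields a 3 × 3 grid of nine tiles, with edge tiles clamped to the extent.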

tile_and_process(raster_path, mask_path=None, boxes_gpkg_path=None, output_dir='output_tiles', tile_size=150.0, overlap=50.0, selected_bands=(4, 2, 1), feather_radius=0.0, blur_kernel=5, darken_factor=0.8, apply_clahe_flag=True, rescale=True)

Tiles an input raster (and corresponding binary mask if provided), optionally applies CLAHE, optionally feathers the canopy edges using a distance transform, and writes out per-tile images (JPEG). Also converts polygons to bounding-box annotations if 'boxes_gpkg_path' is provided.

Additionally, it saves a 'transforms.json' containing each tile's transform (as [a,b,c,d,e,f]) and its CRS in string form, so you can later reconstruct georeferencing for each tile.

Parameters

raster_path : str
    Path to the main image raster (e.g., "image.tif").
mask_path : str or None
    If provided, path to a binary canopy mask (0=background, 1=canopy). If None, no masking or feathering will be done.
boxes_gpkg_path : str or None
    If provided, path to a GeoPackage with polygon annotations to be converted to bounding boxes per tile. If None, no annotations are generated.
output_dir : str
    Directory to save output JPEG tiles, annotations.json, and transforms.json.
tile_size : float
    Size of each tile in the raster's coordinate units (e.g., meters).
overlap : float
    Overlap (in same units) between adjacent tiles.
selected_bands : tuple of ints
    Which bands (1-based in raster) to extract from the raster. E.g., (4,2,1).
feather_radius : float
    If > 0, a distance transform is used to create a soft alpha around canopy edges. 0 = no feathering. Only relevant if mask_path is not None.
blur_kernel : int or tuple
    Size of the GaussianBlur kernel (must be an odd int or an odd tuple), e.g. 5 or (5,5). If 0, skip blur entirely.
darken_factor : float
    Multiplier for background brightness, e.g., 0.8 = 80% brightness, 0.2 = 20% brightness. Only relevant if mask_path is not None.
apply_clahe_flag : bool
    If True, apply CLAHE to the rescaled 8-bit tile before blending. If False, skip CLAHE and use the raw 8-bit image.
rescale : bool
    If True, apply a percentile-based stretch to 8-bit. If False, do a min-max linear scaling (or no scaling if tile_min==tile_max).

Returns

None
    Writes:
      - Per-tile JPEG images in 'output_dir'.
      - 'annotations.json' (if polygons are provided).
      - 'transforms.json' with each tile's geotransform & CRS.
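The number of tiles follows directly from `tile_size` and `overlap`, mirroring the `num_tiles_x`/`num_tiles_y` computation inside the function (the extent values below are illustrative):

```python
import math

tile_size, overlap = 150.0, 50.0
step = tile_size - overlap               # 100-unit stride between tile origins
width, height = 950.0, 600.0             # raster extent in CRS units (assumed)

num_tiles_x = math.ceil((width - overlap) / step)    # columns of tiles
num_tiles_y = math.ceil((height - overlap) / step)   # rows of tiles
total_tiles = num_tiles_x * num_tiles_y
```

For this extent the function would report 9 × 6 = 54 tiles to the progress bar.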

Source code in obia/utils/training.py
def tile_and_process(
    raster_path,
    mask_path=None,
    boxes_gpkg_path=None,
    output_dir="output_tiles",
    tile_size=150.0,
    overlap=50.0,
    selected_bands=(4, 2, 1),
    feather_radius=0.0,
    blur_kernel=5,
    darken_factor=0.8,
    apply_clahe_flag=True,
    rescale=True
):
    """
    Tiles an input raster (and corresponding binary mask if provided), optionally
    applies CLAHE, optionally feathers the canopy edges using a distance transform,
    and writes out per-tile images (JPEG). Also converts polygons to bounding-box
    annotations if 'boxes_gpkg_path' is provided.

    Additionally, it saves a 'transforms.json' containing each tile's
    transform (as [a,b,c,d,e,f]) and its CRS in string form, so you can
    later reconstruct georeferencing for each tile.

    Parameters
    ----------
    raster_path : str
        Path to the main image raster (e.g., "image.tif").
    mask_path : str or None
        If provided, path to a binary canopy mask (0=background, 1=canopy).
        If None, no masking or feathering will be done.
    boxes_gpkg_path : str or None
        If provided, path to a GeoPackage with polygon annotations to be converted
        to bounding boxes per tile. If None, no annotations are generated.
    output_dir : str
        Directory to save output JPEG tiles, annotations.json, and transforms.json.
    tile_size : float
        Size of each tile in the raster's coordinate units (e.g., meters).
    overlap : float
        Overlap (in same units) between adjacent tiles.
    selected_bands : tuple of ints
        Which bands (1-based in raster) to extract from the raster. E.g., (4,2,1).
    feather_radius : float
        If > 0, a distance transform is used to create a soft alpha around
        canopy edges. 0 = no feathering. Only relevant if mask_path is not None.
    blur_kernel : int or tuple
        Size of the GaussianBlur kernel (must be an odd int or an odd tuple),
        e.g. 5 or (5,5). If 0, skip blur entirely.
    darken_factor : float
        Multiplier for background brightness, e.g., 0.8 = 80% brightness, 0.2 = 20% brightness.
        Only relevant if mask_path is not None.
    apply_clahe_flag : bool
        If True, apply CLAHE to the rescaled 8-bit tile before blending.
        If False, skip CLAHE and use the raw 8-bit image.
    rescale : bool
        If True, apply a percentile-based stretch to 8-bit. If False, do a min-max
        linear scaling (or no scaling if tile_min==tile_max).

    Returns
    -------
    None
        Writes:
          - Per-tile JPEG images in 'output_dir'.
          - 'annotations.json' (if polygons are provided).
          - 'transforms.json' with each tile's geotransform & CRS.
    """

    os.makedirs(output_dir, exist_ok=True)
    step = tile_size - overlap

    # If we have polygons, read them; else set None
    if boxes_gpkg_path:
        gdf = gpd.read_file(boxes_gpkg_path)
    else:
        gdf = None
    print("reading raster...")
    with rasterio.open(raster_path) as src:
        # If we have a mask path, open it; else None
        mask_src = rasterio.open(mask_path) if mask_path else None

        # If GPKG was loaded, ensure CRS matches (if needed)
        if gdf is not None and gdf.crs != src.crs:
            gdf = gdf.to_crs(src.crs)

        all_annotations = {}
        tile_index = 0

        # Dictionary to store tile transforms for each tile_name
        transforms_dict = {}
        print("read raster")
        # Generate tiles

        width = src.bounds.right - src.bounds.left
        height = src.bounds.top - src.bounds.bottom

        num_tiles_x = math.ceil((width - overlap) / (tile_size - overlap))
        num_tiles_y = math.ceil((height - overlap) / (tile_size - overlap))
        total_tiles = num_tiles_x * num_tiles_y

        for tbox in tqdm(generate_tiles(src.bounds, step, tile_size), total=total_tiles):
        # for tbox in generate_tiles(src.bounds, step, tile_size):
            tile_index += 1
            minx, miny, maxx, maxy = tbox

            # Optional step: Filter polygons that lie within this tile
            if gdf is not None:
                possible = gdf.cx[minx:maxx, miny:maxy]
                tile_poly = box(minx, miny, maxx, maxy)
                tile_polygons = possible[possible.geometry.within(tile_poly)]
            else:
                tile_polygons = []  # empty if no GPKG

            # (1) Read the tile from main raster
            tile_window = from_bounds(minx, miny, maxx, maxy, src.transform)
            # note: the +1 converts to rasterio's 1-based band numbers, so
            # selected_bands is effectively interpreted as 0-based indices here
            data = src.read(indexes=[b + 1 for b in selected_bands], window=tile_window)
            # data shape => (num_bands, H, W)

            tile_img = np.moveaxis(data, 0, -1)  # shape => (H, W, num_bands)

            # (2) Rescale to 8-bit & optionally apply CLAHE
            if rescale:
                tile_img_8bit = rescale_to_8bit(tile_img)  # percentile-based approach
            else:
                tile_min, tile_max = tile_img.min(), tile_img.max()
                if tile_min == tile_max:
                    tile_img_8bit = np.zeros_like(tile_img, dtype=np.uint8)
                else:
                    tile_img_8bit = 255 * (tile_img - tile_min) / (tile_max - tile_min)
                tile_img_8bit = np.clip(tile_img_8bit, 0, 255).astype(np.uint8)

            if apply_clahe_flag:
                channels = cv2.split(tile_img_8bit)
                clahe_channels = [apply_clahe(ch) for ch in channels]
                tile_img_final = cv2.merge(clahe_channels)
            else:
                tile_img_final = tile_img_8bit

            # (3) If we have a mask, read the tile & do blending
            if mask_src:
                mask_data = mask_src.read(1, window=tile_window)

                # if mask_data.shape[0] == 0 or mask_data.shape[1] == 0:
                #     continue
                # if not np.any(mask_data):
                #     continue
                #
                # if mask_data.shape != tile_img_final.shape[:2]:
                #     # resample mask_data to match tile_img_final
                #     mask_data = cv2.resize(
                #         mask_data,
                #         (tile_img_final.shape[1], tile_img_final.shape[0]),
                #         interpolation=cv2.INTER_NEAREST
                #     )

                # (A) If blur_kernel == 0 => skip blur
                kernel = (blur_kernel, blur_kernel) if isinstance(blur_kernel, int) else blur_kernel
                if not isinstance(kernel, tuple):
                    raise TypeError("blur_kernel must be an int or a tuple")
                if kernel == (0, 0):
                    blurred_img = tile_img_final
                else:
                    blurred_img = cv2.GaussianBlur(tile_img_final, kernel, 0)

                # Darken background
                if darken_factor == 0:
                    darkened_background = blurred_img
                else:
                    darkened_background = (blurred_img * darken_factor).astype(np.uint8)

                # (B) Feather if feather_radius > 0, else do hard blend
                if feather_radius > 0:
                    mask_8u = (mask_data * 255).astype(np.uint8)
                    inverse_mask = 255 - mask_8u
                    dist = cv2.distanceTransform(inverse_mask, cv2.DIST_L2, 3)

                    alpha = 1.0 - (dist / feather_radius)
                    alpha = np.clip(alpha, 0.0, 1.0)
                    alpha_3d = np.dstack([alpha]*3)  # (H, W, 3)

                    tile_f = tile_img_final.astype(np.float32)
                    darkened_bg_f = darkened_background.astype(np.float32)

                    out_img_f = alpha_3d * tile_f + (1.0 - alpha_3d) * darkened_bg_f
                    out_img = np.clip(out_img_f, 0, 255).astype(np.uint8)
                else:
                    # Hard blend with binary mask
                    mask_3d = np.stack([mask_data]*3, axis=-1)  # (H, W, 3)
                    blended_f = tile_img_final * mask_3d + darkened_background * (1 - mask_3d)
                    out_img = blended_f.astype(np.uint8)
            else:
                # No mask => just keep tile_img_final as is
                out_img = tile_img_final

            # (4) Save tile
            out_height, out_width = out_img.shape[:2]
            # Create a local transform for this tile
            tile_transform = rasterio.windows.transform(tile_window, src.transform)

            profile = src.profile.copy()
            profile.update({
                "driver": "JPEG",
                "height": out_height,
                "width": out_width,
                "count": out_img.shape[2],  # typically 3
                "dtype": "uint8",
                "transform": tile_transform,  # georeferencing for the tile
                "crs": src.crs
            })

            tile_name = f"img_{tile_index:03d}.jpg"
            tile_path = os.path.join(output_dir, tile_name)
            out_data = np.moveaxis(out_img, -1, 0)

            with rasterio.open(tile_path, "w", **profile) as dst:
                dst.write(out_data)

            # Store the transform & CRS in a dict
            # tile_transform is an Affine object with attributes (a,b,c,d,e,f)
            # We'll store them as a list [a,b,c,d,e,f] plus a CRS string
            transform_list = [
                tile_transform.a, tile_transform.b, tile_transform.c,
                tile_transform.d, tile_transform.e, tile_transform.f
            ]
            transforms_dict[tile_name] = {
                "transform": transform_list,
                "crs": str(src.crs)  # or src.crs.to_string() / to_wkt() if you prefer
            }

            # (5) Convert polygons -> bounding boxes
            if gdf is not None and len(tile_polygons) > 0:
                row_off = tile_window.row_off
                col_off = tile_window.col_off

                boxes_array = []
                labels_array = []

                for idx, row in tile_polygons.iterrows():
                    poly = row.geometry
                    pxmin, pymin, pxmax, pymax = poly.bounds

                    # Convert geospatial coords -> (row, col)
                    row_tl, col_tl = rowcol(src.transform, pxmin, pymax)
                    row_br, col_br = rowcol(src.transform, pxmax, pymin)

                    # Convert to tile coords
                    row_tl_local = row_tl - row_off
                    col_tl_local = col_tl - col_off
                    row_br_local = row_br - row_off
                    col_br_local = col_br - col_off

                    x_min_pix = col_tl_local
                    y_min_pix = row_tl_local
                    x_max_pix = col_br_local
                    y_max_pix = row_br_local

                    # Clamp
                    x_min_pix = max(0, min(x_min_pix, out_width - 1))
                    x_max_pix = max(0, min(x_max_pix, out_width - 1))
                    y_min_pix = max(0, min(y_min_pix, out_height - 1))
                    y_max_pix = max(0, min(y_max_pix, out_height - 1))

                    # Skip degenerate boxes
                    if x_min_pix >= x_max_pix or y_min_pix >= y_max_pix:
                        continue

                    boxes_array.append([x_min_pix, y_min_pix, x_max_pix, y_max_pix])
                    labels_array.append(1)  # or your class label

                if len(boxes_array) == 0:
                    all_annotations[f"img_{tile_index:03d}"] = {
                        "file_name": tile_name,
                        "boxes": [],
                        "labels": []
                    }
                else:
                    all_annotations[f"img_{tile_index:03d}"] = {
                        "file_name": tile_name,
                        "boxes": boxes_array,
                        "labels": labels_array
                    }

        # End of tile loop
        # Close mask if used
        if mask_src:
            mask_src.close()

    # (6) Write out the annotations JSON (only if we had a GPKG)
    if gdf is not None:
        json_path = os.path.join(output_dir, "annotations.json")
        with open(json_path, "w") as f:
            json.dump(all_annotations, f, indent=2)
        print(f"Annotations JSON written to: {json_path}")

    # (7) Write out the transforms JSON
    transforms_path = os.path.join(output_dir, "transforms.json")
    with open(transforms_path, "w") as ft:
        json.dump(transforms_dict, ft, indent=2)
    print(f"Transforms JSON written to: {transforms_path}")

    print("Done! Tiles written to:", output_dir)
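For reference, the feathered compositing step above reduces to a per-pixel alpha blend, `out = alpha * foreground + (1 - alpha) * darkened_background`. A minimal numpy-only sketch with toy values standing in for a real tile and its distance-transform alpha (no OpenCV needed):

```python
import numpy as np

# Toy stand-ins: a flat grey "tile" and a darkened copy (darken_factor = 0.3).
fg = np.full((2, 2, 3), 200, dtype=np.float32)       # plays the role of tile_img_final
bg = fg * 0.3                                        # darkened background
# alpha is 1 inside the mask, fading to 0 with distance outside it;
# here a hand-picked 2x2 ramp replaces the distance transform.
alpha = np.dstack([np.array([[1.0, 0.5],
                             [0.5, 0.0]])] * 3)      # (H, W, 3)
out = np.clip(alpha * fg + (1.0 - alpha) * bg, 0, 255).astype(np.uint8)
```

Pixels fully inside the mask keep the tile value (200), fully outside pixels drop to the darkened value (60), and the feather zone blends between them (130 at alpha 0.5).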

obia.utils.utils

obia.utils.utils

crop_image_to_bbox(image, geom)

Crop the image data to the bounding box of the given geometry.

:param image: The Image object containing the image data and rasterio object.
:param geom: The geometry (Polygon) used to derive the bounding box for cropping.
:return: Cropped image data as a NumPy array and the updated transform.

Source code in obia/utils/utils.py
def crop_image_to_bbox(image, geom):
    """
    Crop the image data to the bounding box of the given geometry.

    :param image: The Image object containing the image data and rasterio object.
    :param geom: The geometry (Polygon) used to derive the bounding box for cropping.
    :return: Cropped image data as a NumPy array and the updated transform.
    """
    xmin, ymin, xmax, ymax = geom.bounds
    window = from_bounds(xmin, ymin, xmax, ymax, transform=image.transform)
    cropped_img_data = image.rasterio_obj.read(window=window)
    cropped_transform = image.rasterio_obj.window_transform(window)

    return cropped_img_data, cropped_transform
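The window arithmetic that `from_bounds` performs can be written out by hand for the common north-up case. A hypothetical standalone sketch (not part of obia) of that arithmetic, assuming a transform with origin `(x0, y0)`, pixel width `px > 0`, and pixel height `py < 0`:

```python
# Hand-rolled version of the window arithmetic from_bounds performs for a
# plain north-up geotransform: y decreases as the row index increases, so
# py is negative and the row offset is measured down from y0.
def window_from_bounds(xmin, ymin, xmax, ymax, x0, y0, px, py):
    col_off = (xmin - x0) / px
    row_off = (ymax - y0) / py          # py < 0, so this is positive below y0
    width = (xmax - xmin) / px
    height = (ymin - ymax) / py
    return col_off, row_off, width, height

# A 20 x 20 m box in a 1 m/pixel raster whose origin is (0, 200):
window_from_bounds(100, 80, 120, 100, 0, 200, 1.0, -1.0)  # → (100.0, 100.0, 20.0, 20.0)
```

rasterio's real `from_bounds` also handles rotated and sheared transforms, which this sketch deliberately ignores.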

label_segments(segments, labelled_points)

:param segments: A GeoDataFrame representing the segments to be labeled.
:param labelled_points: A GeoDataFrame representing the labeled points used for segment labeling.
:return: A tuple containing a GeoDataFrame with labeled segments and a list of segment IDs for mixed segments.

Source code in obia/utils/utils.py
def label_segments(segments: gpd.GeoDataFrame, labelled_points: gpd.GeoDataFrame) -> Tuple[
    gpd.GeoDataFrame, List[str]]:
    """
    :param segments: A GeoDataFrame representing the segments to be labeled.
    :param labelled_points: A GeoDataFrame representing the labeled points used for segment labeling.
    :return: A tuple containing a GeoDataFrame with labeled segments and a list of segment IDs for mixed segments.
    """
    mixed_segments = []
    labelled_segments = segments.copy()
    intersections = gpd.sjoin(labelled_segments, labelled_points, how='inner', predicate='intersects')

    for polygon_id, group in intersections.groupby(intersections.index):
        classes = group['class'].unique()

        if len(classes) == 1:
            labelled_segments.loc[polygon_id, 'feature_class'] = classes[0]
        else:
            segment_id = group['segment_id'].values[0]
            mixed_segments.append(segment_id)

    labelled_segments = labelled_segments[labelled_segments['feature_class'].notna()]

    return labelled_segments, mixed_segments
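The per-segment decision rule is simple to state outside of geopandas: a segment keeps a class only when every intersecting labelled point agrees, otherwise it is flagged as mixed. A pure-Python sketch of that rule (hypothetical helper and toy class names, no spatial join):

```python
# The rule label_segments applies after the spatial join: unanimous class
# wins, disagreement marks the segment as mixed.
def label_from_points(point_hits):
    """point_hits maps segment_id -> list of classes of intersecting points."""
    labels, mixed = {}, []
    for seg_id, classes in point_hits.items():
        if len(set(classes)) == 1:
            labels[seg_id] = classes[0]
        else:
            mixed.append(seg_id)
    return labels, mixed
```

Segments with no intersecting points never appear in `point_hits`, mirroring how the inner spatial join drops them before the unlabelled rows are filtered out.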

mask_image_with_polygon(cropped_img_data, polygon, cropped_transform)

Masks all pixels outside the polygon for the given cropped image.

:param cropped_img_data: The cropped image data as a NumPy array.
:param polygon: The geometry (Polygon) used for masking.
:param cropped_transform: The affine transform for the cropped image.
:return: Masked image data as a NumPy array.

Source code in obia/utils/utils.py
def mask_image_with_polygon(cropped_img_data, polygon, cropped_transform):
    """
    Masks all pixels outside the polygon for the given cropped image.

    :param cropped_img_data: The cropped image data as a NumPy array.
    :param polygon: The geometry (Polygon) used for masking.
    :param cropped_transform: The affine transform for the cropped image.
    :return: Masked image data as a NumPy array.
    """
    height, width = cropped_img_data.shape[1], cropped_img_data.shape[2]
    mask = geometry_mask([polygon], transform=cropped_transform, invert=True, out_shape=(height, width))
    mask_expanded = np.expand_dims(mask, axis=0)  # Add an extra dimension for bands
    masked_img_data = np.where(mask_expanded, cropped_img_data, np.nan)

    return masked_img_data
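The band-wise masking pattern above can be tried on a toy array without rasterio: broadcast a 2-D boolean mask across the band axis and set everything outside it to NaN (which also upcasts integer data to float, as `np.where` with `np.nan` does):

```python
import numpy as np

# Toy (bands, H, W) image: band 0 holds 0..8, band 1 holds 9..17.
img = np.arange(2 * 3 * 3, dtype=float).reshape(2, 3, 3)
mask = np.zeros((3, 3), dtype=bool)
mask[1, 1] = True                                  # keep only the centre pixel
# Add a leading band axis so the 2-D mask broadcasts over every band.
masked = np.where(mask[np.newaxis, ...], img, np.nan)
```

Only the centre pixel survives in each band; every other value becomes NaN, which downstream statistics can skip with the `nan*` reductions.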

save_deepforest_predictions_to_gpkg(df, tile_name, transforms_json, output_gpkg)

Convert DeepForest predictions (pixel coords) to georeferenced polygons and save them in a GeoPackage.

Parameters

df : pd.DataFrame
    Must have columns: ["xmin","ymin","xmax","ymax","label","score"]. Optional "image_path" if you prefer to verify or filter by tile_name.
tile_name : str
    The tile's filename (e.g. "img_007.jpg") that matches the key in transforms.json.
transforms_json : str
    File path to the transforms.json created by tile_and_process(...). The tile's Affine transform and CRS are loaded from there.
output_gpkg : str
    Path to the output GeoPackage file to create/overwrite.

Source code in obia/utils/utils.py
def save_deepforest_predictions_to_gpkg(
    df,               # DataFrame from DeepForest predict_image(...)
    tile_name,        # e.g. "img_007.jpg"
    transforms_json,  # path to transforms.json
    output_gpkg       # path to output .gpkg
):
    """
    Convert DeepForest predictions (pixel coords) to georeferenced polygons
    and save them in a GeoPackage.

    Parameters
    ----------
    df : pd.DataFrame
        Must have columns: ["xmin","ymin","xmax","ymax","label","score"].
        Optional "image_path" if you prefer to verify or filter by tile_name.
    tile_name : str
        The tile's filename (e.g. "img_007.jpg") that matches the key in transforms.json.
    transforms_json : str
        File path to the transforms.json created by tile_and_process(...).
        We'll load the tile's Affine transform + CRS from there.
    output_gpkg : str
        Path to the output GeoPackage file to create/overwrite.
    """
    # 1) Load transforms.json
    with open(transforms_json, "r") as f:
        transforms_dict = json.load(f)

    if tile_name not in transforms_dict:
        # todo: raise warning or something
        print(f"Tile '{tile_name}' not found in transforms.json. Skipping.")
        return

    # 2) Reconstruct the tile's Affine + CRS
    # transforms_dict[tile_name] = { "transform": [a,b,c,d,e,f], "crs": "EPSG:..." }
    tinfo = transforms_dict[tile_name]
    a, b, c, d, e, f = tinfo["transform"]
    tile_affine = Affine(a, b, c, d, e, f)
    crs_str = tinfo["crs"]  # e.g. "EPSG:32610"

    # 3) Convert each bounding box to a polygon in map coords
    records = []
    for idx, row in df.iterrows():
        # read pixel coords
        xmin_pix = row["xmin"]
        ymin_pix = row["ymin"]
        xmax_pix = row["xmax"]
        ymax_pix = row["ymax"]

        # top-left corner => (col=xmin_pix, row=ymin_pix)
        x1, y1 = tile_affine * (xmin_pix, ymin_pix)
        # top-right corner => (col=xmax_pix, row=ymin_pix)
        x2, y2 = tile_affine * (xmax_pix, ymin_pix)
        # bottom-right corner => (col=xmax_pix, row=ymax_pix)
        x3, y3 = tile_affine * (xmax_pix, ymax_pix)
        # bottom-left corner => (col=xmin_pix, row=ymax_pix)
        x4, y4 = tile_affine * (xmin_pix, ymax_pix)

        # build polygon (4-corner bounding box)
        geom = Polygon([(x1, y1), (x2, y2), (x3, y3), (x4, y4), (x1, y1)])

        # store relevant columns
        rec = {
            "label": row.get("label", "Tree"),   # default label if not present
            "score": row.get("score", None),
            "geometry": geom
        }
        records.append(rec)

    if not records:
        # todo: raise warning or something
        print(f"No predictions to save for tile {tile_name}")
        return

    gdf = gpd.GeoDataFrame(records, crs=crs_str)
    # 4) Save to GPKG
    gdf.to_file(output_gpkg, driver="GPKG")
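The corner conversion above is a plain affine multiply. Given the `[a, b, c, d, e, f]` list stored in transforms.json (same coefficient order the affine/rasterio libraries use), the pixel-to-map mapping can be sketched without any dependency; the coefficient values below are hypothetical:

```python
# Pixel (col, row) -> map (x, y) with the six affine coefficients stored in
# transforms.json: x = a*col + b*row + c,  y = d*col + e*row + f.
def pixel_to_map(col, row, coeffs):
    a, b, c, d, e, f = coeffs
    return a * col + b * row + c, d * col + e * row + f

t = [0.5, 0.0, 1000.0, 0.0, -0.5, 2000.0]   # hypothetical 0.5 m north-up tile
x, y = pixel_to_map(10, 20, t)               # top-left of a box at col=10, row=20
# → (1005.0, 1990.0): x grows with the column, y shrinks as rows go down
```

This is exactly what `tile_affine * (xmin_pix, ymin_pix)` computes in the function above, so the four corners of a pixel bounding box land in map coordinates with the tile's georeferencing applied.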