Skip to content

patch

RleEncoder

Bases: JSONEncoder

Custom encoder to convert numpy arrays to RLE.

default(o)

Customize standard encoder behaviour to convert numpy arrays to RLE.

Source code in quadra/utils/patch/model.py
148
149
150
151
152
def default(self, o: Any):
    """Customize standard encoder behaviour to convert numpy arrays to RLE."""
    if isinstance(o, np.ndarray):
        return mask2rle(o)
    return json.JSONEncoder.default(self, o)

compute_patch_metrics(test_img_info, test_results, overlap, idx_to_class, patch_num_h=None, patch_num_w=None, patch_w=None, patch_h=None, return_polygon=False, patch_reconstruction_method='priority', annotated_good=None)

Compute the metrics of a patch dataset.

Parameters:

  • test_img_info (List[PatchDatasetFileFormat]) –

    List of observation paths and mask paths

  • test_results (DataFrame) –

    Pandas dataframe containing the results of an SklearnClassificationTrainer utility

  • patch_num_h (Optional[int], default: None ) –

    Number of vertical patches (required if patch_w and patch_h are None)

  • patch_num_w (Optional[int], default: None ) –

    Number of horizontal patches (required if patch_w and patch_h are None)

  • patch_h (Optional[int], default: None ) –

    Patch height (required if patch_num_h and patch_num_w are None)

  • patch_w (Optional[int], default: None ) –

    Patch width (required if patch_num_h and patch_num_w are None)

  • overlap (float) –

    Percentage of overlap between the patches

  • idx_to_class (Dict) –

    Dict mapping an index to the corresponding class name

  • return_polygon (bool, default: False ) –

    if set to true convert the reconstructed mask into polygons, otherwise return the mask

  • patch_reconstruction_method (str, default: 'priority' ) –

    How to compute the label of overlapping patches, can either be: priority: Assign the top priority label (i.e the one with greater index) to overlapping regions major_voting: Assign the most present label among the patches label overlapping a pixel

  • annotated_good (Optional[List[int]], default: None ) –

    List of indices of annotations to be treated as good.

Returns:

  • Tuple[int, int, int, List[Dict]]

    Tuple containing: false_region_bad: Number of false bad regions detected in the dataset false_region_good: Number of missed defects true_region_bad: Number of correctly identified defects reconstructions: If polygon is true this is a List of dict containing { "file_path": image_path, "mask_path": mask_path, "file_name": observation_name, "prediction": [{ "label": predicted_label, "points": List of dict coordinates "x" and "y" representing the points of a polygon that surrounds an image area covered by patches of label = predicted_label }] } else its a list of dict containing { "file_path": image_path, "mask_path": mask_path, "file_name": observation_name, "prediction": numpy array containing the reconstructed mask }

Source code in quadra/utils/patch/metrics.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def compute_patch_metrics(
    test_img_info: List[PatchDatasetFileFormat],
    test_results: pd.DataFrame,
    overlap: float,
    idx_to_class: Dict,
    patch_num_h: Optional[int] = None,
    patch_num_w: Optional[int] = None,
    patch_w: Optional[int] = None,
    patch_h: Optional[int] = None,
    return_polygon: bool = False,
    patch_reconstruction_method: str = "priority",
    annotated_good: Optional[List[int]] = None,
) -> Tuple[int, int, int, List[Dict]]:
    """Compute the metrics of a patch dataset.

    Args:
        test_img_info: List of observation paths and mask paths
        test_results: Pandas dataframe containing the results of an SklearnClassificationTrainer utility
        patch_num_h: Number of vertical patches (required if patch_w and patch_h are None)
        patch_num_w: Number of horizontal patches (required if patch_w and patch_h are None)
        patch_h: Patch height (required if patch_num_h and patch_num_w are None)
        patch_w: Patch width (required if patch_num_h and patch_num_w are None)
        overlap: Percentage of overlap between the patches
        idx_to_class: Dict mapping an index to the corresponding class name
        return_polygon: if set to true convert the reconstructed mask into polygons, otherwise return the mask
        patch_reconstruction_method: How to compute the label of overlapping patches, can either be:
            priority: Assign the top priority label (i.e the one with greater index) to overlapping regions
            major_voting: Assign the most present label among the patches label overlapping a pixel
        annotated_good: List of indices of annotations to be treated as good.

    Returns:
        Tuple containing:
            false_region_bad: Number of false bad regions detected in the dataset
            false_region_good: Number of missed defects
            true_region_bad: Number of correctly identified defects
            reconstructions: If polygon is true this is a List of dict containing
                {
                    "file_path": image_path,
                    "mask_path": mask_path,
                    "file_name": observation_name,
                    "prediction": [{
                        "label": predicted_label,
                        "points": List of dict coordinates "x" and "y" representing the points of a polygon that
                        surrounds an image area covered by patches of label = predicted_label
                    }]
                }
            else its a list of dict containing
                {
                    "file_path": image_path,
                    "mask_path": mask_path,
                    "file_name": observation_name,
                    "prediction": numpy array containing the reconstructed mask
                }
    """
    assert patch_reconstruction_method in [
        "priority",
        "major_voting",
    ], "Patch reconstruction method not recognized, valid values are priority, major_voting"

    if (patch_h is not None and patch_w is not None) and (patch_num_h is not None and patch_num_w is not None):
        raise ValueError("Either number of patches or patch size is required for reconstruction")

    assert (patch_h is not None and patch_w is not None) or (
        patch_num_h is not None and patch_num_w is not None
    ), "Either number of patches or patch size is required for reconstruction"

    if patch_h is not None and patch_w is not None and patch_num_h is not None and patch_num_w is not None:
        warnings.warn(
            "Both number of patches and patch dimension are specified, using number of patches by default", UserWarning
        )

    log.info("Computing patch metrics!")

    false_region_bad = 0
    false_region_good = 0
    true_region_bad = 0
    reconstructions = []
    test_results["filename"] = test_results["sample"].apply(
        lambda x: "_".join(os.path.basename(x).replace("#DISCARD#", "").split("_")[0:-1])
    )

    for info in tqdm(test_img_info):
        img_path = info.image_path
        mask_path = info.mask_path

        img_json_entry = {
            "image_path": img_path,
            "mask_path": mask_path,
            "file_name": os.path.basename(img_path),
            "prediction": None,
        }

        test_img = cv2.imread(img_path)

        img_name = os.path.basename(img_path)

        h = test_img.shape[0]
        w = test_img.shape[1]

        gt_img = None

        if mask_path is not None and os.path.exists(mask_path):
            gt_img = cv2.imread(mask_path, 0)
            if test_img.shape[0:2] != gt_img.shape:
                # Ensure that the mask has the same size as the image by padding it with zeros
                log.warning("Found mask with different size than the image, padding it with zeros!")
                gt_img = np.pad(
                    gt_img, ((0, test_img.shape[0] - gt_img.shape[0]), (0, test_img.shape[1] - gt_img.shape[1]))
                )
        if patch_num_h is not None and patch_num_w is not None:
            patch_size, step = compute_patch_info(h, w, patch_num_h, patch_num_w, overlap)
        elif patch_h is not None and patch_w is not None:
            [patch_num_h, patch_num_w], step = compute_patch_info_from_patch_dim(h, w, patch_h, patch_w, overlap)
            patch_size = (patch_h, patch_w)
        else:
            raise ValueError(
                "Either number of patches or patch size is required for reconstruction, this should not happen"
                " at this stage"
            )

        img_patches = get_sorted_patches_by_image(test_results, img_name)
        pred = img_patches["pred_label"].to_numpy().reshape(patch_num_h, patch_num_w)

        # Treat annotated good predictions as background, this is an optimistic assumption that assumes that the
        # remaining background is good, but it is not always true so maybe on non annotated areas we are missing
        # defects and it would be necessary to handle this in a different way.
        if annotated_good is not None:
            pred[np.isin(pred, annotated_good)] = 0
        if patch_num_h is not None and patch_num_w is not None:
            output_mask, predicted_defect = reconstruct_patch(
                input_img_shape=test_img.shape,
                patch_size=patch_size,
                pred=pred,
                patch_num_h=patch_num_h,
                patch_num_w=patch_num_w,
                idx_to_class=idx_to_class,
                step=step,
                return_polygon=return_polygon,
                method=patch_reconstruction_method,
            )
        else:
            raise ValueError("`patch_num_h` and `patch_num_w` cannot be None at this point")

        if return_polygon:
            img_json_entry["prediction"] = predicted_defect
        else:
            img_json_entry["prediction"] = output_mask

        reconstructions.append(img_json_entry)
        if gt_img is not None:
            if annotated_good is not None:
                gt_img[np.isin(gt_img, annotated_good)] = 0

            gt_img_binary = (gt_img > 0).astype(bool)
            regions_pred = label(output_mask).astype(np.uint8)

            for k in range(1, regions_pred.max() + 1):
                region = (regions_pred == k).astype(bool)
                # If there's no overlap with the gt
                if np.sum(np.bitwise_and(region, gt_img_binary)) == 0:
                    false_region_bad += 1

            output_mask = (output_mask > 0).astype(np.uint8)
            gt_img = label(gt_img)

            for i in range(1, gt_img.max() + 1):
                region = (gt_img == i).astype(bool)
                if np.sum(np.bitwise_and(region, output_mask)) == 0:
                    false_region_good += 1
                else:
                    true_region_bad += 1

    return false_region_bad, false_region_good, true_region_bad, reconstructions

generate_patch_dataset(data_dictionary, class_to_idx, val_size=0.3, test_size=0.0, seed=42, patch_number=None, patch_size=None, overlap=0.0, output_folder='extraction_data', save_original_images_and_masks=True, area_threshold=0.45, area_defect_threshold=0.2, mask_extension='_mask', mask_output_folder=None, save_mask=False, clear_output_folder=False, mask_preprocessing=None, train_filename='dataset.txt', repeat_good_images=1, balance_defects=True, annotated_good=None, num_workers=1)

Giving a data_dictionary as:

{ 'base_name': '163931_1_5.jpg', 'path': 'extraction_data/1/163931_1_5.jpg', 'mask': 'extraction_data/1/163931_1_5_mask.jpg' } This function will generate patches datasets based on the defined split number, one for training, one for validation and one for testing respectively under output_folder/train, output_folder/val and output_folder/test, the training dataset will contain h5 files and a txt file resulting from a call to the generate_classification_patch_train_dataset, while the test dataset will contain patches saved on disk divided in subfolders per class, patch extraction is done in a sliding window fashion. Original images and masks (preprocessed if mask_preprocessing is present) will also be saved under output_folder/original/images and output_folder/original/masks. If patch number is specified the patch size will be calculated accordingly, if the image is not divisible by the patch number two possible behaviours can occur: - if the patch reconstruction is smaller than the original image a new patch will be generated containing the pixels from the edge of the image (E.g the new patch will contain the last patch_size pixels of the original image) - if the patch reconstruction is bigger than the original image the last patch will contain the pixels from the edge of the image same as above, but without adding a new patch to the count.

Parameters:

  • data_dictionary (List[Dict]) –

    Dictionary as above

  • val_size (float, default: 0.3 ) –

    percentage of the dictionary entries to be used for validation

  • test_size (float, default: 0.0 ) –

    percentage of the dictionary entries to be used for testing

  • seed (int, default: 42 ) –

    seed for rng based operations

  • clear_output_folder (bool, default: False ) –

    flag used to delete all the data in subfolder

  • class_to_idx (Dict) –

    Dictionary {"defect": value in mask.. }

  • output_folder (str, default: 'extraction_data' ) –

    root_folder where to extract the data

  • save_original_images_and_masks (bool, default: True ) –

    If True, images and masks will be copied inside output_folder/original/

  • area_threshold (float, default: 0.45 ) –

    Minimum percentage of defected patch area present in the mask to classify the patch as defect

  • area_defect_threshold (float, default: 0.2 ) –

    Minimum percentage of single defect present in the patch to classify the patch as defect

  • mask_extension (str, default: '_mask' ) –

    Extension used to assign image to mask

  • mask_output_folder (Optional[str], default: None ) –

    Optional folder in which to save the masks

  • save_mask (bool, default: False ) –

    Flag to save the mask

  • patch_number (Optional[Tuple[int, int]], default: None ) –

    Optional number of patches for each side, required if patch_size is None

  • patch_size (Optional[Tuple[int, int]], default: None ) –

    Optional dimension of the patch, required if patch_number is None

  • overlap (float, default: 0.0 ) –

    Overlap of the patches [0, 1]

  • mask_preprocessing (Optional[Callable], default: None ) –

    Optional function applied to masks, this can be useful for example to convert an image in range [0-255] to the required [0-1]

  • train_filename (str, default: 'dataset.txt' ) –

    Name of the file containing mapping between h5 files and labels for training

  • repeat_good_images (int, default: 1 ) –

    Number of repetition for images with emtpy or None mask

  • balance_defects (bool, default: True ) –

    If true add one good entry for each defect extracted

  • annotated_good (Optional[List[str]], default: None ) –

    List of labels that are annotated but considered as good

  • num_workers (int, default: 1 ) –

    Number of workers used for the h5 creation

Returns:

  • Optional[Dict]

    None if data_dictionary is empty, otherwise return a dictionary containing informations about the dataset

Source code in quadra/utils/patch/dataset.py
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def generate_patch_dataset(
    data_dictionary: List[Dict],
    class_to_idx: Dict,
    val_size: float = 0.3,
    test_size: float = 0.0,
    seed: int = 42,
    patch_number: Optional[Tuple[int, int]] = None,
    patch_size: Optional[Tuple[int, int]] = None,
    overlap: float = 0.0,
    output_folder: str = "extraction_data",
    save_original_images_and_masks: bool = True,
    area_threshold: float = 0.45,
    area_defect_threshold: float = 0.2,
    mask_extension: str = "_mask",
    mask_output_folder: Optional[str] = None,
    save_mask: bool = False,
    clear_output_folder: bool = False,
    mask_preprocessing: Optional[Callable] = None,
    train_filename: str = "dataset.txt",
    repeat_good_images: int = 1,
    balance_defects: bool = True,
    annotated_good: Optional[List[str]] = None,
    num_workers: int = 1,
) -> Optional[Dict]:
    """Giving a data_dictionary as:
    >>> {
    >>>     'base_name': '163931_1_5.jpg',
    >>>     'path': 'extraction_data/1/163931_1_5.jpg',
    >>>     'mask': 'extraction_data/1/163931_1_5_mask.jpg'
    >>>}
    This function will generate patches datasets based on the defined split number, one for training, one for validation
    and one for testing respectively under output_folder/train, output_folder/val and output_folder/test, the training
    dataset will contain h5 files and a txt file resulting from a call to the
    generate_classification_patch_train_dataset, while the test dataset will contain patches saved on disk divided
    in subfolders per class, patch extraction is done in a sliding window fashion.
    Original images and masks (preprocessed if mask_preprocessing is present) will also be saved under
    output_folder/original/images and output_folder/original/masks.
    If patch number is specified the patch size will be calculated accordingly, if the image is not divisible by the
    patch number two possible behaviours can occur:
        - if the patch reconstruction is smaller than the original image a new patch will be generated containing the
        pixels from the edge of the image (E.g the new patch will contain the last patch_size pixels of the original
        image)
        - if the patch reconstruction is bigger than the original image the last patch will contain the pixels from the
        edge of the image same as above, but without adding a new patch to the count.

    Args:
        data_dictionary: Dictionary as above
        val_size: percentage of the dictionary entries to be used for validation
        test_size: percentage of the dictionary entries to be used for testing
        seed: seed for rng based operations
        clear_output_folder: flag used to delete all the data in subfolder
        class_to_idx: Dictionary {"defect": value in mask.. }
        output_folder: root_folder where to extract the data
        save_original_images_and_masks: If True, images and masks will be copied inside output_folder/original/
        area_threshold: Minimum percentage of defected patch area present in the mask to classify the patch as defect
        area_defect_threshold: Minimum percentage of single defect present in the patch to classify the patch as defect
        mask_extension: Extension used to assign image to mask
        mask_output_folder: Optional folder in which to save the masks
        save_mask: Flag to save the mask
        patch_number: Optional number of patches for each side, required if patch_size is None
        patch_size: Optional dimension of the patch, required if patch_number is None
        overlap: Overlap of the patches [0, 1]
        mask_preprocessing: Optional function applied to masks, this can be useful for example to convert an image in
            range [0-255] to the required [0-1]
        train_filename: Name of the file containing mapping between h5 files and labels for training
        repeat_good_images: Number of repetition for images with emtpy or None mask
        balance_defects: If true add one good entry for each defect extracted
        annotated_good: List of labels that are annotated but considered as good
        num_workers: Number of workers used for the h5 creation

    Returns:
        None if data_dictionary is empty, otherwise return a dictionary containing informations about the dataset

    """
    if len(data_dictionary) == 0:
        warnings.warn("Input data dictionary is empty!", UserWarning)
        return None

    if val_size < 0 or test_size < 0 or (val_size + test_size) > 1:
        raise ValueError("Validation and Test size must be greater or equal than zero and sum up to maximum 1")
    if clear_output_folder:
        if os.path.exists(output_folder):
            shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)
    os.makedirs(os.path.join(output_folder, "original"), exist_ok=True)
    if save_original_images_and_masks:
        log.info("Moving original images and masks to dataset folder...")
        os.makedirs(os.path.join(output_folder, "original", "images"), exist_ok=True)
        os.makedirs(os.path.join(output_folder, "original", "masks"), exist_ok=True)

        for i, item in enumerate(data_dictionary):
            img_new_path = os.path.join("original", "images", item["base_name"])
            shutil.copy(item["path"], os.path.join(output_folder, img_new_path))
            data_dictionary[i]["path"] = img_new_path

            if item["mask"] is not None:
                mask = cv2.imread(item["mask"])
                if mask_preprocessing is not None:
                    mask = mask_preprocessing(mask).astype(np.uint8)
                mask_new_path = os.path.join("original", "masks", os.path.splitext(item["base_name"])[0] + ".png")
                cv2.imwrite(os.path.join(output_folder, mask_new_path), mask)
                data_dictionary[i]["mask"] = mask_new_path

    shuffled_indices = np.random.default_rng(seed).permutation(len(data_dictionary))
    data_dictionary = [data_dictionary[i] for i in shuffled_indices]
    log.info("Performing multilabel stratification...")
    train_data_dictionary, val_data_dictionary, test_data_dictionary = multilabel_stratification(
        output_folder=output_folder,
        data_dictionary=data_dictionary,
        num_classes=len(class_to_idx.values()),
        val_size=val_size,
        test_size=test_size,
    )

    log.info("Train set size: %d", len(train_data_dictionary))
    log.info("Validation set size: %d", len(val_data_dictionary))
    log.info("Test set size: %d", len(test_data_dictionary))

    idx_to_class = {v: k for (k, v) in class_to_idx.items()}

    os.makedirs(output_folder, exist_ok=True)

    dataset_info = {
        "patch_size": patch_size,
        "patch_number": patch_number,
        "overlap": overlap,
        "annotated_good": annotated_good,
        "train_files": [{"image_path": x["path"], "mask_path": x["mask"]} for x in train_data_dictionary],
        "val_files": [{"image_path": x["path"], "mask_path": x["mask"]} for x in val_data_dictionary],
        "test_files": [{"image_path": x["path"], "mask_path": x["mask"]} for x in test_data_dictionary],
    }

    with open(os.path.join(output_folder, "info.json"), "w") as f:
        json.dump(dataset_info, f)

    if len(train_data_dictionary) > 0:
        log.info("Generating train set")
        generate_patch_sampling_dataset(
            data_dictionary=train_data_dictionary,
            patch_number=patch_number,
            patch_size=patch_size,
            overlap=overlap,
            idx_to_class=idx_to_class,
            balance_defects=balance_defects,
            repeat_good_images=repeat_good_images,
            output_folder=output_folder,
            subfolder_name="train",
            train_filename=train_filename,
            annotated_good=annotated_good if annotated_good is None else [class_to_idx[x] for x in annotated_good],
            num_workers=num_workers,
        )

    for phase, split_dict in zip(["val", "test"], [val_data_dictionary, test_data_dictionary]):
        if len(split_dict) > 0:
            log.info("Generating %s set", phase)
            generate_patch_sliding_window_dataset(
                data_dictionary=split_dict,
                patch_number=patch_number,
                patch_size=patch_size,
                overlap=overlap,
                output_folder=output_folder,
                subfolder_name=phase,
                area_threshold=area_threshold,
                area_defect_threshold=area_defect_threshold,
                mask_extension=mask_extension,
                mask_output_folder=mask_output_folder,
                save_mask=save_mask,
                class_to_idx=class_to_idx,
            )

    log.info("All done! Datasets saved to %s", output_folder)

    return dataset_info

get_image_mask_association(data_folder, mask_folder=None, mask_extension='', warning_on_missing_mask=True)

Function used to match images and mask from a folder or sub-folders.

Parameters:

  • data_folder (str) –

    root data folder containing images or images and masks

  • mask_folder (Optional[str], default: None ) –

    Optional root directory used to search only the masks

  • mask_extension (str, default: '' ) –

    extension used to identify the mask file, it's mandatory if mask_folder is not specified warning_on_missing_mask: if set to True a warning will be raised if a mask is missing, disable if you know that many images do not have a mask.

  • warning_on_missing_mask (bool, default: True ) –

    if set to True a warning will be raised if a mask is missing, disable if you know

Returns:

  • List[Dict]

    List of dict like:

  • List[Dict]

    [

  • List[Dict]

    { 'base_name': '161927.tiff', 'path': 'test_dataset_patch/images/161927.tiff', 'mask': 'test_dataset_patch/masks/161927_mask.tiff'

  • List[Dict]

    }, ...

  • List[Dict]

    ]

Source code in quadra/utils/patch/dataset.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def get_image_mask_association(
    data_folder: str,
    mask_folder: Optional[str] = None,
    mask_extension: str = "",
    warning_on_missing_mask: bool = True,
) -> List[Dict]:
    """Function used to match images and mask from a folder or sub-folders.

    Args:
        data_folder: root data folder containing images or images and masks
        mask_folder: Optional root directory used to search only the masks
        mask_extension: extension used to identify the mask file, it's mandatory if mask_folder is not specified
            warning_on_missing_mask: if set to True a warning will be raised if a mask is missing, disable if you know
            that many images do not have a mask.
        warning_on_missing_mask: if set to True a warning will be raised if a mask is missing, disable if you know

    Returns:
        List of dict like:
        [
        {
            'base_name': '161927.tiff',
            'path': 'test_dataset_patch/images/161927.tiff',
            'mask': 'test_dataset_patch/masks/161927_mask.tiff'
        }, ...
        ]
    """
    # get all the images from the data folder
    data_images = glob.glob(os.path.join(data_folder, "**", "*"), recursive=True)

    basenames = [os.path.splitext(os.path.basename(image))[0] for image in data_images]

    if len(set(basenames)) != len(basenames):
        raise ValueError("Found multiple images with the same name and different extension, this is not supported.")

    log.info("Found: %d images in %s", len(data_images), data_folder)
    # divide images and mask if in the same folder
    # if mask folder is specified search mask in that folder
    if mask_folder:
        masks_images = []
        for basename in basenames:
            mask_path = os.path.join(mask_folder, f"{basename}{mask_extension}.*")
            mask_path_list = glob.glob(mask_path)

            if len(mask_path_list) == 1:
                masks_images.append(mask_path_list[0])
            elif warning_on_missing_mask:
                log.warning("Mask for %s not found", basename)
    else:
        if mask_extension == "":
            raise ValueError("If no mask folder is provided, mask extension is mandatory it cannot be empty.")

        masks_images = [image for image in data_images if mask_extension in image]
        data_images = [image for image in data_images if mask_extension not in image]

    # build support dictionary
    unique_images = [{"base_name": os.path.basename(image), "path": image, "mask": None} for image in data_images]

    images_stem = [os.path.splitext(str(image["base_name"]))[0] + mask_extension for image in unique_images]
    masks_stem = [os.path.splitext(os.path.basename(mask))[0] for mask in masks_images]

    # search corrispondency between file or folders
    for i, image_stem in enumerate(images_stem):
        if image_stem in masks_stem:
            unique_images[i]["mask"] = masks_images[masks_stem.index(image_stem)]

    log.info("Unique images with mask: %d", len([uni for uni in unique_images if uni.get("mask") is not None]))
    log.info("Unique images with no mask: %d", len([uni for uni in unique_images if uni.get("mask") is None]))

    return unique_images

plot_patch_reconstruction(reconstruction, idx_to_class, class_to_idx, ignore_classes=None, is_polygon=True)

Helper function for plotting the patch reconstruction.

Parameters:

  • reconstruction (Dict) –

    Dict following this structure { "file_path": str, "mask_path": str, "prediction": { "label": str, "points": [{"x": int, "y": int}] } } if is_polygon else { "file_path": str, "mask_path": str, "prediction": np.ndarray }

  • idx_to_class (Dict[int, str]) –

    Dictionary mapping indices to label names

  • class_to_idx (Dict[str, int]) –

    Dictionary mapping class names to indices

  • ignore_classes (Optional[List[int]], default: None ) –

    Eventually the classes to not plot

  • is_polygon (bool, default: True ) –

    Boolean indicating if the prediction is a polygon or a mask.

Returns:

  • Figure

    Matplotlib plot showing predicted patch regions and eventually gt

Source code in quadra/utils/patch/visualization.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def plot_patch_reconstruction(
    reconstruction: Dict,
    idx_to_class: Dict[int, str],
    class_to_idx: Dict[str, int],
    ignore_classes: Optional[List[int]] = None,
    is_polygon: bool = True,
) -> Figure:
    """Helper function for plotting the patch reconstruction.

    Args:
        reconstruction: Dict following this structure
            {
                "file_path": str,
                "mask_path": str,
                "prediction": {
                    "label": str,
                    "points": [{"x": int, "y": int}]
                }
            } if is_polygon else
            {
                "file_path": str,
                "mask_path": str,
                "prediction": np.ndarray
            }
        idx_to_class: Dictionary mapping indices to label names
        class_to_idx: Dictionary mapping class names to indices
        ignore_classes: Eventually the classes to not plot
        is_polygon: Boolean indicating if the prediction is a polygon or a mask.

    Returns:
        Matplotlib plot showing predicted patch regions and eventually gt

    """
    cmap_name = "tab10"

    # 10 classes + good
    if len(idx_to_class.values()) > 11:
        cmap_name = "tab20"

    cmap = get_cmap(cmap_name)
    test_img = cv2.imread(reconstruction["image_path"])
    test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB)
    gt_img = None

    if reconstruction["mask_path"] is not None and os.path.isfile(reconstruction["mask_path"]):
        gt_img = cv2.imread(reconstruction["mask_path"], 0)

    out = np.zeros((test_img.shape[0], test_img.shape[1]), dtype=np.uint8)

    if is_polygon:
        for _, region in enumerate(reconstruction["prediction"]):
            points = [[item["x"], item["y"]] for item in region["points"]]
            c_label = region["label"]

            out = cv2.drawContours(
                out,
                np.array([points], np.int32),
                -1,
                class_to_idx[c_label],
                thickness=cv2.FILLED,
            )
    else:
        out = reconstruction["prediction"]

    fig = plot_patch_results(
        image=test_img,
        prediction_image=out,
        ground_truth_image=gt_img,
        plot_original=True,
        ignore_classes=ignore_classes,
        save_path=None,
        class_to_idx=class_to_idx,
        cmap=cmap,
    )

    return fig

plot_patch_results(image, prediction_image, ground_truth_image, class_to_idx, plot_original=True, ignore_classes=None, image_height=10, save_path=None, cmap=get_cmap('tab20'))

Function used to plot the image predicted Args: prediction_image: The prediction image image: The original image to plot ground_truth_image: The ground truth image class_to_idx: Dictionary mapping class names to indices plot_original: Boolean to plot the original image ignore_classes: The classes to ignore, default is 0 image_height: The height of the output figure save_path: The path to save the figure cmap: The colormap to use.

Returns:

  • Figure

    The matplotlib figure

Source code in quadra/utils/patch/visualization.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def plot_patch_results(
    image: np.ndarray,
    prediction_image: np.ndarray,
    ground_truth_image: Optional[np.ndarray],
    class_to_idx: Dict[str, int],
    plot_original: bool = True,
    ignore_classes: Optional[List[int]] = None,
    image_height: int = 10,
    save_path: Optional[str] = None,
    cmap: Colormap = get_cmap("tab20"),
) -> Figure:
    """Function used to plot the image predicted
    Args:
        prediction_image: The prediction image
        image: The original image to plot
        ground_truth_image: The ground truth image
        class_to_idx: Dictionary mapping class names to indices
        plot_original: Boolean to plot the original image
        ignore_classes: The classes to ignore, default is 0
        image_height: The height of the output figure
        save_path: The path to save the figure
        cmap: The colormap to use.

    Returns:
        The matplotlib figure
    """
    if ignore_classes is None:
        ignore_classes = [0]

    image = image[0 : prediction_image.shape[0], 0 : prediction_image.shape[1], :]
    idx_to_class = {v: k for k, v in class_to_idx.items()}

    if ignore_classes is not None:
        class_to_idx = {k: v for k, v in class_to_idx.items() if v not in ignore_classes}

    class_idxs = list(class_to_idx.values())

    cmap = {str(c): tuple(int(i * 255) for i in cmap(c / len(class_idxs))[:-1]) for c in class_idxs}
    output_images = []
    titles = []

    if plot_original:
        output_images.append(image)
        titles.append("Original Image")

    if ground_truth_image is not None:
        ground_truth_image = ground_truth_image[0 : prediction_image.shape[0], 0 : prediction_image.shape[1]]
        ground_truth_mask = create_rgb_mask(ground_truth_image, cmap, ignore_classes=ignore_classes)
        output_images.append(ground_truth_mask)
        titles.append("Ground Truth Mask")

    prediction_mask = create_rgb_mask(
        prediction_image,
        cmap,
        ignore_classes=ignore_classes,
    )

    output_images.append(prediction_mask)
    titles.append("Prediction Mask")
    if ignore_classes is not None and ground_truth_image is not None:
        prediction_mask = create_rgb_mask(
            prediction_image, cmap, ignore_classes=ignore_classes, ground_truth_mask=ground_truth_image
        )

        ignored_classes_str = [idx_to_class[c] for c in ignore_classes]
        prediction_title = f"Prediction Mask \n (Ignoring Ground Truth Class: {ignored_classes_str})"
        output_images.append(prediction_mask)
        titles.append(prediction_title)

    fig, axs = plt.subplots(
        ncols=len(output_images),
        nrows=1,
        figsize=(len(output_images) * image_height, image_height),
        squeeze=False,
        facecolor="white",
    )

    for i, output_image in enumerate(output_images):
        axs[0, i].imshow(show_mask_on_image(image, output_image))
        axs[0, i].set_title(titles[i])
        axs[0, i].axis("off")

    custom_lines = [Line2D([0], [0], color=tuple(i / 255.0 for i in cmap[str(c)]), lw=4) for c in class_idxs]
    custom_labels = list(class_to_idx.keys())
    axs[0, -1].legend(custom_lines, custom_labels, loc="center left", bbox_to_anchor=(1.01, 0.81), borderaxespad=0)
    if save_path is not None:
        plt.savefig(save_path, bbox_inches="tight")
        plt.close()

    return fig

reconstruct_patch(input_img_shape, patch_size, pred, patch_num_h, patch_num_w, idx_to_class, step, return_polygon=True, method='priority')

Reconstructs the prediction image from the patches.

Parameters:

  • input_img_shape (Tuple[int, ...]) –

    The size of the reconstructed image

  • patch_size (Tuple[int, int]) –

    Array defining the patch size

  • pred (ndarray) –

    Numpy array containing reconstructed prediction (patch_num_h x patch_num_w)

  • patch_num_h (int) –

    Number of vertical patches

  • patch_num_w (int) –

    Number of horizontal patches

  • idx_to_class (Dict) –

    Dictionary mapping indices to labels

  • step (Tuple[int, int]) –

    Array defining the step size to be used for reconstruction

  • return_polygon (bool, default: True ) –

    If true compute predicted polygons. Defaults to True.

  • method (str, default: 'priority' ) –

    Reconstruction method to be used. Currently supported: "priority" and "major_voting"

Returns:

  • Tuple[ndarray, List[Dict]]

    (reconstructed_prediction_image, predictions) where predictions is an array of objects [{ "label": Predicted_label, "points": List of dict coordinates "x" and "y" representing the points of a polygon that surrounds an image area covered by patches of label = predicted_label }]

Source code in quadra/utils/patch/metrics.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def reconstruct_patch(
    input_img_shape: Tuple[int, ...],
    patch_size: Tuple[int, int],
    pred: np.ndarray,
    patch_num_h: int,
    patch_num_w: int,
    idx_to_class: Dict,
    step: Tuple[int, int],
    return_polygon: bool = True,
    method: str = "priority",
) -> Tuple[np.ndarray, List[Dict]]:
    """Reconstructs the prediction image from the patches.

    Args:
        input_img_shape: The size of the reconstructed image
        patch_size: Array defining the patch size
        pred: Numpy array containing reconstructed prediction (patch_num_h x patch_num_w)
        patch_num_h: Number of vertical patches
        patch_num_w: Number of horizontal patches
        idx_to_class: Dictionary mapping indices to labels
        step: Array defining the step size to be used for reconstruction
        return_polygon: If true compute predicted polygons. Defaults to True.
        method: Reconstruction method to be used. Currently supported: "priority" and "major_voting"

    Returns:
        (reconstructed_prediction_image, predictions) where predictions is an array of objects
            [{
                "label": Predicted_label,
                "points": List of dict coordinates "x" and "y" representing the points of a polygon that
                    surrounds an image area covered by patches of label = predicted_label
            }]
    """
    if method == "priority":
        return _reconstruct_patch_priority(
            input_img_shape,
            patch_size,
            pred,
            patch_num_h,
            patch_num_w,
            idx_to_class,
            step,
            return_polygon,
        )
    if method == "major_voting":
        return _reconstruct_patch_major_voting(
            input_img_shape,
            patch_size,
            pred,
            patch_num_h,
            patch_num_w,
            idx_to_class,
            step,
            return_polygon,
        )

    raise ValueError(f"Invalid reconstruction method {method}")

save_classification_result(results, output_folder, confusion_matrix, accuracy, test_dataloader, reconstructions, config, output, ignore_classes=None)

Save classification results.

Parameters:

  • results (DataFrame) –

    Dataframe containing the classification results

  • output_folder (str) –

    Folder where to save the results

  • confusion_matrix (Optional[DataFrame]) –

    Confusion matrix

  • accuracy (float) –

    Accuracy of the model

  • test_dataloader (DataLoader) –

    Dataloader used for testing

  • reconstructions (List[Dict]) –

    List of dictionaries containing polygons or masks

  • config (DictConfig) –

    Experiment configuration

  • output (DictConfig) –

    Output configuration

  • ignore_classes (Optional[List[int]], default: None ) –

    Eventual classes to ignore during reconstruction plot. Defaults to None.

Source code in quadra/utils/patch/model.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def save_classification_result(
    results: pd.DataFrame,
    output_folder: str,
    confusion_matrix: Optional[pd.DataFrame],
    accuracy: float,
    test_dataloader: DataLoader,
    reconstructions: List[Dict],
    config: DictConfig,
    output: DictConfig,
    ignore_classes: Optional[List[int]] = None,
):
    """Save classification results.

    Args:
        results: Dataframe containing the classification results
        output_folder: Folder where to save the results
        confusion_matrix: Confusion matrix
        accuracy: Accuracy of the model
        test_dataloader: Dataloader used for testing
        reconstructions: List of dictionaries containing polygons or masks
        config: Experiment configuration
        output: Output configuration
        ignore_classes: Eventual classes to ignore during reconstruction plot. Defaults to None.
    """
    # Save csv
    results.to_csv(os.path.join(output_folder, "test_results.csv"), index_label="index")

    if confusion_matrix is not None:
        # Save confusion matrix
        disp = ConfusionMatrixDisplay(
            confusion_matrix=np.array(confusion_matrix),
            display_labels=[x.replace("pred:", "") for x in confusion_matrix.columns.to_list()],
        )
        disp.plot(include_values=True, cmap=plt.cm.Greens, ax=None, colorbar=False, xticks_rotation=90)
        plt.title(f"Confusion Matrix (Accuracy: {(accuracy * 100):.2f}%)")
        plt.savefig(
            os.path.join(output_folder, "test_confusion_matrix.png"),
            bbox_inches="tight",
            pad_inches=0,
            dpi=300,
        )
        plt.close()

    if output.example:
        if not hasattr(test_dataloader.dataset, "idx_to_class"):
            raise ValueError("The provided dataset does not have an attribute 'idx_to_class")

        idx_to_class = test_dataloader.dataset.idx_to_class

        # Get misclassified samples
        example_folder = os.path.join(output_folder, "example")
        if not os.path.isdir(example_folder):
            os.makedirs(example_folder)

        # Skip if no no ground truth is available
        if not all(results["real_label"] == -1):
            for v in np.unique([results["real_label"], results["pred_label"]]):
                if v == -1:
                    continue

                k = idx_to_class[v]

                if ignore_classes is not None and v in ignore_classes:
                    continue

                plot_classification_results(
                    test_dataloader.dataset,
                    unorm=UnNormalize(mean=config.transforms.mean, std=config.transforms.std),
                    pred_labels=results["pred_label"].to_numpy(),
                    test_labels=results["real_label"].to_numpy(),
                    class_name=k,
                    original_folder=example_folder,
                    idx_to_class=idx_to_class,
                    pred_class_to_plot=v,
                    what="con",
                    rows=output.get("rows", 3),
                    cols=output.get("cols", 2),
                    figsize=output.get("figsize", (20, 20)),
                )

                plot_classification_results(
                    test_dataloader.dataset,
                    unorm=UnNormalize(mean=config.transforms.mean, std=config.transforms.std),
                    pred_labels=results["pred_label"].to_numpy(),
                    test_labels=results["real_label"].to_numpy(),
                    class_name=k,
                    original_folder=example_folder,
                    idx_to_class=idx_to_class,
                    pred_class_to_plot=v,
                    what="dis",
                    rows=output.get("rows", 3),
                    cols=output.get("cols", 2),
                    figsize=output.get("figsize", (20, 20)),
                )

        for counter, reconstruction in enumerate(reconstructions):
            is_polygon = True
            if isinstance(reconstruction["prediction"], np.ndarray):
                is_polygon = False

            if is_polygon:
                if len(reconstruction["prediction"]) == 0:
                    continue
            else:
                if reconstruction["prediction"].sum() == 0:
                    continue

            if counter > 5:
                break

            to_plot = plot_patch_reconstruction(
                reconstruction,
                idx_to_class,
                class_to_idx=test_dataloader.dataset.class_to_idx,  # type: ignore[attr-defined]
                ignore_classes=ignore_classes,
                is_polygon=is_polygon,
            )

            if to_plot:
                output_name = f"reconstruction_{os.path.splitext(os.path.basename(reconstruction['file_name']))[0]}.png"
                plt.savefig(os.path.join(example_folder, output_name), bbox_inches="tight", pad_inches=0)

            plt.close()