Source code for aif360.algorithms.postprocessing.reject_option_classification

import numpy as np
from warnings import warn

from aif360.algorithms import Transformer
from aif360.metrics import utils
from aif360.metrics import BinaryLabelDatasetMetric, ClassificationMetric


class RejectOptionClassification(Transformer):
    """Reject option classification is a postprocessing technique that gives
    favorable outcomes to unprivileged groups and unfavorable outcomes to
    privileged groups in a confidence band around the decision boundary with
    the highest uncertainty [10]_.

    References:
        .. [10] F. Kamiran, A. Karim, and X. Zhang, "Decision Theory for
           Discrimination-Aware Classification," IEEE International Conference
           on Data Mining, 2012.
    """

    def __init__(self, unprivileged_groups, privileged_groups,
                 low_class_thresh=0.01, high_class_thresh=0.99,
                 num_class_thresh=100, num_ROC_margin=50,
                 metric_name="Statistical parity difference",
                 metric_ub=0.05, metric_lb=-0.05):
        """
        Args:
            unprivileged_groups (dict or list(dict)): Representation for
                unprivileged group.
            privileged_groups (dict or list(dict)): Representation for
                privileged group.
            low_class_thresh (float): Smallest classification threshold to use
                in the optimization. Should be between 0. and 1.
            high_class_thresh (float): Highest classification threshold to use
                in the optimization. Should be between 0. and 1.
            num_class_thresh (int): Number of classification thresholds between
                low_class_thresh and high_class_thresh for the optimization
                search. Should be > 0.
            num_ROC_margin (int): Number of relevant ROC margins to be used in
                the optimization search. Should be > 0.
            metric_name (str): Name of the metric to use for the optimization.
                Allowed options are "Statistical parity difference",
                "Average odds difference", "Equal opportunity difference".
            metric_ub (float): Upper bound of constraint on the metric value.
            metric_lb (float): Lower bound of constraint on the metric value.
        """
        super(RejectOptionClassification, self).__init__(
            unprivileged_groups=unprivileged_groups,
            privileged_groups=privileged_groups,
            low_class_thresh=low_class_thresh,
            high_class_thresh=high_class_thresh,
            num_class_thresh=num_class_thresh,
            num_ROC_margin=num_ROC_margin,
            metric_name=metric_name)

        allowed_metrics = ["Statistical parity difference",
                           "Average odds difference",
                           "Equal opportunity difference"]

        self.unprivileged_groups = unprivileged_groups
        self.privileged_groups = privileged_groups
        self.low_class_thresh = low_class_thresh
        self.high_class_thresh = high_class_thresh
        self.num_class_thresh = num_class_thresh
        self.num_ROC_margin = num_ROC_margin
        self.metric_name = metric_name
        self.metric_ub = metric_ub
        self.metric_lb = metric_lb

        self.classification_threshold = None
        self.ROC_margin = None

        if ((self.low_class_thresh < 0.0) or (self.low_class_thresh > 1.0)
                or (self.high_class_thresh < 0.0) or (self.high_class_thresh > 1.0)
                or (self.low_class_thresh >= self.high_class_thresh)
                or (self.num_class_thresh < 1) or (self.num_ROC_margin < 1)):
            raise ValueError("Input parameter values out of bounds")

        if metric_name not in allowed_metrics:
            raise ValueError("metric name not in the list of allowed metrics")
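    # Usage note (illustrative, not from the library docs): groups are given
    # as lists of dicts mapping a protected attribute name to a value, e.g.
    # for a hypothetical attribute 'sex':
    #     unprivileged_groups = [{'sex': 0}]
    #     privileged_groups = [{'sex': 1}]
    # Passing low_class_thresh=0.6 with high_class_thresh=0.4 (bounds
    # swapped) trips the ValueError raised above.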
    def fit(self, dataset_true, dataset_pred):
        """Estimates the optimal classification threshold and margin for
        reject option classification that optimizes the metric provided.

        Args:
            dataset_true (BinaryLabelDataset): Dataset containing the true
                `labels`.
            dataset_pred (BinaryLabelDataset): Dataset containing the
                predicted `scores`.

        Returns:
            RejectOptionClassification: Returns self.
        """
        fair_metric_arr = np.zeros(self.num_class_thresh*self.num_ROC_margin)
        balanced_acc_arr = np.zeros_like(fair_metric_arr)
        ROC_margin_arr = np.zeros_like(fair_metric_arr)
        class_thresh_arr = np.zeros_like(fair_metric_arr)

        cnt = 0
        # Iterate through class thresholds
        for class_thresh in np.linspace(self.low_class_thresh,
                                        self.high_class_thresh,
                                        self.num_class_thresh):

            self.classification_threshold = class_thresh
            if class_thresh <= 0.5:
                low_ROC_margin = 0.0
                high_ROC_margin = class_thresh
            else:
                low_ROC_margin = 0.0
                high_ROC_margin = (1.0-class_thresh)

            # Iterate through ROC margins
            for ROC_margin in np.linspace(low_ROC_margin,
                                          high_ROC_margin,
                                          self.num_ROC_margin):
                self.ROC_margin = ROC_margin

                # Predict using the current threshold and margin
                dataset_transf_pred = self.predict(dataset_pred)

                dataset_transf_metric_pred = BinaryLabelDatasetMetric(
                    dataset_transf_pred,
                    unprivileged_groups=self.unprivileged_groups,
                    privileged_groups=self.privileged_groups)
                classified_transf_metric = ClassificationMetric(
                    dataset_true,
                    dataset_transf_pred,
                    unprivileged_groups=self.unprivileged_groups,
                    privileged_groups=self.privileged_groups)

                ROC_margin_arr[cnt] = self.ROC_margin
                class_thresh_arr[cnt] = self.classification_threshold

                # Balanced accuracy and fairness metric computations
                balanced_acc_arr[cnt] = 0.5*(classified_transf_metric.true_positive_rate()
                                             + classified_transf_metric.true_negative_rate())
                if self.metric_name == "Statistical parity difference":
                    fair_metric_arr[cnt] = dataset_transf_metric_pred.mean_difference()
                elif self.metric_name == "Average odds difference":
                    fair_metric_arr[cnt] = classified_transf_metric.average_odds_difference()
                elif self.metric_name == "Equal opportunity difference":
                    fair_metric_arr[cnt] = classified_transf_metric.equal_opportunity_difference()

                cnt += 1

        rel_inds = np.logical_and(fair_metric_arr >= self.metric_lb,
                                  fair_metric_arr <= self.metric_ub)
        if any(rel_inds):
            best_ind = np.where(balanced_acc_arr[rel_inds]
                                == np.max(balanced_acc_arr[rel_inds]))[0][0]
        else:
            warn("Unable to satisfy fairness constraints")
            rel_inds = np.ones(len(fair_metric_arr), dtype=bool)
            best_ind = np.where(fair_metric_arr[rel_inds]
                                == np.min(fair_metric_arr[rel_inds]))[0][0]

        self.ROC_margin = ROC_margin_arr[rel_inds][best_ind]
        self.classification_threshold = class_thresh_arr[rel_inds][best_ind]

        return self
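    # Illustration of the search above (not from the library docs): for a
    # threshold t, the ROC margin is swept over [0, min(t, 1 - t)], so the
    # critical region never extends outside [0, 1]. For example, t = 0.3
    # yields margins in [0, 0.3]; t = 0.8 yields margins in [0, 0.2].
    # Among the (threshold, margin) pairs whose fairness metric falls in
    # [metric_lb, metric_ub], fit() keeps the pair with the highest balanced
    # accuracy; if no pair satisfies the constraint, it falls back to the
    # pair with the minimum metric value.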
    def predict(self, dataset):
        """Obtain fair predictions using the ROC method.

        Args:
            dataset (BinaryLabelDataset): Dataset containing scores that will
                be used to compute predicted labels.

        Returns:
            dataset_pred (BinaryLabelDataset): Output dataset with potentially
            fair predictions obtained using the ROC method.
        """
        dataset_new = dataset.copy(deepcopy=False)

        fav_pred_inds = (dataset.scores > self.classification_threshold)
        unfav_pred_inds = ~fav_pred_inds

        y_pred = np.zeros(dataset.scores.shape)
        y_pred[fav_pred_inds] = dataset.favorable_label
        y_pred[unfav_pred_inds] = dataset.unfavorable_label

        # Indices of critical region around the classification boundary
        crit_region_inds = np.logical_and(
            dataset.scores <= self.classification_threshold+self.ROC_margin,
            dataset.scores > self.classification_threshold-self.ROC_margin)

        # Indices of privileged and unprivileged groups
        cond_priv = utils.compute_boolean_conditioning_vector(
            dataset.protected_attributes,
            dataset.protected_attribute_names,
            self.privileged_groups)
        cond_unpriv = utils.compute_boolean_conditioning_vector(
            dataset.protected_attributes,
            dataset.protected_attribute_names,
            self.unprivileged_groups)

        # New, fairer labels
        dataset_new.labels = y_pred
        dataset_new.labels[np.logical_and(crit_region_inds,
            cond_priv.reshape(-1, 1))] = dataset.unfavorable_label
        dataset_new.labels[np.logical_and(crit_region_inds,
            cond_unpriv.reshape(-1, 1))] = dataset.favorable_label

        return dataset_new
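    # Illustration of the critical-region flip above (not from the library
    # docs): with threshold 0.5 and margin 0.05, the critical region is
    # (0.45, 0.55]. A score of 0.48 falls inside it, so a privileged
    # instance receives the unfavorable label and an unprivileged instance
    # the favorable label; a score of 0.30 falls outside and keeps its
    # thresholded (unfavorable) label.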
    def fit_predict(self, dataset_true, dataset_pred):
        """Runs the `fit` and `predict` methods sequentially."""
        return self.fit(dataset_true, dataset_pred).predict(dataset_pred)
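# A minimal end-to-end usage sketch (illustrative, not part of the library
# API): the protected attribute 'prot' and the random labels/scores below
# are hypothetical stand-ins for a real dataset and real classifier output.
def _example_usage():
    import pandas as pd
    from aif360.datasets import BinaryLabelDataset

    # Tiny synthetic dataset with one binary protected attribute
    rng = np.random.default_rng(0)
    df = pd.DataFrame({'prot': rng.integers(0, 2, 500),
                       'label': rng.integers(0, 2, 500)})
    dataset_true = BinaryLabelDataset(
        df=df, label_names=['label'], protected_attribute_names=['prot'])
    dataset_pred = dataset_true.copy(deepcopy=True)
    dataset_pred.scores = rng.random((500, 1))  # stand-in classifier scores

    # Coarse search grid to keep the sketch fast
    roc = RejectOptionClassification(unprivileged_groups=[{'prot': 0}],
                                     privileged_groups=[{'prot': 1}],
                                     num_class_thresh=25, num_ROC_margin=10)
    dataset_fair = roc.fit_predict(dataset_true, dataset_pred)
    print(roc.classification_threshold, roc.ROC_margin)
    return dataset_fair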
# Function to obtain the Pareto frontier
def _get_pareto_frontier(costs, return_mask=True):
    """Find the Pareto-efficient (nondominated) points among a set of costs.

    Fastest approach for many points; adapted from:
    https://stackoverflow.com/questions/32791911/fast-calculation-of-pareto-front-in-python

    :param costs: An (n_points, n_costs) array.
    :param return_mask: True to return a mask, False to return integer
        indices of efficient points.
    :return: An array of indices of Pareto-efficient points. If return_mask
        is True, this will be an (n_points, ) boolean array; otherwise it
        will be a (n_efficient_points, ) integer array of indices.
    """
    is_efficient = np.arange(costs.shape[0])
    n_points = costs.shape[0]
    next_point_index = 0  # Next index in the is_efficient array to search for
    while next_point_index < len(costs):
        nondominated_point_mask = np.any(costs <= costs[next_point_index], axis=1)
        is_efficient = is_efficient[nondominated_point_mask]  # Remove dominated points
        costs = costs[nondominated_point_mask]
        next_point_index = np.sum(nondominated_point_mask[:next_point_index]) + 1
    if return_mask:
        is_efficient_mask = np.zeros(n_points, dtype=bool)
        is_efficient_mask[is_efficient] = True
        return is_efficient_mask
    else:
        return is_efficient
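# A quick sanity check of the helper on a toy cost matrix (illustrative,
# not part of the library): lower is better in every column, matching the
# <= comparison above.
def _example_pareto_frontier():
    costs = np.array([[1., 4.],
                      [2., 2.],
                      [3., 3.],   # dominated by [2., 2.]
                      [4., 1.]])
    print(_get_pareto_frontier(costs))                     # [ True  True False  True]
    print(_get_pareto_frontier(costs, return_mask=False))  # [0 1 3]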