"""Wrapper for the function "DisparateMistreatment" from fair-classification.

Besides fit() and predict() the class contains three functions:

Two functions taken from fair-classification performing the actual training:
    _train_model_disp_mist():
    _get_constraint_list_cov():

And a function to transform protected attributes into a dict:
    _get_cov_thresh_dict():

Original code:
    https://github.com/mbilalzafar/fair-classification

"""
import warnings

import cvxpy
import numpy as np
from aif360.algorithms import Transformer
from aif360.datasets.binary_label_dataset import BinaryLabelDataset

from .fairness_warnings import FairnessBoundsWarning, DataSetSkewedWarning
from .utils import (
    add_intercept,
    get_protected_attributes_dict,
    get_one_hot_encoding,
    LossFunctions,
)
from ..fairensics_utils import get_unprotected_attributes


class DisparateMistreatment(Transformer):
    """Disparate mistreatment free classifier.

    Loss "L" defines whether a logistic regression or a linear SVM is
    trained.

        Minimize
            L(w)
        Subject to
            cov(sensitive_attributes, predictions) < sensitive_attrs_to_cov_thresh

    Where
        predictions: the distance to the decision boundary

    Example:
        https://github.com/nikikilbertus/fairensics/blob/master/examples/2_2_fair-classification-mistreatment-example.ipynb

    """

    # The original function takes an integer for the desired constraint.
    # The index in the list corresponds to the integer value of each
    # constraint.
    _CONS_TYPES = ["all", "fpr", "fnr", None, "fprfnr"]

    def __init__(
        self,
        loss_function=LossFunctions.NAME_LOG_REG,
        constraint_type=None,
        take_initial_sol=True,
        warn=True,
        tau=0.005,
        mu=1.2,
        EPS=1e-6,
        max_iter=100,
        max_iter_dccp=50,
    ):
        """
        Args:
            loss_function (str): name of a loss function defined in utils
            constraint_type (str): one of the values in _CONS_TYPES
            take_initial_sol (bool): if true, the solution of the
                unconstrained problem is used as the starting point
            warn (bool): if true, warnings are raised on certain bounds
            tau, mu, EPS, max_iter, max_iter_dccp: solver related parameters
        """
        assert constraint_type in self._CONS_TYPES

        super(DisparateMistreatment, self).__init__()
        self._params = {}
        self._initialized = False
        self._loss_function = LossFunctions.get_cvxpy_loss_function(
            loss_function
        )
        self._constraint_type = self._CONS_TYPES.index(constraint_type)
        self._take_initial_sol = take_initial_sol
        self._warn = warn
        self._tau = tau
        self._mu = mu
        self._EPS = EPS
        self._max_iter = max_iter  # for the convex program
        self._max_iter_dccp = max_iter_dccp  # for the dccp algo

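    # -----------------------------------------------------------------------
    # Illustration (not part of the original source): the constraint_type
    # string passed to __init__ is mapped to the integer code that
    # fair-classification expects via _CONS_TYPES.index():
    #
    #   "all"    -> 0  (combined misclassification constraint)
    #   "fpr"    -> 1  (false positive rate constraint)
    #   "fnr"    -> 2  (false negative rate constraint)
    #   None     -> 3  (no fairness constraint)
    #   "fprfnr" -> 4  (both FPR and FNR constraints)
    # -----------------------------------------------------------------------
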
    def fit(
        self, dataset: BinaryLabelDataset, sensitive_attrs_to_cov_thresh=0
    ):
        """Fits the model.

        Args:
            dataset: AIF360 data set
            sensitive_attrs_to_cov_thresh (dict or float): covariance
                threshold between each sensitive attribute and the decision
                boundary

        Returns:
            self
        """
        if self._warn:
            dataset_warning = DataSetSkewedWarning(dataset)
            dataset_warning.check_dataset()

        # if sensitive_attrs_to_cov_thresh is not a dict, each sensitive
        # attribute gets the same threshold
        if not isinstance(sensitive_attrs_to_cov_thresh, dict):
            sensitive_attrs_to_cov_thresh = self._get_cov_thresh_dict(
                sensitive_attrs_to_cov_thresh,
                dataset.protected_attribute_names,
            )

        # fair-classification takes the protected attributes as dict
        protected_attributes_dict = get_protected_attributes_dict(
            dataset.protected_attribute_names, dataset.protected_attributes
        )

        # map labels to -1 and 1
        temp_labels = dataset.labels.copy()
        temp_labels[(dataset.labels == dataset.favorable_label)] = 1.0
        temp_labels[(dataset.labels == dataset.unfavorable_label)] = -1.0

        self._params["w"] = self._train_model_disp_mist(
            add_intercept(get_unprotected_attributes(dataset)),
            temp_labels.ravel(),
            protected_attributes_dict,
            sensitive_attrs_to_cov_thresh,
        )
        self._initialized = True
        return self

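    # -----------------------------------------------------------------------
    # Illustration (not part of the original source): when fit() receives a
    # plain float, _get_cov_thresh_dict() expands it into the nested dict
    # fair-classification expects, e.g. for one protected attribute "sex"
    # and a threshold of 0.01:
    #
    #   {"sex": {0: {0: 0.01, 1: 0.01},
    #            1: {0: 0.01, 1: 0.01},
    #            2: {0: 0.01, 1: 0.01}}}
    # -----------------------------------------------------------------------
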
    def predict(self, dataset: BinaryLabelDataset):
        """Makes predictions.

        Args:
            dataset: AIF360 data set

        Returns:
            either an AIF360 data set or an np.array if dataset is an
            np.array
        """
        if not self._initialized:
            raise ValueError("Model not initialized. Run fit() first.")

        # TODO: ok?
        if isinstance(dataset, np.ndarray):
            return np.sign(np.dot(add_intercept(dataset), self._params["w"]))

        dataset_new = dataset.copy(deepcopy=True)
        dataset_new.labels = np.sign(
            np.dot(
                add_intercept(get_unprotected_attributes(dataset)),
                self._params["w"],
            )
        )

        # map the labels back to their original values
        temp_labels = dataset.labels.copy()
        temp_labels[(dataset_new.labels == 1.0)] = dataset.favorable_label
        temp_labels[(dataset_new.labels == -1.0)] = dataset.unfavorable_label
        dataset_new.labels = temp_labels.copy()

        if self._warn:
            bound_warnings = FairnessBoundsWarning(dataset, dataset_new)
            bound_warnings.check_bounds()

        return dataset_new

    def _train_model_disp_mist(
        self, x, y, x_control, sensitive_attrs_to_cov_thresh
    ):
        """Trains the model.

        Args:
            x (np.ndarray): 2D array of unprotected features with intercept
            y (np.ndarray): 1D array of targets
            x_control (dict): {"s": [...]}, key "s" is the sensitive feature
                name and value a 1D vector of values
            sensitive_attrs_to_cov_thresh (dict): covariance threshold for
                each constraint type, e.g., key 1 for FPR

        Returns:
            w (np.ndarray): 1D array of the learned weights
        """
        _num_points, num_features = x.shape

        w = cvxpy.Variable(num_features)  # this is the weight vector
        w.value = np.random.rand(num_features)

        constraints = []
        if self._constraint_type != self._CONS_TYPES.index(None):
            constraints = self._get_constraint_list(
                x,
                y,
                x_control,
                sensitive_attrs_to_cov_thresh,
                self._constraint_type,
                w,
            )

        loss = self._loss_function(w, x, y)

        # take the solution of the unconstrained classifier as starting point
        if self._take_initial_sol:
            p = cvxpy.Problem(cvxpy.Minimize(loss), [])
            p.solve()

        # construct the constrained cvxpy problem
        prob = cvxpy.Problem(cvxpy.Minimize(loss), constraints)

        prob.solve(
            method="dccp",
            tau=self._tau,
            mu=self._mu,
            tau_max=1e10,
            solver=cvxpy.ECOS,
            verbose=False,
            feastol=self._EPS,
            abstol=self._EPS,
            reltol=self._EPS,
            feastol_inacc=self._EPS,
            abstol_inacc=self._EPS,
            reltol_inacc=self._EPS,
            max_iters=self._max_iter,
            max_iter=self._max_iter_dccp,
        )

        # check whether the constraints are satisfied
        for f_c in constraints:
            if not f_c.value():
                warnings.warn(
                    "Solver hasn't satisfied constraint."
                    " Make sure that the constraints are satisfied"
                    " empirically. Alternatively, consider increasing the"
                    " tau parameter."
                )

        w = np.array(w.value).flatten()  # flatten converts it to a 1d array
        return w

    @staticmethod
    def _get_constraint_list(
        x_train,
        y_train,
        x_control_train,
        sensitive_attrs_to_cov_thresh,
        cons_type,
        w,
    ):
        """Gets the list of constraints to be fed to the minimizer.

        cons_type == 0: the whole combined misclassification constraint
            (without FNR or FPR)
        cons_type == 1: FPR constraint
        cons_type == 2: FNR constraint
        cons_type == 4: both FPR and FNR constraints

        Args:
            sensitive_attrs_to_cov_thresh (dict): {s: {cov_type: val}},
                where s is the sensitive attribute and cov_type the
                covariance type: one threshold each for all
                misclassifications, for FPR and for FNR

        Returns:
            constraints (list): constraints in cvxpy format,
                https://www.cvxpy.org/api_reference/cvxpy.constraints.html
        """
        constraints = []
        for attr in sensitive_attrs_to_cov_thresh.keys():

            attr_arr = x_control_train[attr]
            attr_arr_transformed, index_dict = get_one_hot_encoding(attr_arr)

            # binary attribute: attr_arr_transformed equals attr_arr
            if index_dict is None:

                # constraint type -> sens_attr_val -> total number
                s_val_to_total = {ct: {} for ct in [0, 1, 2]}
                s_val_to_avg = {ct: {} for ct in [0, 1, 2]}
                # sum of entities (females and males) in constraints
                cons_sum_dict = {ct: {} for ct in [0, 1, 2]}

                for v in set(attr_arr):
                    s_val_to_total[0][v] = np.sum(x_control_train[attr] == v)

                    # FPR constraint, so we only consider the ground truth
                    # negative dataset for computing the covariance
                    where = np.logical_and(
                        x_control_train[attr] == v, y_train == -1
                    )
                    s_val_to_total[1][v] = np.sum(where)

                    where = np.logical_and(
                        x_control_train[attr] == v, y_train == +1
                    )
                    s_val_to_total[2][v] = np.sum(where)

                for ct in [0, 1, 2]:
                    # N1/N in our formulation, differs from one constraint
                    # type to another
                    denom = s_val_to_total[ct][0] + s_val_to_total[ct][1]
                    s_val_to_avg[ct][0] = s_val_to_total[ct][1] / denom
                    s_val_to_avg[ct][1] = 1.0 - s_val_to_avg[ct][0]  # N0/N

                for v in set(attr_arr):
                    idx = x_control_train[attr] == v

                    ###########################################################
                    # DCCP constraints
                    dist_bound_prod = cvxpy.multiply(
                        y_train[idx], x_train[idx] * w
                    )  # y.f(x)

                    # avg misclassification distance from boundary
                    cons_sum_dict[0][v] = cvxpy.sum(
                        cvxpy.minimum(0, dist_bound_prod)
                    ) * (s_val_to_avg[0][v] / len(x_train))

                    # avg false positive distance from boundary
                    # (only operates on the ground truth negative dataset)
                    cons_sum_dict[1][v] = cvxpy.sum(
                        cvxpy.minimum(
                            0,
                            cvxpy.multiply(
                                (1 - y_train[idx]) / 2.0, dist_bound_prod
                            ),
                        )
                    ) * (s_val_to_avg[1][v] / sum(y_train == -1))

                    # avg false negative distance from boundary
                    cons_sum_dict[2][v] = cvxpy.sum(
                        cvxpy.minimum(
                            0,
                            cvxpy.multiply(
                                (1 + y_train[idx]) / 2.0, dist_bound_prod
                            ),
                        )
                    ) * (s_val_to_avg[2][v] / sum(y_train == +1))
                    ###########################################################

                if cons_type == 4:
                    cts = [1, 2]
                elif cons_type in [0, 1, 2]:
                    cts = [cons_type]
                else:
                    raise Exception("Invalid constraint type")

                ###############################################################
                # DCCP constraints
                for ct in cts:
                    thresh = abs(
                        sensitive_attrs_to_cov_thresh[attr][ct][1]
                        - sensitive_attrs_to_cov_thresh[attr][ct][0]
                    )

                    constraints.append(
                        cons_sum_dict[ct][1] <= cons_sum_dict[ct][0] + thresh
                    )
                    constraints.append(
                        cons_sum_dict[ct][1] >= cons_sum_dict[ct][0] - thresh
                    )
                ###############################################################

            else:
                # it's categorical, need to set the cov thresh for each value
                # TODO: need to fill up this part
                raise NotImplementedError(
                    "Fill constraint code for categorical"
                    " sensitive features... Exit."
                )

        return constraints

    @staticmethod
    def _get_cov_thresh_dict(cov_thresh, protected_attribute_names):
        """Returns a dict with the covariance threshold for each attribute
        and error type.

        The dict has the form:

            {"sensitive_attribute_name_1": {
                0: {0: cov_thresh, 1: cov_thresh},
                1: {0: cov_thresh, 1: cov_thresh},
                2: {0: cov_thresh, 1: cov_thresh}},
             "sensitive_attribute_name_2": {
                0: {0: cov_thresh, 1: cov_thresh},
                1: {0: cov_thresh, 1: cov_thresh},
                2: {0: cov_thresh, 1: cov_thresh}},
             ...
            }

        Args:
            cov_thresh (float): covariance threshold (same for each attribute)
            protected_attribute_names (list): list of protected attributes

        Returns:
            sensitive_attrs_to_cov_thresh (dict)
        """
        sensitive_attrs_to_cov_thresh = {}
        for sens_attr_name in protected_attribute_names:
            sensitive_attrs_to_cov_thresh[sens_attr_name] = {
                0: {0: cov_thresh, 1: cov_thresh},
                1: {0: cov_thresh, 1: cov_thresh},
                2: {0: cov_thresh, 1: cov_thresh},
            }
        return sensitive_attrs_to_cov_thresh
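

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The toy data and
# column names below are made up; any AIF360 BinaryLabelDataset works.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import pandas as pd

    # toy data: one unprotected feature "x", one protected attribute "sex"
    df = pd.DataFrame(
        {
            "x": [0.1, 0.9, 0.4, 0.8, 0.2, 0.7, 0.3, 0.6],
            "sex": [0, 1, 0, 1, 0, 1, 0, 1],
            "label": [0, 1, 0, 1, 0, 1, 1, 0],
        }
    )
    dataset = BinaryLabelDataset(
        favorable_label=1.0,
        unfavorable_label=0.0,
        df=df,
        label_names=["label"],
        protected_attribute_names=["sex"],
    )

    # train with an FPR constraint and a tight covariance threshold
    clf = DisparateMistreatment(constraint_type="fpr", warn=False)
    clf.fit(dataset, sensitive_attrs_to_cov_thresh=0.01)

    predictions = clf.predict(dataset)
    print(predictions.labels.ravel())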