"""Wrapper for PreferentialFairness from fair-classification.

Besides fit() and predict(), the class contains three helper functions
adapted from the original code:

    _train_model_pref_fairness()
    _get_di_cons_single_boundary()
    _get_preferred_cons()

Original code:
    https://github.com/mbilalzafar/fair-classification

"""
import warnings

import cvxpy
import dccp  # noqa: F401 -- registers the "dccp" method used in prob.solve()
import numpy as np
from aif360.algorithms import Transformer
from aif360.datasets.binary_label_dataset import BinaryLabelDataset

from .fairness_warnings import FairnessBoundsWarning, DataSetSkewedWarning
from .utils import add_intercept, LossFunctions
from ..fairensics_utils import get_unprotected_attributes


class PreferentialFairness(Transformer):
    """Train a separate classifier clf_z for each group z of the protected attribute.

    The loss "L" defines whether a logistic regression or a linear SVM is
    trained.

    Minimize
        L(w)
    Subject to
        sum(predictions_z) > sum(predictions_z')

    Where
        predictions_z are the predictions on group z's points using its own
            classifier clf_z,
        predictions_z' are the predictions on group z's points using the
            other group's classifier clf_z'.

    Example:
        https://github.com/nikikilbertus/fairensics/blob/master/examples/2_3_fair-classification-preferential-fairness-example.ipynb

    """

    # The original function takes an integer for the constraint. The index in
    # the list corresponds to the integer value of each constraint.
    _CONS_TYPES = ["parity", "impact", "treatment", "both", None]
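
    # The constraint names map to the following integer constraint types:
    #   "parity"    (0): disparate impact with a single decision boundary,
    #   "impact"    (1): preferred impact,
    #   "treatment" (2): preferred treatment (envy freeness),
    #   "both"      (3): preferred impact and preferred treatment,
    #   None        (4): no fairness constraint.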

    def __init__(
        self,
        loss_function=LossFunctions.NAME_LOG_REG,
        constraint_type=None,
        train_multiple=False,
        lam=None,
        warn=True,
        tau=0.5,
        mu=1.2,
        EPS=1e-4,
        max_iter=100,
        max_iter_dccp=50,
    ):
        """
        Args:
            loss_function (str): name of a loss function defined in utils.
            constraint_type (str): one of the values in _CONS_TYPES.
            train_multiple (bool): if true, a separate classifier is trained
                for each group of the protected attribute.
            lam (float or dict, optional): regularization strength; a dict
                mapping each protected group to its own strength when
                train_multiple is true.
            warn (bool): if true, warnings are raised on certain bounds.
            tau, mu, EPS, max_iter, max_iter_dccp: solver related parameters.
        """
        assert constraint_type in self._CONS_TYPES

        if lam is None:
            if not train_multiple:
                lam = 0.0
            else:
                lam = {0: 0.0, 1: 0.0}
        else:
            if train_multiple:
                assert isinstance(lam, dict)

        super(PreferentialFairness, self).__init__()

        self._loss_function = LossFunctions.get_cvxpy_loss_function(
            loss_function
        )
        self._lam = lam
        self._train_multiple = train_multiple
        self._warn = warn
        self._tau = tau
        self._mu = mu
        self._EPS = EPS
        self._max_iter = max_iter
        self._max_iter_dccp = max_iter_dccp

        self._initialized = False
        self._params = {}
        self._constraint_type = self._CONS_TYPES.index(constraint_type)

    def fit(
        self,
        dataset: BinaryLabelDataset,
        s_val_to_cons_sum=None,
        prot_attr_ind=0,
    ):
        """Fits the model.

        Args:
            dataset: AIF360 data set.
            s_val_to_cons_sum (dict): the ramp approximation, only needed for
                _constraint_type 1 and 3.
            prot_attr_ind (int): index of the protected feature to apply
                constraints to.
        """
        if self._warn:
            dataset_warning = DataSetSkewedWarning(dataset)
            dataset_warning.check_dataset()

        # constraint types 1 and 3 need the ramp approximation of a
        # parity-constrained classifier as their baseline
        if s_val_to_cons_sum is None and self._constraint_type in [1, 3]:
            parity_clf = PreferentialFairness(
                constraint_type="parity", lam=0.01
            )
            parity_clf.fit(dataset)
            _, dist_dict = parity_clf._get_distance_boundary(
                add_intercept(get_unprotected_attributes(dataset)),
                dataset.protected_attributes[:, prot_attr_ind],
            )
            s_val_to_cons_sum = parity_clf._get_sensitive_attr_cov(dist_dict)

        # map labels to -1 and 1
        temp_labels = dataset.labels.copy()

        idx = np.array(dataset.labels == dataset.favorable_label).ravel()
        temp_labels[idx, 0] = 1.0
        idx = np.array(dataset.labels == dataset.unfavorable_label).ravel()
        temp_labels[idx, 0] = -1.0

        if self._train_multiple:
            obj_function = self._train_multi_model_pref_fairness
        else:
            obj_function = self._train_single_model_pref_fairness

        self._params["w"] = obj_function(
            add_intercept(get_unprotected_attributes(dataset)),
            temp_labels.ravel(),
            dataset.protected_attributes[:, prot_attr_ind],
            s_val_to_cons_sum,
        )

        self._initialized = True
        return self

    def predict(self, dataset: BinaryLabelDataset):
        """Make predictions.

        Args:
            dataset: AIF360 data set (np.ndarray input is not supported yet).

        Returns:
            BinaryLabelDataset: a copy of the data set with predicted labels.
        """
        if not self._initialized:
            raise ValueError("Model not initialized. Run fit() first.")

        # TODO: support np.ndarray input
        if isinstance(dataset, np.ndarray):
            raise NotImplementedError

        dataset_new = dataset.copy(deepcopy=True)

        dist_arr, _ = self._get_distance_boundary(
            add_intercept(get_unprotected_attributes(dataset)),
            dataset.protected_attributes[:, 0],
        )
        dataset_new.labels = np.sign(dist_arr)

        # map the predicted labels back to their original values
        temp_labels = dataset.labels.copy()

        temp_labels[(dataset_new.labels == 1.0)] = dataset.favorable_label
        temp_labels[(dataset_new.labels == -1.0)] = dataset.unfavorable_label

        dataset_new.labels = temp_labels.copy()

        if self._warn:
            bound_warnings = FairnessBoundsWarning(dataset, dataset_new)
            bound_warnings.check_bounds()

        return dataset_new
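
    # Both training routines below minimize a convex loss subject to
    # non-convex fairness constraints. The resulting problem is solved with
    # the DCCP extension of cvxpy (method="dccp"): tau is the initial penalty
    # on constraint violations, mu its growth factor, and EPS the solver
    # tolerance.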

    def _train_single_model_pref_fairness(
        self, X, y, x_sensitive, s_val_to_cons_sum
    ):
        """Train a single model for all groups.

        Args:
            X (np.ndarray): 2D, (n, d), the unprotected features.
            y (np.ndarray): 1D, (n,), the labels.
            x_sensitive (np.ndarray): 1D, (n,), the protected attribute.
            s_val_to_cons_sum: the ramp approximation, only needed for
                _constraint_type 1 and 3.

        Returns:
            w (np.ndarray): 1D array, the learned weights.
        """
        w = cvxpy.Variable(X.shape[1])  # this is the weight vector
        w.value = np.random.rand(X.shape[1])

        obj = 0
        # regularizer -- the first entry of w is the intercept, so it is not
        # regularized
        obj += cvxpy.sum_squares(w[1:]) * self._lam
        obj += self._loss_function(w, X, y)

        constraints = self._get_constraint_list(
            w, X, y, x_sensitive, s_val_to_cons_sum
        )

        prob = cvxpy.Problem(cvxpy.Minimize(obj), constraints)

        prob.solve(
            method="dccp",
            tau=self._tau,
            mu=self._mu,
            tau_max=1e10,
            verbose=False,
            feastol=self._EPS,
            abstol=self._EPS,
            reltol=self._EPS,
            feastol_inacc=self._EPS,
            abstol_inacc=self._EPS,
            reltol_inacc=self._EPS,
            max_iters=self._max_iter,
            max_iter=self._max_iter_dccp,
        )

        # check that the fairness constraints are satisfied
        for f_c in constraints:
            if not f_c.value():
                warnings.warn(
                    "Solver hasn't satisfied constraint."
                    " Make sure that constraints are satisfied empirically."
                    " Alternatively, consider increasing tau parameter."
                )

        return np.array(w.value).flatten()  # flatten converts to a 1d array

    def _train_multi_model_pref_fairness(
        self, X, y, x_sensitive, s_val_to_cons_sum
    ):
        """Train a separate classifier for each group k of the protected attribute.

        Args:
            X (np.ndarray): 2D, (n, d), the unprotected features.
            y (np.ndarray): 1D, (n,), the labels.
            x_sensitive (np.ndarray): 1D, (n,), the protected attribute.
            s_val_to_cons_sum: the ramp approximation, only needed for
                _constraint_type 1 and 3.

        Returns:
            w (dict): dict of 1D weight arrays, one for each group k of the
                protected attribute.
        """
        w = {}
        for k in set(x_sensitive):
            w[k] = cvxpy.Variable(X.shape[1])  # this is the weight vector
            w[k].value = np.random.rand(X.shape[1])  # initialize the value of w

        num_all = X.shape[0]  # number of all data points

        obj = 0
        for k in set(x_sensitive):
            idx = x_sensitive == k
            X_k = X[idx]
            y_k = y[idx]

            # the first entry of w is the intercept, so no need to regularize it
            obj += cvxpy.sum_squares(w[k][1:]) * self._lam[k]

            # notice that we divide by the size of the whole dataset and not
            # just of this group: the group with more people contributes more
            # to the loss
            obj += self._loss_function(w[k], X_k, y_k, num_all)

        constraints = self._get_constraint_list(
            w, X, y, x_sensitive, s_val_to_cons_sum
        )

        prob = cvxpy.Problem(cvxpy.Minimize(obj), constraints)

        prob.solve(
            method="dccp",
            tau=self._tau,
            mu=self._mu,
            tau_max=1e10,
            verbose=False,
            feastol=self._EPS,
            abstol=self._EPS,
            reltol=self._EPS,
            feastol_inacc=self._EPS,
            abstol_inacc=self._EPS,
            reltol_inacc=self._EPS,
            max_iters=self._max_iter,
            max_iter=self._max_iter_dccp,
        )

        # check that the fairness constraints are satisfied
        for f_c in constraints:
            if not f_c.value():
                warnings.warn(
                    "Solver hasn't satisfied constraint."
                    " Make sure that constraints are satisfied empirically."
                    " Alternatively, consider increasing tau parameter."
                )

        w_return = {}
        for k in set(x_sensitive):
            w_return[k] = np.array(w[k].value).flatten()

        return w_return

    def _get_constraint_list(self, w, X, y, x_sensitive, s_val_to_cons_sum):
        """Return the list of constraints for the chosen constraint type.

        Args:
            w: the learned weights for each group k of the protected attribute.
            X, y, x_sensitive, s_val_to_cons_sum: see
                _train_multi_model_pref_fairness().

        Returns:
            constraints (list): the constraints in cvxpy format (empty if no
                constraint type is set).
        """
        if self._constraint_type == 0:
            # disparate impact with a single boundary
            cov_thresh = np.abs(0.0)  # perfect fairness
            return self._get_parity_cons(X, y, x_sensitive, w, cov_thresh)

        # preferred impact, preferred impact + preferred treatment
        elif self._constraint_type in [1, 3]:
            return self._get_preferred_cons(
                X, x_sensitive, w, s_val_to_cons_sum
            )

        # preferred treatment only
        elif self._constraint_type == 2:
            return self._get_preferred_cons(X, x_sensitive, w)

        return []

    def _decision_function(self, X, k=None):
        """Compute the signed distance to the decision boundary for all samples in X.

        Args:
            X (np.ndarray): [n_samples, n_features], the input samples.
            k: the group whose decision boundary should be used. None means
                that one classifier was trained for the whole dataset.

        Returns:
            (np.ndarray): 1D, the signed distances.
        """
        if k is None:
            ret = np.dot(X, self._params["w"])
        else:
            ret = np.dot(X, self._params["w"][k])
        return ret

    def _get_distance_boundary(self, X, x_sensitive):
        """Return the distance to the decision boundary for each sample.

        Args:
            X (np.ndarray): [n_samples, n_features], the input samples.
            x_sensitive (np.ndarray): 1D, the protected feature.

        Returns:
            distance_boundary_arr (np.ndarray): distance to the boundary,
                where each group's own w is applied to its samples.
            distances_boundary_dict (dict): dict of the form
                s_attr_group (points from group 0/1) ->
                w_group (boundary of group 0/1) ->
                distances for this group with this boundary.
        """
        # s_attr_group (0/1) -> w_group (0/1) -> distances
        distances_boundary_dict = {}

        # we have one model for the whole data
        if not isinstance(self._params["w"], dict):
            distance_boundary_arr = self._decision_function(X)

            # there is only one boundary, so the results with this_group and
            # other_group boundaries are the same
            for attr in set(x_sensitive):
                distances_boundary_dict[attr] = {}
                idx = x_sensitive == attr

                for k in set(x_sensitive):
                    # apply the same decision function for all sensitive
                    # attrs because the same w is trained for everyone
                    distances_boundary_dict[attr][k] = self._decision_function(
                        X[idx]
                    )

        else:  # w is a dict
            distance_boundary_arr = np.zeros(X.shape[0])

            for attr in set(x_sensitive):
                distances_boundary_dict[attr] = {}
                idx = x_sensitive == attr
                X_g = X[idx]

                # each group gets a decision with its own boundary
                distance_boundary_arr[idx] = self._decision_function(X_g, attr)

                for k in self._params["w"].keys():
                    # each group gets a decision with both boundaries
                    distances_boundary_dict[attr][k] = self._decision_function(
                        X_g, k
                    )

        return distance_boundary_arr, distances_boundary_dict

    @staticmethod
    def _get_sensitive_attr_cov(dist_dict):
        """Compute the ramp function for each group to estimate its acceptance rate.

        Args:
            dist_dict (dict): distances to the decision boundary as returned
                by _get_distance_boundary().

        Returns:
            s_val_to_cons_sum (dict): s_attr_group (0/1) -> w_group (0/1) ->
                ramp approximation of the acceptance rate.
        """
        # {0: {}, 1: {}}
        # s_attr_group (0/1) -> w_group (0/1) -> ramp approx
        s_val_to_cons_sum = {}

        for s_val in dist_dict.keys():
            s_val_to_cons_sum[s_val] = {}

            for w_group in dist_dict[s_val].keys():
                fx = dist_dict[s_val][w_group]
                s_val_to_cons_sum[s_val][w_group] = (
                    np.sum(np.maximum(0, fx)) / fx.shape[0]
                )

        return s_val_to_cons_sum

    def _get_preferred_cons(self, X, x_sensitive, w, s_val_to_cons_sum=None):
        """Get the list of preferred fairness constraints.

        There is no need to pass s_val_to_cons_sum for preferred treatment
        (envy free) constraints. For details on the constraint types, see the
        documentation of the fit() function.

        Args:
            X, x_sensitive, w, s_val_to_cons_sum: see _get_constraint_list().

        Returns:
            constraints (list): constraints in cvxpy format, see
                https://www.cvxpy.org/api_reference/cvxpy.constraints.html#
        """
        constraints = []

        # 1 - preferred impact, 2 - envy free, 3 - preferred impact & envy free
        if self._constraint_type in [1, 2, 3]:
            prod_dict = {}  # s_attr_group (0/1) -> w_group (0/1) -> val

            for val in set(x_sensitive):
                prod_dict[val] = {}
                idx = x_sensitive == val
                X_g = X[idx]
                num_g = X_g.shape[0]

                for k in w.keys():
                    # ramp approximation of group val's acceptance rate under
                    # group k's boundary
                    prod_dict[val][k] = (
                        cvxpy.sum(cvxpy.maximum(0, X_g * w[k])) / num_g
                    )
        else:
            raise Exception("Invalid constraint type")

        # 1 for preferred impact -- 3 for preferred impact and envy free
        if self._constraint_type == 1 or self._constraint_type == 3:
            constraints.append(prod_dict[0][0] >= s_val_to_cons_sum[0][0])
            constraints.append(prod_dict[1][1] >= s_val_to_cons_sum[1][1])

        # envy free
        if self._constraint_type == 2 or self._constraint_type == 3:
            constraints.append(prod_dict[0][0] >= prod_dict[0][1])
            constraints.append(prod_dict[1][1] >= prod_dict[1][0])

        return constraints

    @staticmethod
    def _get_parity_cons(X, _y, x_sensitive, w, cov_thresh):
        """Get the parity (disparate impact) constraint list.

        Args:
            X, x_sensitive, w: see _get_constraint_list().
            cov_thresh (float): the covariance threshold, 0 means perfect
                fairness.

        Returns:
            constraints (list): constraints in cvxpy format, see
                https://www.cvxpy.org/api_reference/cvxpy.constraints.html#
        """
        # the covariance threshold has to be a small non-negative number
        assert cov_thresh >= 0

        constraints = []
        z_i_z_bar = x_sensitive - np.mean(x_sensitive)

        fx = X * w
        prod = cvxpy.sum(cvxpy.multiply(z_i_z_bar, fx)) / X.shape[0]

        constraints.append(prod <= cov_thresh)
        constraints.append(prod >= -cov_thresh)

        return constraints
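

# Minimal usage sketch (illustration only). It builds a small synthetic
# BinaryLabelDataset -- the feature names, group sizes, and constraint choice
# below are arbitrary assumptions, not prescribed by fairensics -- and trains
# one classifier per protected group under preferred treatment constraints.
# Requires aif360, cvxpy, and dccp to be installed.
if __name__ == "__main__":
    import pandas as pd

    rng = np.random.RandomState(0)
    n = 200
    frame = pd.DataFrame(
        {
            "feat1": rng.normal(size=n),
            "feat2": rng.normal(size=n),
            "sex": rng.randint(0, 2, size=n),  # binary protected attribute
            "label": rng.randint(0, 2, size=n),  # binary label
        }
    )
    toy_data = BinaryLabelDataset(
        df=frame,
        label_names=["label"],
        protected_attribute_names=["sex"],
        favorable_label=1.0,
        unfavorable_label=0.0,
    )

    clf = PreferentialFairness(
        constraint_type="treatment",  # envy-free (preferred treatment)
        train_multiple=True,  # one classifier per protected group
        warn=False,
    )
    predictions = clf.fit(toy_data).predict(toy_data)
    print(predictions.labels.ravel()[:10])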