"""Wrapper and functions for DisparateImpact remover from fair-classification.
The base class _DisparateImpact implements predict function for both methods.
The classes AccurateDisparateImpact and FairDisparateImpact inherit from
_DisparateImpact and implement fit() functions with different input signatures
and algorithms for minimization.
Original code:
https://github.com/mbilalzafar/fair-classification
"""
import warnings
from copy import deepcopy
import numpy as np
from aif360.algorithms import Transformer
from aif360.datasets.binary_label_dataset import BinaryLabelDataset
from scipy.optimize import minimize
from .fairness_warnings import FairnessBoundsWarning, DataSetSkewedWarning
from .utils import (
add_intercept,
get_protected_attributes_dict,
get_one_hot_encoding,
LossFunctions,
)
from ..fairensics_utils import get_unprotected_attributes
class _DisparateImpact(Transformer):
    """Base class for the two methods removing disparate impact.

    Stores the learned weight vector in ``self._params["w"]`` and implements
    ``predict`` for both subclasses. Subclasses are expected to provide a
    ``fit`` method that fills ``self._params["w"]`` and sets
    ``self._initialized`` to True.

    Example:
        https://github.com/nikikilbertus/fairensics/blob/master/examples/2_1_fair-classification-disparate-impact-example.ipynb
    """

    def __init__(self, loss_function, warn):
        """
        Args:
            loss_function (str): loss function string from utils.LossFunctions.
            warn (bool): if true, warnings are raised on certain bounds.
        """
        super(_DisparateImpact, self).__init__()
        self._warn = warn
        self._params = {}
        self._initialized = False
        self._loss_function = LossFunctions.get_loss_function(loss_function)

    def predict(self, dataset: BinaryLabelDataset):
        """Make predictions.

        Args:
            dataset: either AIF360 data set or np.ndarray.
                For AIF360 data sets, protected features will be ignored.
                For np.ndarray, only unprotected features should be included.

        Returns:
            Either AIF360 data set or np.ndarray if dataset is np.ndarray.
            For np.ndarray input the raw sign of the decision function is
            returned (values in {-1, 0, 1}). For AIF360 data sets the signs
            are mapped back to the favorable/unfavorable labels of `dataset`.

        Raises:
            ValueError: if the model has not been fitted yet.
        """
        if not self._initialized:
            raise ValueError("Model not initialized. Run `fit` first.")

        weights = self._params["w"]

        # TODO: ok?
        if isinstance(dataset, np.ndarray):
            return np.sign(np.dot(add_intercept(dataset), weights))

        dataset_new = dataset.copy(deepcopy=True)
        signed_predictions = np.sign(
            np.dot(add_intercept(get_unprotected_attributes(dataset)), weights)
        )

        # Map the signed predictions back to the original label values.
        # NOTE: the array starts as a copy of the input labels, so a sample
        # whose decision value is exactly 0 keeps its input label.
        mapped_labels = dataset.labels.copy()
        mapped_labels[signed_predictions == 1.0] = dataset.favorable_label
        mapped_labels[signed_predictions == -1.0] = dataset.unfavorable_label
        dataset_new.labels = mapped_labels

        if self._warn:
            bound_warnings = FairnessBoundsWarning(dataset, dataset_new)
            bound_warnings.check_bounds()

        return dataset_new

    @staticmethod
    def _get_cov_thresh_dict(cov_thresh, protected_attribute_names):
        """Return dict with covariance threshold for each protected attribute.

        Each attribute gets the same threshold (cov_thresh).

        Args:
            cov_thresh (float): the covariance threshold.
            protected_attribute_names (list(str)): list of protected attribute
                names.

        Returns:
            sensitive_attrs_to_cov_thresh (dict):
                dict of form {"sensitive_attribute_name_1":cov_thresh, ...}.
        """
        return {name: cov_thresh for name in protected_attribute_names}
class AccurateDisparateImpact(_DisparateImpact):
    """Minimize loss subject to fairness constraints.

    Loss "L" defines whether a logistic regression or a linear SVM is trained.

    Minimize
        L(w)
    Subject to
        |cov(sensitive_attributes, predictions)| <=
        sensitive_attrs_to_cov_thresh
    Where:
        predictions: the distance to the decision boundary
    """

    def __init__(self, loss_function=LossFunctions.NAME_LOG_REG, warn=True):
        """
        Args:
            loss_function (str): loss function string from utils.LossFunctions.
            warn (bool): if true, warnings are raised on certain bounds.
        """
        super(AccurateDisparateImpact, self).__init__(
            loss_function=loss_function, warn=warn
        )

    def fit(
        self,
        dataset: BinaryLabelDataset,
        sensitive_attrs_to_cov_thresh=0,
        sensitive_attributes=None,
    ):
        """Fit the model.

        Args:
            dataset: AIF360 data set
            sensitive_attrs_to_cov_thresh (float or dict): dictionary as
                returned by _get_cov_thresh_dict(). If a single float is passed
                the dict is generated using the _get_cov_thresh_dict() method.
            sensitive_attributes (list(str)): names of protected attributes to
                apply constraints to. Defaults to all protected attributes of
                `dataset`.

        Returns:
            self, with the learned weights stored in ``self._params["w"]``.
        """
        if self._warn:
            dataset_warning = DataSetSkewedWarning(dataset)
            dataset_warning.check_dataset()

        # constraints are only applied to the selected sensitive attributes
        # if no list is provided, constraints are applied to all protected
        if sensitive_attributes is None:
            sensitive_attributes = dataset.protected_attribute_names

        # if sensitive_attrs_to_cov_thresh is not a dict, each sensitive
        # attribute gets the same threshold
        if not isinstance(sensitive_attrs_to_cov_thresh, dict):
            sensitive_attrs_to_cov_thresh = self._get_cov_thresh_dict(
                sensitive_attrs_to_cov_thresh,
                dataset.protected_attribute_names,
            )

        # fair-classification takes the protected attributes as dict
        protected_attributes_dict = get_protected_attributes_dict(
            dataset.protected_attribute_names, dataset.protected_attributes
        )

        # map labels to -1 and 1; both masks are computed on the untouched
        # dataset labels so the two assignments cannot interfere
        signed_labels = dataset.labels.copy()
        signed_labels[dataset.labels == dataset.favorable_label] = 1.0
        signed_labels[dataset.labels == dataset.unfavorable_label] = -1.0

        self._params["w"] = self._train_model_sub_to_fairness(
            add_intercept(get_unprotected_attributes(dataset)),
            signed_labels.ravel(),
            protected_attributes_dict,
            sensitive_attributes,
            sensitive_attrs_to_cov_thresh,
        )
        self._initialized = True
        return self

    def _train_model_sub_to_fairness(
        self,
        x,
        y,
        x_control,
        sensitive_attrs,
        sensitive_attrs_to_cov_thresh,
        max_iter=10000,
    ):
        """Optimize the loss function under fairness constraints.

        Args:
            x (np.ndarray): 2D array of unprotected features and intercept.
            y (np.ndarray): 1D array of labels.
            x_control (dict): dict of protected attributes as returned by
                get_protected_attributes_dict().
            max_iter (int): maximum iterations for solver.
            sensitive_attrs, sensitive_attrs_to_cov_thresh: see fit() method.

        Returns:
            w (np.ndarray): 1D array of the learned weights.

        TODO: sensitive_attrs is redundant. sensitive_attrs_to_cov_thresh
            should only contain features for which constraints are applied.
        """
        constraints = self._get_fairness_constraint_list(
            x, y, x_control, sensitive_attrs, sensitive_attrs_to_cov_thresh
        )

        # random starting point: one weight per column of x
        x0 = np.random.rand(x.shape[1])

        result = minimize(
            fun=self._loss_function,
            x0=x0,
            args=(x, y),
            method="SLSQP",
            options={"maxiter": max_iter},
            constraints=constraints,
        )

        if not result.success:
            # The optimizer result is part of the warning itself so it is not
            # lost when stdout is not captured.
            warnings.warn(
                "Optimization problem did not converge. "
                "Check the solution returned by the optimizer:"
                f"\n{result}"
            )

        return result.x

    def _get_fairness_constraint_list(
        self, x, y, x_control, sensitive_attrs, sensitive_attrs_to_cov_thresh
    ):
        """Get list of constraints for fairness. See fit method for details.

        One inequality constraint is created per binary attribute, and one per
        category for attributes that get_one_hot_encoding() expands.

        Returns:
            constraints (list(dict)): fairness constraints as constraint
                dictionaries ("type", "fun", "args") accepted by
                scipy.optimize.minimize.
        """
        constraints = []

        for attr in sensitive_attrs:
            encoded, index_dict = get_one_hot_encoding(x_control[attr])
            attr_thresh = sensitive_attrs_to_cov_thresh[attr]

            if index_dict is None:  # binary attribute
                constraints.append(
                    {
                        "type": "ineq",
                        "fun": self._test_sensitive_attr_constraint_cov,
                        "args": (x, y, encoded, attr_thresh, False),
                    }
                )
                continue

            # categorical attribute, need to set the cov threshs
            for attr_val, col in index_dict.items():
                # A per-category dict is honoured; a single number (which is
                # what fit() generates from a scalar threshold) is applied to
                # every category instead of failing on subscripting.
                if isinstance(attr_thresh, dict):
                    thresh = attr_thresh[attr_val]
                else:
                    thresh = attr_thresh

                constraints.append(
                    {
                        "type": "ineq",
                        "fun": self._test_sensitive_attr_constraint_cov,
                        "args": (x, y, encoded[:, col], thresh, False),
                    }
                )

        return constraints

    @staticmethod
    def _test_sensitive_attr_constraint_cov(
        model, x_arr, y_arr_dist_boundary, x_control, thresh, verbose
    ):
        """Evaluate the covariance constraint for one sensitive attribute.

        The covariance is computed b/w the sensitive attr val and the distance
        from the boundary. If the model is None, we assume that the
        y_arr_dist_boundary contains the distance from the decision boundary.
        If the model is not None, the distance is computed as the dot product
        of model and x_arr.

        Args:
            model (np.ndarray or None): weight vector.
            x_arr (np.ndarray): 2D feature array, one row per sample.
            y_arr_dist_boundary (np.ndarray): used as the distances only if
                model is None.
            x_control (np.ndarray): sensitive attribute values, 1D or a single
                column.
            thresh (float): covariance threshold.
            verbose (bool): print covariance and margin.

        Returns:
            float: ``thresh - abs(cov)``. The constraint is satisfied if the
            returned value is >= 0 and violated if it is < 0.

        Raises:
            ValueError: if x_arr and x_control disagree on the number of
                samples or x_control has more than one column.
        """
        x_control = np.asarray(x_control)

        if x_arr.shape[0] != x_control.shape[0]:
            raise ValueError(
                "x_arr and x_control must have the same number of rows."
            )

        if x_control.ndim > 1:  # make sure we just have one column
            if x_control.ndim != 2 or x_control.shape[1] != 1:
                raise ValueError("x_control must have exactly one column.")
            # flatten, otherwise the dot product with the 1D distance vector
            # below fails on the shape mismatch
            x_control = x_control.ravel()

        if model is None:
            arr = y_arr_dist_boundary  # simply the output labels
        else:
            # the sign of this is the output label
            arr = np.dot(model, x_arr.T)

        arr = np.array(arr, dtype=np.float64)
        cov = np.dot(x_control - np.mean(x_control), arr) / len(x_control)

        # <0 if covariance > thresh -- condition is not satisfied
        ans = thresh - abs(cov)

        if verbose:
            print("Covariance is", cov)
            print("Diff is:", ans)
            print()

        return ans
class FairDisparateImpact(_DisparateImpact):
    """Minimize disparate impact subject to accuracy constraints.

    Loss "L" defines whether a logistic regression or a linear SVM is trained.

    Minimize
        |cov(sensitive_attributes, predictions)|
    Subject to
        L(w) <= (1+gamma)L(w*)
    Where
        L(w*): is the loss of the unconstrained classifier
        predictions: the distance to the decision boundary
    """

    def __init__(self, loss_function=LossFunctions.NAME_LOG_REG, warn=True):
        """
        Args:
            loss_function (str): loss function string from utils.LossFunctions.
            warn (bool): if true, warnings are raised on certain bounds.
        """
        super(FairDisparateImpact, self).__init__(
            loss_function=loss_function, warn=warn
        )

    def fit(
        self,
        dataset: BinaryLabelDataset,
        sensitive_attributes=None,
        sep_constraint=False,
        gamma=0,
    ):
        """Fits the model.

        Args:
            dataset: AIF360 data set.
            sensitive_attributes (list(str)): names of protected attributes to
                apply constraints to. Defaults to all protected attributes of
                `dataset`; only the first one is used by the optimization.
            sep_constraint (bool): apply fine grained accuracy constraint.
            gamma (float): allowed relative increase of the loss compared to
                the unconstrained classifier.

        Returns:
            self, with the learned weights stored in ``self._params["w"]``.
        """
        if self._warn:
            dataset_warning = DataSetSkewedWarning(dataset)
            dataset_warning.check_dataset()

        # constraints are only applied to the selected sensitive attributes
        # if no list is provided, constraints for all protected attributes
        if sensitive_attributes is None:
            sensitive_attributes = dataset.protected_attribute_names

        # fair-classification takes the protected attributes as dict
        protected_attributes_dict = get_protected_attributes_dict(
            dataset.protected_attribute_names, dataset.protected_attributes
        )

        # map labels to -1 and 1; both masks are computed on the untouched
        # dataset labels so the two assignments cannot interfere
        signed_labels = dataset.labels.copy()
        signed_labels[dataset.labels == dataset.favorable_label] = 1.0
        signed_labels[dataset.labels == dataset.unfavorable_label] = -1.0

        self._params["w"] = self._train_model_sub_to_acc(
            add_intercept(get_unprotected_attributes(dataset)),
            signed_labels.ravel(),
            protected_attributes_dict,
            sensitive_attributes,
            sep_constraint,
            gamma,
        )
        self._initialized = True
        return self

    def _train_model_sub_to_acc(
        self,
        x,
        y,
        x_control,
        sensitive_attrs,
        sep_constraint,
        gamma=None,
        max_iter=10000,
    ):
        """Optimize fairness subject to accuracy constraints.

        WARNING: Only first protected attribute is considered as constraint.
        All others are ignored.

        Args:
            x (np.ndarray): 2D array of unprotected features and intercept.
            y (np.ndarray): 1D array of labels.
            x_control (dict): dict of protected attributes as returned by
                get_protected_attributes_dict().
            max_iter (int): maximum number of iterations for solver
            sep_constraint, sensitive_attrs, gamma: see fit() method. A gamma
                of None is treated as 0 (no additional loss allowed).

        Returns:
            w (np.ndarray): 1D, the learned weight vector for the classifier.

        Raises:
            ValueError: if sensitive_attrs is empty.
        """
        if len(sensitive_attrs) == 0:
            raise ValueError(
                "At least one sensitive attribute is required."
            )

        if gamma is None:
            # the declared default would otherwise fail in `1.0 + gamma`
            gamma = 0.0

        def cross_cov_abs_optm_func(weight_vec, x_in, x_control_in_arr):
            # absolute covariance between the sensitive attribute and the
            # signed distance to the decision boundary
            centered = x_control_in_arr - np.mean(x_control_in_arr)
            cross_cov = centered * np.dot(weight_vec, x_in.T)
            return float(abs(np.sum(cross_cov))) / float(x_in.shape[0])

        # random starting point: one weight per column of x
        x0 = np.random.rand(x.shape[1])

        # get the initial loss without constraints
        unconstrained = minimize(
            fun=self._loss_function,
            x0=x0,
            args=(x, y),
            method="SLSQP",
            options={"maxiter": max_iter},
        )
        old_w = deepcopy(unconstrained.x)

        constraints = self._get_accuracy_constraint_list(
            x, y, x_control, unconstrained, gamma, sep_constraint,
            sensitive_attrs,
        )

        # Only sensitive_attrs[0] enters the objective below, so the warning
        # depends on how many attributes were selected, not on how many
        # protected attributes the data set contains.
        if len(sensitive_attrs) > 1:
            warnings.warn(
                "Only the first protected attribute is considered "
                "with this constraint."
            )

        # TODO: only the first protected attribute is passed
        # optimize for fairness under the unconstrained accuracy loss
        result = minimize(
            fun=cross_cov_abs_optm_func,
            x0=old_w,
            args=(x, x_control[sensitive_attrs[0]]),
            method="SLSQP",
            options={"maxiter": max_iter},
            constraints=constraints,
        )

        if not result.success:
            # The optimizer result is part of the warning itself so it is not
            # lost when stdout is not captured.
            warnings.warn(
                "Optimization problem did not converge. "
                "Check the solution returned by the optimizer:"
                f"\n{result}"
            )

        return result.x

    def _get_accuracy_constraint_list(
        self, x, y, x_control, w, gamma, sep_constraint, sensitive_attrs
    ):
        """Constraint list for accuracy constraint.

        Args:
            x, y, x_control, gamma, sep_constraint, sensitive_attrs: see
                _train_model_sub_to_acc method.
            w (scipy.optimize.OptimizeResult): the learned weights of the
                unconstrained classifier.

        Returns:
            constraints (list(dict)): accuracy constraints as constraint
                dictionaries ("type", "fun", "args") accepted by
                scipy.optimize.minimize.

        TODO: Currently, only the first protected attribute is considered.
            The code should be extended to more than one sensitive_attr.
        """

        def constraint_gamma_all(w_, x_, y_, initial_loss_arr):
            # total loss may exceed the unconstrained loss by factor (1+gamma)
            new_loss = self._loss_function(w_, x_, y_)
            old_loss = np.sum(initial_loss_arr)
            return ((1.0 + gamma) * old_loss) - new_loss

        def constraint_protected_people(w_, x_, _y):
            # don't confuse the protected here with the sensitive feature
            # protected/non-protected values protected here means that these
            # points should not be misclassified to negative class
            return np.dot(w_, x_.T)  # if positive, constraint is satisfied

        def constraint_unprotected_people(w_, _ind, old_loss, x_, y_):
            # per-sample version of constraint_gamma_all
            new_loss = self._loss_function(w_, np.array([x_]), np.array(y_))
            return ((1.0 + gamma) * old_loss) - new_loss

        predicted_labels = np.sign(np.dot(w.x, x.T))
        unconstrained_loss_arr = self._loss_function(
            w.x, x, y, return_arr=True
        )

        if not sep_constraint:  # same gamma for everyone
            return [
                {
                    "type": "ineq",
                    "fun": constraint_gamma_all,
                    "args": (x, y, unconstrained_loss_arr),
                }
            ]

        # separate gamma for different people
        # TODO: extend here to allow more than one sensitive attribute
        sensitive_values = x_control[sensitive_attrs[0]]
        constraints = []

        for i, predicted in enumerate(predicted_labels):
            # TODO: use favorable_label instead of 1.0
            if predicted == 1.0 and sensitive_values[i] == 1.0:
                constraint = {
                    "type": "ineq",
                    "fun": constraint_protected_people,
                    "args": (x[i], y[i]),
                }
            else:
                constraint = {
                    "type": "ineq",
                    "fun": constraint_unprotected_people,
                    "args": (i, unconstrained_loss_arr[i], x[i], y[i]),
                }
            constraints.append(constraint)

        return constraints