Source code for aif360.algorithms.preprocessing.lfr
import numpy as np
import scipy.optimize as optim
from aif360.algorithms import Transformer
from aif360.algorithms.preprocessing.lfr_helpers import helpers as lfr_helpers
[docs]class LFR(Transformer):
"""Learning fair representations is a pre-processing technique that finds a
latent representation which encodes the data well but obfuscates information
about protected attributes [2]_.
References:
.. [2] R. Zemel, Y. Wu, K. Swersky, T. Pitassi, and C. Dwork, "Learning
Fair Representations." International Conference on Machine Learning,
2013.
Based on code from https://github.com/zjelveh/learning-fair-representations
"""
def __init__(self,
unprivileged_groups,
privileged_groups,
k=5,
Ax=0.01,
Ay=1.0,
Az=50.0,
print_interval=250,
verbose=1,
seed=None):
"""
Args:
unprivileged_groups (tuple): Representation for unprivileged group.
privileged_groups (tuple): Representation for privileged group.
k (int, optional): Number of prototypes.
Ax (float, optional): Input recontruction quality term weight.
Az (float, optional): Fairness constraint term weight.
Ay (float, optional): Output prediction error.
print_interval (int, optional): Print optimization objective value
every print_interval iterations.
verbose (int, optional): If zero, then no output.
seed (int, optional): Seed to make `predict` repeatable.
"""
super(LFR, self).__init__(
unprivileged_groups=unprivileged_groups,
privileged_groups=privileged_groups)
self.seed = seed
self.unprivileged_groups = unprivileged_groups
self.privileged_groups = privileged_groups
if len(self.unprivileged_groups) > 1 or len(self.privileged_groups) > 1:
raise ValueError("Only one unprivileged_group or privileged_group supported.")
self.protected_attribute_name = list(self.unprivileged_groups[0].keys())[0]
self.unprivileged_group_protected_attribute_value = self.unprivileged_groups[0][self.protected_attribute_name]
self.privileged_group_protected_attribute_value = self.privileged_groups[0][self.protected_attribute_name]
self.k = k
self.Ax = Ax
self.Ay = Ay
self.Az = Az
self.print_interval = print_interval
self.verbose = verbose
self.learned_model = None
[docs] def fit(self, dataset, **kwargs):
"""Compute the transformation parameters that leads to fair representations.
Args:
dataset (BinaryLabelDataset): Dataset containing true labels.
Returns:
LFR: Returns self.
"""
if self.seed is not None:
np.random.seed(self.seed)
num_train_samples, self.features_dim = np.shape(dataset.features)
d = np.reshape(
dataset.protected_attributes[:, dataset.protected_attribute_names.index(self.protected_attribute_name)],
[-1, 1])
sensitive_idx = np.array(np.where(d == self.unprivileged_group_protected_attribute_value))[0].flatten()
nonsensitive_idx = np.array(np.where(d == self.privileged_group_protected_attribute_value))[0].flatten()
training_sensitive = dataset.features[sensitive_idx]
training_nonsensitive = dataset.features[nonsensitive_idx]
ytrain_sensitive = dataset.labels[sensitive_idx]
ytrain_nonsensitive = dataset.labels[nonsensitive_idx]
model_inits = np.random.uniform(size=self.features_dim * 2 + self.k + self.features_dim * self.k)
bnd = []
for i, _ in enumerate(model_inits):
if i < self.features_dim * 2 or i >= self.features_dim * 2 + self.k:
bnd.append((None, None))
else:
bnd.append((0, 1))
self.learned_model = optim.fmin_l_bfgs_b(lfr_helpers.LFR_optim_obj, x0=model_inits, epsilon=1e-5,
args=(training_sensitive, training_nonsensitive,
ytrain_sensitive[:, 0], ytrain_nonsensitive[:, 0], self.k, self.Ax,
self.Ay, self.Az, 0, self.print_interval),
bounds=bnd, approx_grad=True, maxfun=5000,
maxiter=5000, disp=self.verbose)[0]
return self
[docs] def transform(self, dataset, threshold=0.5, **kwargs):
"""Transform the dataset using learned model parameters.
Args:
dataset (BinaryLabelDataset): Dataset containing labels that needs to be transformed.
threshold(float, optional): threshold parameter used for binary label prediction.
Returns:
dataset (BinaryLabelDataset): Transformed Dataset.
"""
if self.seed is not None:
np.random.seed(self.seed)
num_test_samples, _ = np.shape(dataset.features)
d = np.reshape(
dataset.protected_attributes[:, dataset.protected_attribute_names.index(self.protected_attribute_name)],
[-1, 1])
sensitive_idx = np.array(np.where(d == self.unprivileged_group_protected_attribute_value))[0].flatten()
nonsensitive_idx = np.array(np.where(d == self.privileged_group_protected_attribute_value))[0].flatten()
testing_sensitive = dataset.features[sensitive_idx]
testing_nonsensitive = dataset.features[nonsensitive_idx]
ytest_sensitive = dataset.labels[sensitive_idx]
ytest_nonsensitive = dataset.labels[nonsensitive_idx]
# extract training model parameters
Ns, P = testing_sensitive.shape
N, _ = testing_nonsensitive.shape
alphaoptim0 = self.learned_model[:P]
alphaoptim1 = self.learned_model[P: 2 * P]
woptim = self.learned_model[2 * P: (2 * P) + self.k]
voptim = np.matrix(self.learned_model[(2 * P) + self.k:]).reshape((self.k, P))
# compute distances on the test dataset using train model params
dist_sensitive = lfr_helpers.distances(testing_sensitive, voptim, alphaoptim1, Ns, P, self.k)
dist_nonsensitive = lfr_helpers.distances(testing_nonsensitive, voptim, alphaoptim0, N, P, self.k)
# compute cluster probabilities for test instances
M_nk_sensitive = lfr_helpers.M_nk(dist_sensitive, Ns, self.k)
M_nk_nonsensitive = lfr_helpers.M_nk(dist_nonsensitive, N, self.k)
# learned mappings for test instances
res_sensitive = lfr_helpers.x_n_hat(testing_sensitive, M_nk_sensitive, voptim, Ns, P, self.k)
x_n_hat_sensitive = res_sensitive[0]
res_nonsensitive = lfr_helpers.x_n_hat(testing_nonsensitive, M_nk_nonsensitive, voptim, N, P, self.k)
x_n_hat_nonsensitive = res_nonsensitive[0]
# compute predictions for test instances
res_sensitive = lfr_helpers.yhat(M_nk_sensitive, ytest_sensitive, woptim, Ns, self.k)
y_hat_sensitive = res_sensitive[0]
res_nonsensitive = lfr_helpers.yhat(M_nk_nonsensitive, ytest_nonsensitive, woptim, N, self.k)
y_hat_nonsensitive = res_nonsensitive[0]
transformed_features = np.zeros(shape=np.shape(dataset.features))
transformed_labels = np.zeros(shape=np.shape(dataset.labels))
transformed_features[sensitive_idx] = x_n_hat_sensitive
transformed_features[nonsensitive_idx] = x_n_hat_nonsensitive
transformed_labels[sensitive_idx] = np.reshape(y_hat_sensitive,
[-1, 1])
transformed_labels[nonsensitive_idx] = np.reshape(y_hat_nonsensitive,
[-1, 1])
transformed_labels = (np.array(transformed_labels) > threshold).astype(np.float64)
# Mutated, fairer dataset with new labels
dataset_new = dataset.copy(deepcopy=True)
dataset_new.features = transformed_features
dataset_new.labels = transformed_labels
return dataset_new
[docs] def fit_transform(self, dataset, seed=None):
"""fit and transform methods sequentially"""
return self.fit(dataset, seed=seed).transform(dataset)