from collections import defaultdict
from contextlib import contextmanager
from copy import deepcopy
from logging import warning
import numpy as np
import pandas as pd
from aif360.datasets import Dataset
class StructuredDataset(Dataset):
"""Base class for all structured datasets.
A StructuredDataset requires data to be stored in :obj:`numpy.ndarray`
objects with :obj:`~numpy.dtype` as :obj:`~numpy.float64`.
Attributes:
features (numpy.ndarray): Dataset features for each instance.
labels (numpy.ndarray): Generic label corresponding to each instance
(could be ground-truth, predicted, cluster assignments, etc.).
scores (numpy.ndarray): Probability score associated with each label.
Same shape as `labels`. Only valid for binary labels (this includes
one-hot categorical labels as well).
protected_attributes (numpy.ndarray): A subset of `features` for which
fairness is desired.
feature_names (list(str)): Names describing each dataset feature.
label_names (list(str)): Names describing each label.
protected_attribute_names (list(str)): A subset of `feature_names`
corresponding to `protected_attributes`.
privileged_protected_attributes (list(numpy.ndarray)): A subset of
protected attribute values which are considered privileged from a
fairness perspective.
unprivileged_protected_attributes (list(numpy.ndarray)): The remaining
possible protected attribute values which are not included in
`privileged_protected_attributes`.
        instance_names (list(str)): Identifiers for each instance. Sequential
            integers by default.
        instance_weights (numpy.ndarray): Weighting for each instance. All
            equal (ones) by default. Following standard practice in social
            science data, a weight of 1 means one person or entity; these
            weights are hence person or entity multipliers (see:
            https://www.ibm.com/support/knowledgecenter/en/SS3RA7_15.0.0/com.ibm.spss.modeler.help/netezza_decisiontrees_weights.htm).
            These weights *may not* be normalized to sum to 1 across the
            entire dataset; rather, the nominal (default) weight of each
            entity/record in the data is 1. This is similar in spirit to the
            person weight in census microdata samples:
            https://www.census.gov/programs-surveys/acs/technical-documentation/pums/about.html
ignore_fields (set(str)): Attribute names to ignore when doing equality
comparisons. Always at least contains `'metadata'`.
metadata (dict): Details about the creation of this dataset. For
example::
{
'transformer': 'Dataset.__init__',
'params': kwargs,
'previous': None
}
"""
def __init__(self, df, label_names, protected_attribute_names,
instance_weights_name=None, scores_names=[],
unprivileged_protected_attributes=[],
privileged_protected_attributes=[], metadata=None):
"""
Args:
df (pandas.DataFrame): Input DataFrame with features, labels, and
protected attributes. Values should be preprocessed
to remove NAs and make all data numerical. Index values are
taken as instance names.
label_names (iterable): Names of the label columns in `df`.
protected_attribute_names (iterable): List of names corresponding to
protected attribute columns in `df`.
            instance_weights_name (optional): Column name in `df` corresponding
                to instance weights. If not provided, `instance_weights` will
                all be set to 1.
            scores_names (optional): Names of the score columns in `df`. If
                not provided, `scores` defaults to a copy of `labels`.
unprivileged_protected_attributes (optional): If not provided, all
but the highest numerical value of each protected attribute will
be considered not privileged.
privileged_protected_attributes (optional): If not provided, the
highest numerical value of each protected attribute will be
considered privileged.
metadata (optional): Additional metadata to append.
Raises:
TypeError: Certain fields must be np.ndarrays as specified in the
class description.
ValueError: ndarray shapes must match.
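
        Examples:
            A minimal construction sketch; the column names and values below
            are illustrative only:

            >>> import pandas as pd
            >>> df = pd.DataFrame({'feat': [0., 1., 1., 0.],
            ...                    'sex': [1., 0., 1., 0.],
            ...                    'label': [1., 0., 1., 1.]})
            >>> sd = StructuredDataset(df=df, label_names=['label'],
            ...                        protected_attribute_names=['sex'])
            >>> sd.features.shape  # protected attributes remain in `features`
            (4, 2)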
"""
if df is None:
raise TypeError("Must provide a pandas DataFrame representing "
"the data (features, labels, protected attributes)")
if df.isna().any().any():
raise ValueError("Input DataFrames cannot contain NA values.")
try:
df = df.astype(np.float64)
        except ValueError as e:
            raise ValueError("DataFrame values must be numerical.") from e
# Convert all column names to strings
df.columns = df.columns.astype(str).tolist()
label_names = list(map(str, label_names))
protected_attribute_names = list(map(str, protected_attribute_names))
        self.feature_names = [n for n in df.columns if n not in label_names
                              and n not in scores_names
                              and n != instance_weights_name]
self.label_names = label_names
self.features = df[self.feature_names].values.copy()
self.labels = df[self.label_names].values.copy()
self.instance_names = df.index.astype(str).tolist()
if scores_names:
self.scores = df[scores_names].values.copy()
else:
self.scores = self.labels.copy()
df_prot = df.loc[:, protected_attribute_names]
self.protected_attribute_names = df_prot.columns.astype(str).tolist()
self.protected_attributes = df_prot.values.copy()
        # Infer the privileged and unprivileged values if not provided
if unprivileged_protected_attributes and privileged_protected_attributes:
self.unprivileged_protected_attributes = unprivileged_protected_attributes
self.privileged_protected_attributes = privileged_protected_attributes
else:
self.unprivileged_protected_attributes = [
np.sort(np.unique(df_prot[attr].values))[:-1]
for attr in self.protected_attribute_names]
self.privileged_protected_attributes = [
np.sort(np.unique(df_prot[attr].values))[-1:]
for attr in self.protected_attribute_names]
if instance_weights_name:
self.instance_weights = df[instance_weights_name].values.copy()
else:
            self.instance_weights = np.ones(len(self.instance_names),
                                            dtype=np.float64)
# always ignore metadata and ignore_fields
self.ignore_fields = {'metadata', 'ignore_fields'}
# sets metadata
super(StructuredDataset, self).__init__(df=df, label_names=label_names,
protected_attribute_names=protected_attribute_names,
instance_weights_name=instance_weights_name,
unprivileged_protected_attributes=unprivileged_protected_attributes,
privileged_protected_attributes=privileged_protected_attributes,
metadata=metadata)
def __eq__(self, other):
"""Equality comparison for StructuredDatasets.
Note: Compares all fields other than those specified in `ignore_fields`.
"""
if not isinstance(other, StructuredDataset):
return False
def _eq(x, y):
if isinstance(x, np.ndarray) and isinstance(y, np.ndarray):
return np.all(x == y)
elif isinstance(x, list) and isinstance(y, list):
return len(x) == len(y) and all(_eq(xi, yi) for xi, yi in zip(x, y))
return x == y
return all(_eq(self.__dict__[k], other.__dict__[k])
for k in self.__dict__.keys() if k not in self.ignore_fields)
def __ne__(self, other):
return not self == other
def __repr__(self):
# return repr(self.metadata)
return str(self)
def __str__(self):
df, _ = self.convert_to_dataframe()
df.insert(0, 'instance_weights', self.instance_weights)
highest_level = ['instance weights'] + \
['features']*len(self.feature_names) + \
['labels']*len(self.label_names)
middle_level = [''] + \
['protected attribute'
if f in self.protected_attribute_names else ''
for f in self.feature_names] + \
['']*len(self.label_names)
lowest_level = [''] + self.feature_names + ['']*len(self.label_names)
df.columns = pd.MultiIndex.from_arrays(
[highest_level, middle_level, lowest_level])
df.index.name = 'instance names'
return str(df)
# TODO: *_names checks
    def validate_dataset(self):
"""Error checking and type validation.
Raises:
TypeError: Certain fields must be np.ndarrays as specified in the
class description.
ValueError: ndarray shapes must match.
"""
super(StructuredDataset, self).validate_dataset()
# =========================== TYPE CHECKING ============================
        for name in ['features', 'protected_attributes', 'labels', 'scores',
                     'instance_weights']:
            if not isinstance(getattr(self, name), np.ndarray):
                raise TypeError("'{}' must be an np.ndarray.".format(name))
# convert ndarrays to float64
self.features = self.features.astype(np.float64)
self.protected_attributes = self.protected_attributes.astype(np.float64)
self.labels = self.labels.astype(np.float64)
self.instance_weights = self.instance_weights.astype(np.float64)
# =========================== SHAPE CHECKING ===========================
if len(self.labels.shape) == 1:
self.labels = self.labels.reshape((-1, 1))
        try:
            self.scores = self.scores.reshape(self.labels.shape)
        except ValueError as e:
            raise ValueError("'scores' should have the same shape as "
                             "'labels'.") from e
if not self.labels.shape[0] == self.features.shape[0]:
raise ValueError("Number of labels must match number of instances:"
"\n\tlabels.shape = {}\n\tfeatures.shape = {}".format(
self.labels.shape, self.features.shape))
if not self.instance_weights.shape[0] == self.features.shape[0]:
raise ValueError("Number of weights must match number of instances:"
"\n\tinstance_weights.shape = {}\n\tfeatures.shape = {}".format(
self.instance_weights.shape, self.features.shape))
# =========================== VALUE CHECKING ===========================
if np.any(np.logical_or(self.scores < 0., self.scores > 1.)):
            warning("'scores' has no well-defined meaning outside the range "
                    "[0, 1].")
for i in range(len(self.privileged_protected_attributes)):
priv = set(self.privileged_protected_attributes[i])
unpriv = set(self.unprivileged_protected_attributes[i])
# check for duplicates
if priv & unpriv:
raise ValueError("'privileged_protected_attributes' and "
"'unprivileged_protected_attributes' should not share any "
"common elements:\n\tBoth contain {} for feature {}".format(
list(priv & unpriv), self.protected_attribute_names[i]))
# check for unclassified values
if not set(self.protected_attributes[:, i]) <= (priv | unpriv):
raise ValueError("All observed values for protected attributes "
"should be designated as either privileged or unprivileged:"
"\n\t{} not designated for feature {}".format(
list(set(self.protected_attributes[:, i])
- (priv | unpriv)),
self.protected_attribute_names[i]))
# warn for unobserved values
if not (priv | unpriv) <= set(self.protected_attributes[:, i]):
warning("{} listed but not observed for feature {}".format(
list((priv | unpriv) - set(self.protected_attributes[:, i])),
self.protected_attribute_names[i]))
    @contextmanager
def temporarily_ignore(self, *fields):
"""Temporarily add the fields provided to `ignore_fields`.
To be used in a `with` statement. Upon completing the `with` block,
`ignore_fields` is restored to its original value.
Args:
*fields: Additional fields to ignore for equality comparison within
the scope of this context manager, e.g.
`temporarily_ignore('features', 'labels')`. The temporary
`ignore_fields` attribute is the union of the old attribute and
the set of these fields.
Examples:
>>> sd = StructuredDataset(...)
>>> modified = sd.copy()
>>> modified.labels = sd.labels + 1
>>> assert sd != modified
            >>> with sd.temporarily_ignore('labels'):
            ...     assert sd == modified
>>> assert 'labels' not in sd.ignore_fields
"""
old_ignore = deepcopy(self.ignore_fields)
self.ignore_fields |= set(fields)
try:
yield
finally:
self.ignore_fields = old_ignore
    def align_datasets(self, other):
"""Align the other dataset features, labels and protected_attributes to
this dataset.
Args:
other (StructuredDataset): Other dataset that needs to be aligned
Returns:
StructuredDataset: New aligned dataset
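
        Examples:
            A usage sketch, assuming `sd` and `other` are StructuredDatasets
            with the same (but possibly reordered) columns:

            >>> aligned = sd.align_datasets(other)
            >>> aligned.feature_names == sd.feature_names
            True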
"""
if (set(self.feature_names) != set(other.feature_names) or
set(self.label_names) != set(other.label_names) or
set(self.protected_attribute_names)
!= set(other.protected_attribute_names)):
raise ValueError(
"feature_names, label_names, and protected_attribute_names "
"should match between this and other dataset.")
# New dataset
new = other.copy()
# re-order the columns of the new dataset
feat_inds = [new.feature_names.index(f) for f in self.feature_names]
label_inds = [new.label_names.index(f) for f in self.label_names]
prot_inds = [new.protected_attribute_names.index(f)
for f in self.protected_attribute_names]
new.features = new.features[:, feat_inds]
new.labels = new.labels[:, label_inds]
new.scores = new.scores[:, label_inds]
new.protected_attributes = new.protected_attributes[:, prot_inds]
new.privileged_protected_attributes = [
new.privileged_protected_attributes[i] for i in prot_inds]
new.unprivileged_protected_attributes = [
new.unprivileged_protected_attributes[i] for i in prot_inds]
new.feature_names = deepcopy(self.feature_names)
new.label_names = deepcopy(self.label_names)
new.protected_attribute_names = deepcopy(self.protected_attribute_names)
return new
    # TODO: Should we store the protected attributes in a separate dataframe?
    def convert_to_dataframe(self, de_dummy_code=False, sep='=',
                             set_category=True):
"""Convert the StructuredDataset to a :obj:`pandas.DataFrame`.
Args:
de_dummy_code (bool): Performs de_dummy_coding, converting dummy-
coded columns to categories. If `de_dummy_code` is `True` and
this dataset contains mappings for label and/or protected
attribute values to strings in the `metadata`, this method will
convert those as well.
            sep (str): Separator between the base name and the dummy-coded
                category when de-dummy coding.
            set_category (bool): Set the de-dummy coded features to categorical
                type.
Returns:
(pandas.DataFrame, dict):
* `pandas.DataFrame`: Equivalent dataframe for a dataset. All
columns will have only numeric values. The
`protected_attributes` field in the dataset will override the
values in the `features` field.
* `dict`: Attributes. Will contain additional information pulled
from the dataset such as `feature_names`, `label_names`,
`protected_attribute_names`, `instance_names`,
`instance_weights`, `privileged_protected_attributes`,
`unprivileged_protected_attributes`. The metadata will not be
returned.
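
        Examples:
            A usage sketch, assuming `sd` is an existing StructuredDataset:

            >>> df, attrs = sd.convert_to_dataframe(de_dummy_code=True)
            >>> 'feature_names' in attrs
            True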
"""
df = pd.DataFrame(np.hstack((self.features, self.labels)),
columns=self.feature_names+self.label_names,
index=self.instance_names)
df.loc[:, self.protected_attribute_names] = self.protected_attributes
# De-dummy code if necessary
if de_dummy_code:
df = self._de_dummy_code_df(df, sep=sep, set_category=set_category)
if 'label_maps' in self.metadata:
for i, label in enumerate(self.label_names):
df[label] = df[label].replace(self.metadata['label_maps'][i])
if 'protected_attribute_maps' in self.metadata:
for i, prot_attr in enumerate(self.protected_attribute_names):
df[prot_attr] = df[prot_attr].replace(
self.metadata['protected_attribute_maps'][i])
# Attributes
attributes = {
"feature_names": self.feature_names,
"label_names": self.label_names,
"protected_attribute_names": self.protected_attribute_names,
"instance_names": self.instance_names,
"instance_weights": self.instance_weights,
"privileged_protected_attributes": self.privileged_protected_attributes,
"unprivileged_protected_attributes": self.unprivileged_protected_attributes
}
return df, attributes
    def export_dataset(self, export_metadata=False):
"""
Export the dataset and supporting attributes
TODO: The preferred file format is HDF
"""
if export_metadata:
raise NotImplementedError("The option to export metadata has not been implemented yet")
return None
    def import_dataset(self, import_metadata=False):
""" Import the dataset and supporting attributes
TODO: The preferred file format is HDF
"""
if import_metadata:
raise NotImplementedError("The option to import metadata has not been implemented yet")
return None
    def split(self, num_or_size_splits, shuffle=False, seed=None):
"""Split the dataset into multiple datasets
Args:
num_or_size_splits (list or int):
shuffle (bool):
seed (int or array_like): takes the same argument as `numpy.random.seed()`
function
Returns:
list: Each element of this list is a dataset obtained during the split
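
        Examples:
            A sketch of a 70/30 split, assuming `sd` is an existing
            StructuredDataset:

            >>> train, test = sd.split([0.7], shuffle=True, seed=42)
            >>> (train.features.shape[0] + test.features.shape[0]
            ...  == sd.features.shape[0])
            True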
"""
# Set seed
if seed is not None:
np.random.seed(seed)
n = self.features.shape[0]
if isinstance(num_or_size_splits, list):
num_folds = len(num_or_size_splits) + 1
if num_folds > 1 and all(x <= 1. for x in num_or_size_splits):
num_or_size_splits = [int(x * n) for x in num_or_size_splits]
else:
num_folds = num_or_size_splits
order = list(np.random.permutation(n) if shuffle else range(n))
folds = [self.copy() for _ in range(num_folds)]
features = np.array_split(self.features[order], num_or_size_splits)
labels = np.array_split(self.labels[order], num_or_size_splits)
scores = np.array_split(self.scores[order], num_or_size_splits)
protected_attributes = np.array_split(self.protected_attributes[order],
num_or_size_splits)
instance_weights = np.array_split(self.instance_weights[order],
num_or_size_splits)
instance_names = np.array_split(np.array(self.instance_names)[order],
num_or_size_splits)
for fold, feats, labs, scors, prot_attrs, inst_wgts, inst_name in zip(
folds, features, labels, scores, protected_attributes, instance_weights,
instance_names):
fold.features = feats
fold.labels = labs
fold.scores = scors
fold.protected_attributes = prot_attrs
fold.instance_weights = inst_wgts
fold.instance_names = list(map(str, inst_name))
fold.metadata = fold.metadata.copy()
fold.metadata.update({
'transformer': '{}.split'.format(type(self).__name__),
'params': {'num_or_size_splits': num_or_size_splits,
'shuffle': shuffle},
'previous': [self]
})
return folds
@staticmethod
def _de_dummy_code_df(df, sep="=", set_category=False):
"""De-dummy code a dummy-coded dataframe obtained with pd.get_dummies().
After reversing dummy coding the corresponding fields will be converted
to categorical.
Args:
df (pandas.DataFrame): Input dummy coded dataframe
            sep (str): Separator between base name and dummy code
set_category (bool): Set the de-dummy coded features
to categorical type
Examples:
>>> columns = ["Age", "Gender=Male", "Gender=Female"]
>>> df = pd.DataFrame([[10, 1, 0], [20, 0, 1]], columns=columns)
            >>> StructuredDataset._de_dummy_code_df(df, sep="=")
Age Gender
0 10 Male
1 20 Female
"""
feature_names_dum_d, feature_names_nodum = \
StructuredDataset._parse_feature_names(df.columns)
df_new = pd.DataFrame(index=df.index,
columns=feature_names_nodum + list(feature_names_dum_d.keys()))
for fname in feature_names_nodum:
df_new[fname] = df[fname].values.copy()
for fname, vl in feature_names_dum_d.items():
for v in vl:
df_new.loc[df[fname+sep+str(v)] == 1, fname] = str(v)
if set_category:
for fname in feature_names_dum_d.keys():
df_new[fname] = df_new[fname].astype('category')
return df_new
@staticmethod
def _parse_feature_names(feature_names, sep="="):
"""Parse feature names to ordinary and dummy coded candidates.
Args:
feature_names (list): Names of features
            sep (str): Separator to designate the dummy coded category in the
feature name
Returns:
(dict, list):
* feature_names_dum_d (dict): Keys are the base feature names
and values are the categories.
* feature_names_nodum (list): Non-dummy coded feature names.
Examples:
>>> feature_names = ["Age", "Gender=Male", "Gender=Female"]
>>> StructuredDataset._parse_feature_names(feature_names, sep="=")
            (defaultdict(<class 'list'>, {'Gender': ['Male', 'Female']}), ['Age'])
"""
feature_names_dum_d = defaultdict(list)
feature_names_nodum = list()
for fname in feature_names:
if sep in fname:
fname_dum, v = fname.split(sep, 1)
feature_names_dum_d[fname_dum].append(v)
else:
feature_names_nodum.append(fname)
return feature_names_dum_d, feature_names_nodum