Removing leftovers from the old API.

PiperOrigin-RevId: 334792006
This commit is contained in:
Vadym Doroshenko 2020-10-01 05:13:12 -07:00 committed by A. Unique TensorFlower
parent a579cc4afc
commit 9a56402c0d
2 changed files with 0 additions and 285 deletions

View file

@ -15,214 +15,8 @@
# Lint as: python3
"""Utility functions for membership inference attacks."""
from typing import Text, Dict, Union, List, Any, Tuple
import numpy as np
import scipy.special
from sklearn import metrics
ArrayDict = Dict[Text, np.ndarray]
Dataset = Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]
# ------------------------------------------------------------------------------
# Utilities for managing result dictionaries
# ------------------------------------------------------------------------------
def to_numpy(in_dict: Dict[Text, Any]) -> ArrayDict:
  """Return a new dictionary whose values are numpy arrays.

  Every value is passed through `np.array`, so the call fails if a value
  cannot be converted by numpy.

  Args:
    in_dict: Dictionary mapping Text keys to values that numpy can convert to
      an array.

  Returns:
    A new dictionary with the same keys as `in_dict`, each mapped to
    `np.array(value)`.
  """
  converted = {}
  for name, value in in_dict.items():
    converted[name] = np.array(value)
  return converted
def ensure_1d(in_dict: Dict[Text, Union[int, float, np.ndarray]]) -> ArrayDict:
  """Promote every value of a dictionary to an array with at least one axis.

  Args:
    in_dict: Dictionary mapping Text keys to numbers or numpy arrays.

  Returns:
    A new dictionary with the same keys as `in_dict`, where each value has been
    passed through `np.atleast_1d` (scalars become one-element arrays).
  """
  promoted = {}
  for name, value in in_dict.items():
    promoted[name] = np.atleast_1d(value)
  return promoted
def prepend_to_keys(in_dict: Dict[Text, Any], prefix: Text) -> Dict[Text, Any]:
  """Build a copy of a dictionary whose keys all start with `prefix`.

  Args:
    in_dict: Dictionary with Text keys; the values are passed through untouched.
    prefix: Text that is concatenated in front of every key.

  Returns:
    A new dictionary holding the same values as `in_dict` under the keys
    `prefix + key`.
  """
  renamed = {}
  for key, value in in_dict.items():
    renamed[prefix + key] = value
  return renamed
# ------------------------------------------------------------------------------
# Subsampling and data selection functionality
# ------------------------------------------------------------------------------
def select_indices(in_dict: ArrayDict, indices: np.ndarray) -> ArrayDict:
  """Index every array of a dictionary with the same index array.

  Args:
    in_dict: Dictionary mapping Text keys to numpy arrays.
    indices: Numpy array used to index each value, i.e. `value[indices]`.

  Returns:
    A new dictionary with the same keys as `in_dict` and the indexed values.
  """
  selected = {}
  for name, values in in_dict.items():
    selected[name] = values[indices]
  return selected
def merge_dictionaries(res: List[ArrayDict]) -> ArrayDict:
  """Concatenate the arrays of several dictionaries key by key.

  The keys of the first dictionary define the keys of the result. For each of
  these keys, the arrays of all dictionaries that contain the key are
  concatenated along axis 0, in list order. Keys that do not appear in the
  first dictionary are ignored.

  Args:
    res: List of dictionaries mapping Text keys to numpy arrays.

  Returns:
    A dictionary mapping each key of `res[0]` to the concatenated array. An
    empty list yields an empty dictionary.
  """
  # Without this guard `res[0]` would raise IndexError for an empty list.
  if not res:
    return {}
  return {
      k: np.concatenate([r[k] for r in res if k in r], axis=0)
      for k in res[0]
  }
def get_features(features: ArrayDict,
                 feature_name: Text,
                 top_k: int,
                 add_loss: bool = False) -> np.ndarray:
  """Combine the specified features into one array.

  For every example the values of `features[feature_name]` are sorted along the
  last axis and the `top_k` largest ones are kept, ordered from largest to
  smallest. If fewer than `top_k` values are available, all of them are kept.

  Args:
    features: A dictionary containing all possible features.
    feature_name: Which feature to use (logits or prob).
    top_k: The number of the top (largest) values of `feature_name` to select.
    add_loss: Whether to also append `features['loss']` as the last column.

  Returns:
    Combined numpy array with the selected features (n_examples, n_features).

  Raises:
    ValueError: If `top_k` is smaller than 1.
  """
  if top_k < 1:
    raise ValueError('Must select at least one feature.')
  # np.sort is ascending, so reverse the sorted axis before slicing; slicing
  # the ascending result directly would keep the smallest values instead.
  ranked = np.sort(features[feature_name], axis=-1)[:, ::-1]
  feats = ranked[:, :top_k]
  if add_loss:
    feats = np.concatenate((feats, features['loss'][:, np.newaxis]), axis=-1)
  return feats
def subsample_to_balance(features: ArrayDict, random_state: int) -> ArrayDict:
  """Randomly drop examples of the larger group so both groups have equal size.

  Examples whose `is_train` entry equals 1 form the training group; all other
  examples form the test group. The larger group is subsampled without
  replacement down to the size of the smaller one. Numpy's global random
  generator is seeded with `random_state` before sampling.

  Args:
    features: Dictionary mapping Text keys to numpy arrays; must contain the key
      'is_train'.
    random_state: Seed passed to `np.random.seed`.

  Returns:
    The input dictionary itself if both groups already have the same size.
    Otherwise a new dictionary in which every array holds the sampled examples
    of the larger group followed by all examples of the smaller group.

  Raises:
    RuntimeError: If either group has fewer than 20 examples.
  """
  member_mask = features['is_train'] == 1
  non_member_mask = np.logical_not(member_mask)
  n_non_members = np.sum(non_member_mask)
  n_members = np.sum(member_mask)
  if min(n_non_members, n_members) < 20:
    raise RuntimeError('Need at least 20 examples from training and test set.')

  # The global generator is seeded even when no subsampling turns out to be
  # necessary.
  np.random.seed(random_state)
  if n_non_members == n_members:
    return features

  if n_non_members > n_members:
    larger_mask, smaller_mask, n_keep = non_member_mask, member_mask, n_members
  else:
    larger_mask, smaller_mask, n_keep = member_mask, non_member_mask, n_non_members

  sampled = np.random.choice(np.nonzero(larger_mask)[0], n_keep, replace=False)
  keep = np.concatenate((sampled, np.nonzero(smaller_mask)[0]))
  return {name: values[keep] for name, values in features.items()}
def get_train_test_split(features: ArrayDict, add_loss: bool,
                         test_size: float) -> Dataset:
  """Randomly split attack features and membership labels into two parts.

  The labels are taken from `features['is_train']`; the inputs are built by
  `get_features` from at most 10 logit values per example, optionally with the
  loss appended. The split uses `np.random.permutation`, i.e. numpy's global
  random generator.

  Args:
    features: Dictionary with at least the keys 'is_train' and 'logits' (and
      'loss' when `add_loss` is set).
    add_loss: Whether the loss is appended to the logit features.
    test_size: Fraction of the examples assigned to the test part; the number
      of test examples is `int(test_size * n_examples)`.

  Returns:
    `((x_train, y_train), (x_test, y_test))`.
  """
  labels = features['is_train']
  n_examples = len(labels)
  n_held_out = int(test_size * n_examples)

  shuffled = np.random.permutation(n_examples)
  held_out_idx, fit_idx = shuffled[:n_held_out], shuffled[n_held_out:]

  # Use at most 10 logit values: this is the default chosen for models with
  # more than 10 classes, on the premise that there is typically no
  # significant amount of weight in more than 10 logits.
  n_logits = min(features['logits'].shape[1], 10)
  x = get_features(features, 'logits', n_logits, add_loss)

  train_part = (x[fit_idx], labels[fit_idx])
  test_part = (x[held_out_idx], labels[held_out_idx])
  return train_part, test_part
# ------------------------------------------------------------------------------
# Computation of the attack metrics
# ------------------------------------------------------------------------------
def compute_performance_metrics(true_labels: np.ndarray,
                                predictions: np.ndarray,
                                threshold: float = None) -> ArrayDict:
  """Compute classification metrics for scores against binary labels.

  The returned dictionary always contains
    * 'fpr', 'tpr', 'thresholds': the outputs of `metrics.roc_curve`,
    * 'auc': the area under the fpr-tpr curve,
    * 'advantage': the maximum absolute difference between tpr and fpr.
  If `threshold` is given, it additionally contains 'precision', 'recall',
  'accuracy' and 'f1_score' of the decisions `predictions > threshold`.

  Args:
    true_labels: True labels.
    predictions: Predicted probabilities/scores.
    threshold: Optional cut-off applied to `predictions` to obtain binary
      decisions; thresholded metrics are skipped when it is None.

  Returns:
    A dictionary of the metrics listed above, each converted to a numpy array
    with at least one dimension by `ensure_1d`.
  """
  results = {}

  if threshold is not None:
    # Binarize once and reuse the decisions for every thresholded metric.
    decisions = predictions > threshold
    results['precision'] = metrics.precision_score(true_labels, decisions)
    results['recall'] = metrics.recall_score(true_labels, decisions)
    results['accuracy'] = metrics.accuracy_score(true_labels, decisions)
    results['f1_score'] = metrics.f1_score(true_labels, decisions)

  fpr, tpr, thresholds = metrics.roc_curve(true_labels, predictions)
  results['fpr'] = fpr
  results['tpr'] = tpr
  results['thresholds'] = thresholds
  results['auc'] = metrics.auc(fpr, tpr)
  results['advantage'] = np.max(np.abs(tpr - fpr))

  return ensure_1d(results)
# ------------------------------------------------------------------------------
# Loss functions
# ------------------------------------------------------------------------------
def log_loss(labels: np.ndarray, pred: np.ndarray, small_value=1e-8):

View file

@ -23,85 +23,6 @@ from tensorflow_privacy.privacy.membership_inference_attack import utils
class UtilsTest(absltest.TestCase):
def __init__(self, methodname):
  """Create the synthetic feature dictionary shared by the test methods."""
  super().__init__(methodname)
  generator = np.random.RandomState(33)
  # The entries are drawn in the order logits, loss, is_train from one seeded
  # generator, so the order of the dictionary entries fixes the values.
  self.mydict = {
      'logits': generator.uniform(low=0, high=1, size=(1000, 14)),
      'loss': generator.uniform(low=0, high=1, size=(1000,)),
      'is_train': generator.binomial(1, 0.7, size=(1000,)),
  }
def test_compute_metrics(self):
  """Check the keys and thresholded values of the attack metrics."""
  labels = np.array([0, 0, 0, 1, 1, 1])
  scores = np.array([0.6, 0.9, 0.4, 0.8, 0.7, 0.2])
  results = utils.compute_performance_metrics(labels, scores, threshold=0.5)

  expected_keys = (
      'precision', 'recall', 'accuracy', 'f1_score', 'fpr', 'tpr',
      'thresholds', 'auc', 'advantage'
  )
  for key in expected_keys:
    self.assertIn(key, results)

  # At threshold 0.5 there are 2 true positives, 2 false positives, 1 false
  # negative and 1 true negative.
  np.testing.assert_almost_equal(results['accuracy'], 1. / 2.)
  np.testing.assert_almost_equal(results['precision'], 2. / (2. + 2.))
  np.testing.assert_almost_equal(results['recall'], 2. / (2. + 1.))
def test_prepend_to_keys(self):
  """Check that every key of the result starts with the given prefix."""
  prefixed = utils.prepend_to_keys(self.mydict, 'test')
  for key in prefixed:
    self.assertTrue(key.startswith('test'))
def test_select_indices(self):
  """Check integer and boolean indexing of a dictionary of arrays."""
  arrays = {'a': np.arange(10), 'b': np.linspace(0, 1, 10)}

  # Integer indices: the first five entries.
  first_five = utils.select_indices(arrays, np.arange(5))
  np.testing.assert_allclose(first_five['a'], np.arange(5))
  np.testing.assert_allclose(first_five['b'], np.linspace(0, 1, 10)[:5])

  # Boolean mask: every second entry, starting with the first.
  even_mask = np.array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0]) > 0.5
  even_entries = utils.select_indices(arrays, even_mask)
  np.testing.assert_allclose(even_entries['a'], np.arange(0, 10, 2))
  np.testing.assert_allclose(even_entries['b'], np.linspace(0, 1, 10)[0:10:2])
def test_get_features(self):
  """Check the shape of the extracted features for several settings."""
  n_available = 14  # Number of logit columns in self.mydict.
  for top_k in [1, 5, 10, 15]:
    for with_loss in [True, False]:
      feats = utils.get_features(
          self.mydict, 'logits', top_k=top_k, add_loss=with_loss)
      # The loss, when requested, contributes one extra column.
      expected_columns = min(top_k, n_available) + int(with_loss)
      self.assertEqual(feats.shape, (1000, expected_columns))
def test_subsample_to_balance(self):
  """Check that subsampling leaves equally many train and test examples."""
  balanced = utils.subsample_to_balance(self.mydict, random_state=23)

  n_train = np.sum(self.mydict['is_train'])
  n_test = 1000 - n_train
  n_per_group = min(n_train, n_test)

  self.assertEqual(balanced['logits'].shape, (2 * n_per_group, 14))
  self.assertEqual(balanced['loss'].shape, (2 * n_per_group,))
  self.assertEqual(np.sum(balanced['is_train']), n_per_group)
  self.assertEqual(np.sum(1 - balanced['is_train']), n_per_group)
def test_get_data(self):
  """Check the shapes produced by the train/test split for several sizes."""
  for fraction in [0.2, 0.5, 0.8, 0.55555]:
    train_part, test_part = utils.get_train_test_split(
        self.mydict, add_loss=True, test_size=fraction)
    x_train, y_train = train_part
    x_test, y_test = test_part

    expected_test = int(fraction * 1000)
    expected_train = 1000 - expected_test
    # 11 columns: 10 logit values plus the loss.
    self.assertEqual(x_train.shape, (expected_train, 11))
    self.assertEqual(y_train.shape, (expected_train,))
    self.assertEqual(x_test.shape, (expected_test, 11))
    self.assertEqual(y_test.shape, (expected_test,))
def test_log_loss(self):
"""Test computing cross-entropy loss."""
# Test binary case with a few normal values