diff --git a/tensorflow_privacy/privacy/membership_inference_attack/utils.py b/tensorflow_privacy/privacy/membership_inference_attack/utils.py
index 24785a7..0231bb2 100644
--- a/tensorflow_privacy/privacy/membership_inference_attack/utils.py
+++ b/tensorflow_privacy/privacy/membership_inference_attack/utils.py
@@ -15,214 +15,8 @@
 # Lint as: python3
 """Utility functions for membership inference attacks."""
 
-from typing import Text, Dict, Union, List, Any, Tuple
-
 import numpy as np
 import scipy.special
-from sklearn import metrics
-
-
-ArrayDict = Dict[Text, np.ndarray]
-Dataset = Tuple[Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]]
-
-# ------------------------------------------------------------------------------
-# Utilities for managing result dictionaries
-# ------------------------------------------------------------------------------
-
-
-def to_numpy(in_dict: Dict[Text, Any]) -> ArrayDict:
-  """Convert values of dict to numpy arrays.
-
-  Warning: This may fail if the values cannot be converted to numpy arrays.
-
-  Args:
-    in_dict: A dictionary mapping Text keys to values, where the values must be
-      convertible to numpy arrays.
-
-  Returns:
-    A dictionary with the same keys as the input and all values converted to
-    numpy arrays.
-  """
-  return {k: np.array(v) for k, v in in_dict.items()}
-
-
-def ensure_1d(in_dict: Dict[Text, Union[int, float, np.ndarray]]) -> ArrayDict:
-  """Ensure all values of a dictionary are at least 1D numpy arrays.
-
-  Args:
-    in_dict: The input dictionary mapping Text keys to numpy arrays or numbers.
-
-  Returns:
-    A dictionary with the same keys as in_dict and values converted to numpy
-    arrays with at least one dimension (i.e., scalars are packed into arrays).
-  """
-  return {k: np.atleast_1d(v) for k, v in in_dict.items()}
-
-
-def prepend_to_keys(in_dict: Dict[Text, Any], prefix: Text) -> Dict[Text, Any]:
-  """Prepend a prefix to all keys of a dictionary.
-
-  Args:
-    in_dict: The input dictionary mapping Text keys to numpy arrays.
-    prefix: Text to prepend to each key in in_dict.
-
-  Returns:
-    A dictionary with the same values as in_dict and prefix prepended to every
-    key.
-  """
-  return {prefix + k: v for k, v in in_dict.items()}
-
-
-# ------------------------------------------------------------------------------
-# Subsampling and data selection functionality
-# ------------------------------------------------------------------------------
-
-
-def select_indices(in_dict: ArrayDict, indices: np.ndarray) -> ArrayDict:
-  """Subsample all values in the dictionary by the provided indices.
-
-  Args:
-    in_dict: The input dictionary mapping Text keys to numpy array values.
-    indices: A numpy array which can be used to index other arrays, specifying
-      the indices to subsample from the in_dict values.
-
-  Returns:
-    A dictionary with the same keys as in_dict and subsampled values.
-  """
-  return {k: v[indices] for k, v in in_dict.items()}
-
-
-def merge_dictionaries(res: List[ArrayDict]) -> ArrayDict:
-  """Convert an iterable of dicts into a dict of concatenated arrays."""
-  output = {k: np.empty(0) for k in res[0]}
-  for k in output:
-    output[k] = np.concatenate([r[k] for r in res if k in r], axis=0)
-  return output
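For context on what this patch deletes: the dictionary helpers above were written to compose, so per-run attack results could be merged, namespaced, and filtered without leaving numpy. A minimal sketch of that flow (the two-run data and the 'attack/' key prefix are illustrative, not from this patch):

    import numpy as np

    # Two per-run result dicts with identical keys, as merge_dictionaries expects.
    runs = [{'auc': np.array([0.61]), 'advantage': np.array([0.22])},
            {'auc': np.array([0.64]), 'advantage': np.array([0.31])}]

    # merge_dictionaries: list of dicts -> dict of concatenated arrays.
    merged = {k: np.concatenate([r[k] for r in runs], axis=0) for k in runs[0]}

    # prepend_to_keys: namespace the results before reporting.
    report = {'attack/' + k: v for k, v in merged.items()}

    # select_indices: keep only the runs where the attack beat chance.
    mask = merged['auc'] > 0.5
    strong = {k: v[mask] for k, v in merged.items()}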
-
-
-def get_features(features: ArrayDict,
-                 feature_name: Text,
-                 top_k: int,
-                 add_loss: bool = False) -> np.ndarray:
-  """Combine the specified features into one array.
-
-  Args:
-    features: A dictionary containing all possible features.
-    feature_name: Which feature to use (logits or prob).
-    top_k: The number of top features (of feature_name) to select.
-    add_loss: Whether to also add the loss as a feature.
-
-  Returns:
-    Combined numpy array of the selected features, of shape
-    (n_examples, n_features).
-  """
-  if top_k < 1:
-    raise ValueError('Must select at least one feature.')
-  feats = np.sort(features[feature_name], axis=-1)[:, :top_k]
-  if add_loss:
-    feats = np.concatenate((feats, features['loss'][:, np.newaxis]), axis=-1)
-  return feats
-
-
-def subsample_to_balance(features: ArrayDict, random_state: int) -> ArrayDict:
-  """Subsample if necessary to balance labels."""
-  train_idx = features['is_train'] == 1
-  test_idx = np.logical_not(train_idx)
-  n0 = np.sum(test_idx)
-  n1 = np.sum(train_idx)
-
-  if n0 < 20 or n1 < 20:
-    raise RuntimeError('Need at least 20 examples from training and test set.')
-
-  np.random.seed(random_state)
-
-  if n0 > n1:
-    use_idx = np.random.choice(np.where(test_idx)[0], n1, replace=False)
-    use_idx = np.concatenate((use_idx, np.where(train_idx)[0]))
-    features = {k: v[use_idx] for k, v in features.items()}
-  elif n0 < n1:
-    use_idx = np.random.choice(np.where(train_idx)[0], n0, replace=False)
-    use_idx = np.concatenate((use_idx, np.where(test_idx)[0]))
-    features = {k: v[use_idx] for k, v in features.items()}
-
-  return features
-
-
-def get_train_test_split(features: ArrayDict, add_loss: bool,
-                         test_size: float) -> Dataset:
-  """Get training and test data split."""
-  y = features['is_train']
-  n_total = len(y)
-  n_test = int(test_size * n_total)
-  perm = np.random.permutation(len(y))
-  test_idx = perm[:n_test]
-  train_idx = perm[n_test:]
-  y_train = y[train_idx]
-  y_test = y[test_idx]
-
-  # Use the top 10 logits as a default when there are more than 10 classes;
-  # typically there is no significant amount of weight beyond the top 10
-  # logits.
-  n_logits = min(features['logits'].shape[1], 10)
-  x = get_features(features, 'logits', n_logits, add_loss)
-
-  x_train, x_test = x[train_idx], x[test_idx]
-  return (x_train, y_train), (x_test, y_test)
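The two functions above form the data-preparation path for the shallow attacker models: subsample_to_balance equalizes the member and non-member classes, and get_train_test_split featurizes the sorted logits (optionally plus the loss) and splits them for the attacker. A sketch of the intended call sequence against the pre-patch module (the synthetic shapes and the 0.3 split are illustrative):

    import numpy as np
    from tensorflow_privacy.privacy.membership_inference_attack import utils

    rng = np.random.RandomState(0)
    features = {
        'logits': rng.uniform(size=(1000, 14)),
        'loss': rng.uniform(size=(1000,)),
        'is_train': rng.binomial(1, 0.7, size=(1000,)),
    }

    balanced = utils.subsample_to_balance(features, random_state=0)
    (x_train, y_train), (x_test, y_test) = utils.get_train_test_split(
        balanced, add_loss=True, test_size=0.3)
    # x_* has min(n_classes, 10) + 1 = 11 columns: 10 logits plus the loss.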
- """ - results = {} - if threshold is not None: - results.update({ - 'precision': - metrics.precision_score(true_labels, predictions > threshold), - 'recall': - metrics.recall_score(true_labels, predictions > threshold), - 'accuracy': - metrics.accuracy_score(true_labels, predictions > threshold), - 'f1_score': - metrics.f1_score(true_labels, predictions > threshold), - }) - - fpr, tpr, thresholds = metrics.roc_curve(true_labels, predictions) - auc = metrics.auc(fpr, tpr) - advantage = np.max(np.abs(tpr - fpr)) - - results.update({ - 'fpr': fpr, - 'tpr': tpr, - 'thresholds': thresholds, - 'auc': auc, - 'advantage': advantage, - }) - return ensure_1d(results) - - -# ------------------------------------------------------------------------------ -# Loss functions -# ------------------------------------------------------------------------------ def log_loss(labels: np.ndarray, pred: np.ndarray, small_value=1e-8): diff --git a/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py b/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py index d96bbbc..9384382 100644 --- a/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py +++ b/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py @@ -23,85 +23,6 @@ from tensorflow_privacy.privacy.membership_inference_attack import utils class UtilsTest(absltest.TestCase): - def __init__(self, methodname): - """Initialize the test class.""" - super().__init__(methodname) - rng = np.random.RandomState(33) - logits = rng.uniform(low=0, high=1, size=(1000, 14)) - loss = rng.uniform(low=0, high=1, size=(1000,)) - is_train = rng.binomial(1, 0.7, size=(1000,)) - self.mydict = {'logits': logits, 'loss': loss, 'is_train': is_train} - - def test_compute_metrics(self): - """Test computation of attack metrics.""" - true = np.array([0, 0, 0, 1, 1, 1]) - pred = np.array([0.6, 0.9, 0.4, 0.8, 0.7, 0.2]) - - results = utils.compute_performance_metrics(true, pred, threshold=0.5) - - for k in [ - 'precision', 'recall', 'accuracy', 'f1_score', 'fpr', 'tpr', - 'thresholds', 'auc', 'advantage' - ]: - self.assertIn(k, results) - - np.testing.assert_almost_equal(results['accuracy'], 1. / 2.) - np.testing.assert_almost_equal(results['precision'], 2. / (2. + 2.)) - np.testing.assert_almost_equal(results['recall'], 2. / (2. 
diff --git a/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py b/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py
index d96bbbc..9384382 100644
--- a/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py
+++ b/tensorflow_privacy/privacy/membership_inference_attack/utils_test.py
@@ -23,85 +23,6 @@ from tensorflow_privacy.privacy.membership_inference_attack import utils
 
 class UtilsTest(absltest.TestCase):
 
-  def __init__(self, methodname):
-    """Initialize the test class."""
-    super().__init__(methodname)
-    rng = np.random.RandomState(33)
-    logits = rng.uniform(low=0, high=1, size=(1000, 14))
-    loss = rng.uniform(low=0, high=1, size=(1000,))
-    is_train = rng.binomial(1, 0.7, size=(1000,))
-    self.mydict = {'logits': logits, 'loss': loss, 'is_train': is_train}
-
-  def test_compute_metrics(self):
-    """Test computation of attack metrics."""
-    true = np.array([0, 0, 0, 1, 1, 1])
-    pred = np.array([0.6, 0.9, 0.4, 0.8, 0.7, 0.2])
-
-    results = utils.compute_performance_metrics(true, pred, threshold=0.5)
-
-    for k in [
-        'precision', 'recall', 'accuracy', 'f1_score', 'fpr', 'tpr',
-        'thresholds', 'auc', 'advantage'
-    ]:
-      self.assertIn(k, results)
-
-    np.testing.assert_almost_equal(results['accuracy'], 1. / 2.)
-    np.testing.assert_almost_equal(results['precision'], 2. / (2. + 2.))
-    np.testing.assert_almost_equal(results['recall'], 2. / (2. + 1.))
-
-  def test_prepend_to_keys(self):
-    """Test prepending of text to keys of a dictionary."""
-    mydict = utils.prepend_to_keys(self.mydict, 'test')
-    for k in mydict:
-      self.assertTrue(k.startswith('test'))
-
-  def test_select_indices(self):
-    """Test selecting indices from dictionary with array values."""
-    mydict = {'a': np.arange(10), 'b': np.linspace(0, 1, 10)}
-
-    idx = np.arange(5)
-    mydictidx = utils.select_indices(mydict, idx)
-    np.testing.assert_allclose(mydictidx['a'], np.arange(5))
-    np.testing.assert_allclose(mydictidx['b'], np.linspace(0, 1, 10)[:5])
-
-    idx = np.array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0]) > 0.5
-    mydictidx = utils.select_indices(mydict, idx)
-    np.testing.assert_allclose(mydictidx['a'], np.arange(0, 10, 2))
-    np.testing.assert_allclose(mydictidx['b'], np.linspace(0, 1, 10)[0:10:2])
-
-  def test_get_features(self):
-    """Test extraction of features."""
-    for k in [1, 5, 10, 15]:
-      for add_loss in [True, False]:
-        feats = utils.get_features(
-            self.mydict, 'logits', top_k=k, add_loss=add_loss)
-        k_selected = min(k, 14)
-        self.assertEqual(feats.shape, (1000, k_selected + int(add_loss)))
-
-  def test_subsample_to_balance(self):
-    """Test subsampling of two arrays."""
-    feats = utils.subsample_to_balance(self.mydict, random_state=23)
-
-    train = np.sum(self.mydict['is_train'])
-    test = 1000 - train
-    n_chosen = min(train, test)
-    self.assertEqual(feats['logits'].shape, (2 * n_chosen, 14))
-    self.assertEqual(feats['loss'].shape, (2 * n_chosen,))
-    self.assertEqual(np.sum(feats['is_train']), n_chosen)
-    self.assertEqual(np.sum(1 - feats['is_train']), n_chosen)
-
-  def test_get_data(self):
-    """Test train test split data generation."""
-    for test_size in [0.2, 0.5, 0.8, 0.55555]:
-      (x_train, y_train), (x_test, y_test) = utils.get_train_test_split(
-          self.mydict, add_loss=True, test_size=test_size)
-      n_test = int(test_size * 1000)
-      n_train = 1000 - n_test
-      self.assertEqual(x_train.shape, (n_train, 11))
-      self.assertEqual(y_train.shape, (n_train,))
-      self.assertEqual(x_test.shape, (n_test, 11))
-      self.assertEqual(y_test.shape, (n_test,))
-
   def test_log_loss(self):
     """Test computing cross-entropy loss."""
     # Test binary case with a few normal values
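The surviving log_loss helper is what the remaining test exercises. Its body lies outside this hunk; a minimal numpy implementation consistent with the signature shown above and the cross-entropy behavior the test describes would be (a sketch, assuming integer class labels indexing one probability column per class):

    import numpy as np

    def log_loss(labels: np.ndarray, pred: np.ndarray, small_value=1e-8):
      # Per-example cross-entropy: -log p(true class); small_value avoids log(0).
      return -np.log(np.maximum(pred[range(labels.size), labels], small_value))

    # Binary case: two probability columns, labels pick the true-class column.
    labels = np.array([0, 1])
    pred = np.array([[0.9, 0.1], [0.2, 0.8]])
    losses = log_loss(labels, pred)  # approx [0.105, 0.223]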