From ceee90b1acfb5ab44ed40fa45d619a2fbb2a80f2 Mon Sep 17 00:00:00 2001
From: "A. Unique TensorFlower"
Date: Fri, 14 Dec 2018 14:57:07 -0800
Subject: [PATCH] Add GaussianSumQuery and express GaussianAverageQuery in terms of it.

Also:
1. Add unit tests for both types of query.
2. Add function "get_query_result" to PrivateQuery. (The utility of having this function is made clear in the test class, where the function _run_query operates on either GaussianSum- or GaussianAverageQueries.)

PiperOrigin-RevId: 225609398
---
 privacy/optimizers/dp_adam.py                 |   3 +-
 privacy/optimizers/dp_gradient_descent.py     |   3 +-
 privacy/optimizers/gaussian_average_query.py  | 132 +++++++++++++++---
 .../optimizers/gaussian_average_query_test.py | 111 +++++++++++++++
 privacy/optimizers/private_queries.py         |  40 ++++++
 5 files changed, 266 insertions(+), 23 deletions(-)
 create mode 100644 privacy/optimizers/gaussian_average_query_test.py

diff --git a/privacy/optimizers/dp_adam.py b/privacy/optimizers/dp_adam.py
index 84aa474..1578782 100644
--- a/privacy/optimizers/dp_adam.py
+++ b/privacy/optimizers/dp_adam.py
@@ -95,8 +95,9 @@ class DPAdamOptimizer(tf.train.AdamOptimizer):
       grads, _ = zip(*super(DPAdamOptimizer, self).compute_gradients(
           tf.gather(microbatches_losses, [i]), var_list, gate_gradients,
           aggregation_method, colocate_gradients_with_ops, grad_loss))
+      grads_list = list(grads)
       sample_state = self._privacy_helper.accumulate_record(
-          sample_params, sample_state, grads)
+          sample_params, sample_state, grads_list)
       return [tf.add(i, 1), sample_state]
 
     i = tf.constant(0)
diff --git a/privacy/optimizers/dp_gradient_descent.py b/privacy/optimizers/dp_gradient_descent.py
index e6d5f56..141e18b 100644
--- a/privacy/optimizers/dp_gradient_descent.py
+++ b/privacy/optimizers/dp_gradient_descent.py
@@ -80,8 +80,9 @@ class DPGradientDescentOptimizer(tf.train.GradientDescentOptimizer):
       grads, _ = zip(*super(DPGradientDescentOptimizer, self).compute_gradients(
           tf.gather(microbatches_losses, [i]), var_list, gate_gradients,
           aggregation_method, colocate_gradients_with_ops, grad_loss))
+      grads_list = list(grads)
       sample_state = self._privacy_helper.accumulate_record(
-          sample_params, sample_state, grads)
+          sample_params, sample_state, grads_list)
       return [tf.add(i, 1), sample_state]
 
     i = tf.constant(0)
diff --git a/privacy/optimizers/gaussian_average_query.py b/privacy/optimizers/gaussian_average_query.py
index c6d4687..62a7c57 100644
--- a/privacy/optimizers/gaussian_average_query.py
+++ b/privacy/optimizers/gaussian_average_query.py
@@ -25,28 +25,33 @@ import tensorflow as tf
 
 from privacy.optimizers import private_queries
 
+nest = tf.contrib.framework.nest
 
-class GaussianAverageQuery(private_queries.PrivateAverageQuery):
-  """Implements PrivateQuery interface for Gaussian average queries.
 
-  Accumulates clipped vectors, then adds Gaussian noise to the average.
+class GaussianSumQuery(private_queries.PrivateSumQuery):
+  """Implements PrivateQuery interface for Gaussian sum queries.
+
+  Accumulates clipped vectors, then adds Gaussian noise to the sum.
   """
 
   # pylint: disable=invalid-name
   _GlobalState = collections.namedtuple(
-      '_GlobalState', ['l2_norm_clip', 'stddev', 'denominator'])
+      '_GlobalState', ['l2_norm_clip', 'stddev'])
 
-  def __init__(self, l2_norm_clip, stddev, denominator):
-    """Initializes the GaussianAverageQuery."""
+  def __init__(self, l2_norm_clip, stddev):
+    """Initializes the GaussianSumQuery.
+
+    Args:
+      l2_norm_clip: The clipping norm to apply to the global norm of each
+        record.
+      stddev: The stddev of the noise added to the sum.
+    """
     self._l2_norm_clip = l2_norm_clip
     self._stddev = stddev
-    self._denominator = denominator
 
   def initial_global_state(self):
-    """Returns the initial global state for the PrivacyHelper."""
-    return self._GlobalState(
-        float(self._l2_norm_clip), float(self._stddev),
-        float(self._denominator))
+    """Returns the initial global state for the GaussianSumQuery."""
+    return self._GlobalState(float(self._l2_norm_clip), float(self._stddev))
 
   def derive_sample_params(self, global_state):
     """Given the global state, derives parameters to use for the next sample.
@@ -70,7 +75,7 @@ class GaussianAverageQuery(private_queries.PrivateAverageQuery):
     Returns: An initial sample state.
     """
     del global_state  # unused.
-    return tf.contrib.framework.nest.map_structure(tf.zeros_like, tensors)
+    return nest.map_structure(tf.zeros_like, tensors)
 
   def accumulate_record(self, params, sample_state, record):
     """Accumulates a single record into the sample state.
@@ -84,9 +89,93 @@ class GaussianAverageQuery(private_queries.PrivateAverageQuery):
       The updated sample state.
     """
     l2_norm_clip = params
-    clipped, _ = tf.clip_by_global_norm(record, l2_norm_clip)
-    return tf.contrib.framework.nest.map_structure(tf.add, sample_state,
-                                                   clipped)
+    record_as_list = nest.flatten(record)
+    clipped_as_list, _ = tf.clip_by_global_norm(record_as_list, l2_norm_clip)
+    clipped = nest.pack_sequence_as(record, clipped_as_list)
+    return nest.map_structure(tf.add, sample_state, clipped)
+
+  def get_noised_sum(self, sample_state, global_state):
+    """Gets noised sum after all records of sample have been accumulated.
+
+    Args:
+      sample_state: The sample state after all records have been accumulated.
+      global_state: The global state.
+
+    Returns:
+      A tuple (estimate, new_global_state) where "estimate" is the estimated
+      sum of the records and "new_global_state" is the updated global state.
+    """
+    def add_noise(v):
+      return v + tf.random_normal(tf.shape(v), stddev=global_state.stddev)
+
+    return nest.map_structure(add_noise, sample_state), global_state
+
+
+class GaussianAverageQuery(private_queries.PrivateAverageQuery):
+  """Implements PrivateQuery interface for Gaussian average queries.
+
+  Accumulates clipped vectors, adds Gaussian noise, and normalizes.
+  """
+
+  # pylint: disable=invalid-name
+  _GlobalState = collections.namedtuple(
+      '_GlobalState', ['sum_state', 'denominator'])
+
+  def __init__(self, l2_norm_clip, sum_stddev, denominator):
+    """Initializes the GaussianAverageQuery.
+
+    Args:
+      l2_norm_clip: The clipping norm to apply to the global norm of each
+        record.
+      sum_stddev: The stddev of the noise added to the sum (before
+        normalization).
+      denominator: The normalization constant (applied after noise is added to
+        the sum).
+    """
+    self._sum_query = GaussianSumQuery(l2_norm_clip, sum_stddev)
+    self._denominator = denominator
+
+  def initial_global_state(self):
+    """Returns the initial global state for the GaussianAverageQuery."""
+    sum_global_state = self._sum_query.initial_global_state()
+    return self._GlobalState(sum_global_state, float(self._denominator))
+
+  def derive_sample_params(self, global_state):
+    """Given the global state, derives parameters to use for the next sample.
+
+    Args:
+      global_state: The current global state.
+
+    Returns:
+      Parameters to use to process records in the next sample.
+    """
+    return self._sum_query.derive_sample_params(global_state.sum_state)
+
+  def initial_sample_state(self, global_state, tensors):
+    """Returns an initial state to use for the next sample.
+
+    Args:
+      global_state: The current global state.
+      tensors: A structure of tensors used as a template to create the initial
+        sample state.
+
+    Returns: An initial sample state.
+    """
+    # GaussianAverageQuery has no state beyond the sum state.
+    return self._sum_query.initial_sample_state(global_state.sum_state, tensors)
+
+  def accumulate_record(self, params, sample_state, record):
+    """Accumulates a single record into the sample state.
+
+    Args:
+      params: The parameters for the sample.
+      sample_state: The current sample state.
+      record: The record to accumulate.
+
+    Returns:
+      The updated sample state.
+    """
+    return self._sum_query.accumulate_record(params, sample_state, record)
 
   def get_noised_average(self, sample_state, global_state):
     """Gets noised average after all records of sample have been accumulated.
@@ -99,10 +188,11 @@ class GaussianAverageQuery(private_queries.PrivateAverageQuery):
       A tuple (estimate, new_global_state) where "estimate" is the estimated
       average of the records and "new_global_state" is the updated global state.
     """
-    def noised_average(v):
-      return tf.truediv(
-          v + tf.random_normal(tf.shape(v), stddev=self._stddev),
-          global_state.denominator)
+    noised_sum, new_sum_global_state = self._sum_query.get_noised_sum(
+        sample_state, global_state.sum_state)
+    new_global_state = self._GlobalState(
+        new_sum_global_state, global_state.denominator)
+    def normalize(v):
+      return tf.truediv(v, global_state.denominator)
 
-    return (tf.contrib.framework.nest.map_structure(noised_average,
-                                                    sample_state), global_state)
+    return nest.map_structure(normalize, noised_sum), new_global_state
diff --git a/privacy/optimizers/gaussian_average_query_test.py b/privacy/optimizers/gaussian_average_query_test.py
new file mode 100644
index 0000000..28ce337
--- /dev/null
+++ b/privacy/optimizers/gaussian_average_query_test.py
@@ -0,0 +1,111 @@
+# Copyright 2018, The TensorFlow Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for GaussianAverageQuery."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import numpy as np
+import tensorflow as tf
+
+from privacy.optimizers import gaussian_average_query
+
+
+class GaussianAverageQueryTest(tf.test.TestCase):
+
+  def _run_query(self, query, *records):
+    """Executes query on the given set of records and returns the result."""
+    global_state = query.initial_global_state()
+    params = query.derive_sample_params(global_state)
+    sample_state = query.initial_sample_state(global_state, records[0])
+    for record in records:
+      sample_state = query.accumulate_record(params, sample_state, record)
+    result, _ = query.get_query_result(sample_state, global_state)
+    return result
+
+  def test_gaussian_sum_no_clip_no_noise(self):
+    with self.cached_session() as sess:
+      record1 = tf.constant([2.0, 0.0])
+      record2 = tf.constant([-1.0, 1.0])
+
+      query = gaussian_average_query.GaussianSumQuery(
+          l2_norm_clip=10.0, stddev=0.0)
+      query_result = self._run_query(query, record1, record2)
+      result = sess.run(query_result)
+      expected = [1.0, 1.0]
+      self.assertAllClose(result, expected)
+
+  def test_gaussian_sum_with_clip_no_noise(self):
+    with self.cached_session() as sess:
+      record1 = tf.constant([-6.0, 8.0])  # Clipped to [-3.0, 4.0].
+      record2 = tf.constant([4.0, -3.0])  # Not clipped.
+
+      query = gaussian_average_query.GaussianSumQuery(
+          l2_norm_clip=5.0, stddev=0.0)
+      query_result = self._run_query(query, record1, record2)
+      result = sess.run(query_result)
+      expected = [1.0, 1.0]
+      self.assertAllClose(result, expected)
+
+  def test_gaussian_sum_with_noise(self):
+    with self.cached_session() as sess:
+      record1, record2 = 2.71828, 3.14159
+      stddev = 1.0
+
+      query = gaussian_average_query.GaussianSumQuery(
+          l2_norm_clip=5.0, stddev=stddev)
+      query_result = self._run_query(query, record1, record2)
+
+      noised_sums = []
+      for _ in range(1000):
+        noised_sums.append(sess.run(query_result))
+
+      result_stddev = np.std(noised_sums)
+      self.assertNear(result_stddev, stddev, 0.1)
+
+  def test_gaussian_average_no_noise(self):
+    with self.cached_session() as sess:
+      record1 = tf.constant([5.0, 0.0])  # Clipped to [3.0, 0.0].
+      record2 = tf.constant([-1.0, 2.0])  # Not clipped.
+
+      query = gaussian_average_query.GaussianAverageQuery(
+          l2_norm_clip=3.0, sum_stddev=0.0, denominator=2.0)
+      query_result = self._run_query(query, record1, record2)
+      result = sess.run(query_result)
+      expected_average = [1.0, 1.0]
+      self.assertAllClose(result, expected_average)
+
+  def test_gaussian_average_with_noise(self):
+    with self.cached_session() as sess:
+      record1, record2 = 2.71828, 3.14159
+      sum_stddev = 1.0
+      denominator = 2.0
+
+      query = gaussian_average_query.GaussianAverageQuery(
+          l2_norm_clip=5.0, sum_stddev=sum_stddev, denominator=denominator)
+      query_result = self._run_query(query, record1, record2)
+
+      noised_averages = []
+      for _ in range(1000):
+        noised_averages.append(sess.run(query_result))
+
+      result_stddev = np.std(noised_averages)
+      avg_stddev = sum_stddev / denominator
+      self.assertNear(result_stddev, avg_stddev, 0.1)
+
+
+if __name__ == '__main__':
+  tf.test.main()
diff --git a/privacy/optimizers/private_queries.py b/privacy/optimizers/private_queries.py
index 86a1967..fd856ec 100644
--- a/privacy/optimizers/private_queries.py
+++ b/privacy/optimizers/private_queries.py
@@ -71,6 +71,42 @@ class PrivateQuery(object):
     """
     pass
 
+  @abc.abstractmethod
+  def get_query_result(self, sample_state, global_state):
+    """Gets query result after all records of sample have been accumulated.
+
+    Args:
+      sample_state: The sample state after all records have been accumulated.
+      global_state: The global state.
+
+    Returns:
+      A tuple (result, new_global_state) where "result" is the result of the
+      query and "new_global_state" is the updated global state.
+    """
+    pass
+
+
+class PrivateSumQuery(PrivateQuery):
+  """Interface for differentially private mechanisms to compute a sum."""
+
+  @abc.abstractmethod
+  def get_noised_sum(self, sample_state, global_state):
+    """Gets estimate of sum after all records of sample have been accumulated.
+
+    Args:
+      sample_state: The sample state after all records have been accumulated.
+      global_state: The global state.
+
+    Returns:
+      A tuple (estimate, new_global_state) where "estimate" is the estimated
+      sum of the records and "new_global_state" is the updated global state.
+    """
+    pass
+
+  def get_query_result(self, sample_state, global_state):
+    """Delegates to get_noised_sum."""
+    return self.get_noised_sum(sample_state, global_state)
+
 
 class PrivateAverageQuery(PrivateQuery):
   """Interface for differentially private mechanisms to compute an average."""
@@ -88,3 +124,7 @@ class PrivateAverageQuery(PrivateQuery):
       average of the records and "new_global_state" is the updated global state.
     """
     pass
+
+  def get_query_result(self, sample_state, global_state):
+    """Delegates to get_noised_average."""
+    return self.get_noised_average(sample_state, global_state)