d5dcfec745
set_denominator was added so that the batch size doesn't need to be specified before constructing the optimizer, but it breaks the DPQuery abstraction. Now the optimizer uses a GaussianSumQuery instead of a GaussianAverageQuery, and normalization by batch size is done inside the optimizer. Also, instead of creating all DPQueries with a PrivacyLedger and then wrapping them with QueryWithLedger, it is now sufficient to create the queries with no ledger; QueryWithLedger will construct the ledger and pass it to all inner queries.

PiperOrigin-RevId: 251462353
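As a rough usage sketch of the optimizer API after this change (the placeholder model, `features`/`labels`, the learning rate, and the import path privacy.optimizers.dp_optimizer are illustrative assumptions, not part of this commit), the batch size no longer needs to be known when the optimizer is built; the noised gradient sum is divided by the number of microbatches inside compute_gradients():

# Illustrative sketch only; assumes TF 1.x graph mode and that this module is
# importable as privacy.optimizers.dp_optimizer.
import tensorflow as tf

from privacy.optimizers import dp_optimizer

features = tf.placeholder(tf.float32, [None, 28 * 28])
labels = tf.placeholder(tf.int64, [None])
logits = tf.layers.dense(features, 10)  # placeholder model

# The batch size is not needed here; num_microbatches=None defaults to the
# batch size observed when gradients are computed.
opt = dp_optimizer.DPGradientDescentGaussianOptimizer(
    l2_norm_clip=1.0,
    noise_multiplier=1.1,
    num_microbatches=None,
    learning_rate=0.15)

# The loss must be a vector with one entry per example so that it can be
# reshaped into microbatches inside compute_gradients().
vector_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
    labels=labels, logits=logits)
train_op = opt.minimize(vector_loss)

To track privacy spending, the same constructor also accepts a `ledger=` argument, in which case the sum query is wrapped in a QueryWithLedger, as the code below shows.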
245 lines
9.8 KiB
Python
# Copyright 2018, The TensorFlow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Differentially private optimizers for TensorFlow."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from distutils.version import LooseVersion
import tensorflow as tf

from privacy.analysis import privacy_ledger
from privacy.dp_query import gaussian_query

if LooseVersion(tf.__version__) < LooseVersion('2.0.0'):
  nest = tf.contrib.framework.nest
else:
  nest = tf.nest


def make_optimizer_class(cls):
  """Constructs a DP optimizer class from an existing one."""
  if LooseVersion(tf.__version__) < LooseVersion('2.0.0'):
    parent_code = tf.train.Optimizer.compute_gradients.__code__
    child_code = cls.compute_gradients.__code__
    GATE_OP = tf.train.Optimizer.GATE_OP  # pylint: disable=invalid-name
  else:
    parent_code = tf.optimizers.Optimizer._compute_gradients.__code__  # pylint: disable=protected-access
    child_code = cls._compute_gradients.__code__  # pylint: disable=protected-access
    GATE_OP = None  # pylint: disable=invalid-name
  if child_code is not parent_code:
    tf.logging.warning(
        'WARNING: Calling make_optimizer_class() on class %s that overrides '
        'method compute_gradients(). Check to ensure that '
        'make_optimizer_class() does not interfere with overridden version.',
        cls.__name__)

  class DPOptimizerClass(cls):
    """Differentially private subclass of given class cls."""

    def __init__(
        self,
        dp_sum_query,
        num_microbatches=None,
        unroll_microbatches=False,
        *args,  # pylint: disable=keyword-arg-before-vararg, g-doc-args
        **kwargs):
      """Initialize the DPOptimizerClass.

      Args:
        dp_sum_query: DPQuery object, specifying differential privacy
          mechanism to use.
        num_microbatches: How many microbatches into which the minibatch is
          split. If None, will default to the size of the minibatch, and
          per-example gradients will be computed.
        unroll_microbatches: If true, processes microbatches within a Python
          loop instead of a tf.while_loop. Can be used if using a tf.while_loop
          raises an exception.
      """
      super(DPOptimizerClass, self).__init__(*args, **kwargs)
      self._dp_sum_query = dp_sum_query
      self._num_microbatches = num_microbatches
      self._global_state = self._dp_sum_query.initial_global_state()
      # TODO(b/122613513): Set unroll_microbatches=True to avoid this bug.
      # Beware: When num_microbatches is large (>100), enabling this parameter
      # may cause an OOM error.
      self._unroll_microbatches = unroll_microbatches

    def compute_gradients(self,
                          loss,
                          var_list,
                          gate_gradients=GATE_OP,
                          aggregation_method=None,
                          colocate_gradients_with_ops=False,
                          grad_loss=None,
                          gradient_tape=None):
      if callable(loss):
        # TF is running in Eager mode, check we received a vanilla tape.
        if not gradient_tape:
          raise ValueError('When in Eager mode, a tape needs to be passed.')

        vector_loss = loss()
        if self._num_microbatches is None:
          self._num_microbatches = tf.shape(vector_loss)[0]
        if isinstance(self._dp_sum_query, privacy_ledger.QueryWithLedger):
          self._dp_sum_query.set_batch_size(self._num_microbatches)
        sample_state = self._dp_sum_query.initial_sample_state(
            self._global_state, var_list)
        microbatches_losses = tf.reshape(vector_loss,
                                         [self._num_microbatches, -1])
        sample_params = (
            self._dp_sum_query.derive_sample_params(self._global_state))

        def process_microbatch(i, sample_state):
          """Process one microbatch (record) with privacy helper."""
          microbatch_loss = tf.reduce_mean(tf.gather(microbatches_losses, [i]))
          grads = gradient_tape.gradient(microbatch_loss, var_list)
          sample_state = self._dp_sum_query.accumulate_record(
              sample_params, sample_state, grads)
          return sample_state

        for idx in range(self._num_microbatches):
          sample_state = process_microbatch(idx, sample_state)

        grad_sums, self._global_state = (
            self._dp_sum_query.get_noised_result(
                sample_state, self._global_state))

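        # The DP query returns a noised sum over microbatches; dividing by the
        # number of microbatches below yields the average gradient to apply.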
        def normalize(v):
          return v / tf.cast(self._num_microbatches, tf.float32)

        final_grads = nest.map_structure(normalize, grad_sums)

        grads_and_vars = list(zip(final_grads, var_list))
        return grads_and_vars

      else:
        # TF is running in graph mode, check we did not receive a gradient tape.
        if gradient_tape:
          raise ValueError('When in graph mode, a tape should not be passed.')

        # Note: it would be closer to the correct i.i.d. sampling of records if
        # we sampled each microbatch from the appropriate binomial distribution,
        # although that still wouldn't be quite correct because it would be
        # sampling from the dataset without replacement.
        if self._num_microbatches is None:
          self._num_microbatches = tf.shape(loss)[0]
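        # When the query is wrapped in a QueryWithLedger, pass the batch size
        # along so the ledger's bookkeeping reflects how many microbatches
        # make up each sample.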
        if isinstance(self._dp_sum_query, privacy_ledger.QueryWithLedger):
          self._dp_sum_query.set_batch_size(self._num_microbatches)

        microbatches_losses = tf.reshape(loss, [self._num_microbatches, -1])
        sample_params = (
            self._dp_sum_query.derive_sample_params(self._global_state))

        def process_microbatch(i, sample_state):
          """Process one microbatch (record) with privacy helper."""
          grads, _ = zip(*super(cls, self).compute_gradients(
              tf.reduce_mean(tf.gather(microbatches_losses,
                                       [i])), var_list, gate_gradients,
              aggregation_method, colocate_gradients_with_ops, grad_loss))
          grads_list = [
              g if g is not None else tf.zeros_like(v)
              for (g, v) in zip(list(grads), var_list)
          ]
          sample_state = self._dp_sum_query.accumulate_record(
              sample_params, sample_state, grads_list)
          return sample_state

        if var_list is None:
          var_list = (
              tf.trainable_variables() + tf.get_collection(
                  tf.GraphKeys.TRAINABLE_RESOURCE_VARIABLES))

        sample_state = self._dp_sum_query.initial_sample_state(
            self._global_state, var_list)

        if self._unroll_microbatches:
          for idx in range(self._num_microbatches):
            sample_state = process_microbatch(idx, sample_state)
        else:
          # Use of while_loop here requires that sample_state be a nested
          # structure of tensors. In general, we would prefer to allow it to be
          # an arbitrary opaque type.
          cond_fn = lambda i, _: tf.less(i, self._num_microbatches)
          body_fn = lambda i, state: [tf.add(i, 1), process_microbatch(i, state)]  # pylint: disable=line-too-long
          idx = tf.constant(0)
          _, sample_state = tf.while_loop(cond_fn, body_fn, [idx, sample_state])

        grad_sums, self._global_state = (
            self._dp_sum_query.get_noised_result(
                sample_state, self._global_state))

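        # As in the eager branch, the query produced a noised sum; divide by
        # the number of microbatches to obtain the average gradient.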
        def normalize(v):
          return tf.truediv(v, tf.cast(self._num_microbatches, tf.float32))

        final_grads = nest.map_structure(normalize, grad_sums)

        return list(zip(final_grads, var_list))

  return DPOptimizerClass


def make_gaussian_optimizer_class(cls):
  """Constructs a DP optimizer with Gaussian averaging of updates."""

  class DPGaussianOptimizerClass(make_optimizer_class(cls)):
    """DP subclass of given class cls using Gaussian averaging."""

    def __init__(
        self,
        l2_norm_clip,
        noise_multiplier,
        num_microbatches=None,
        ledger=None,
        unroll_microbatches=False,
        *args,  # pylint: disable=keyword-arg-before-vararg
        **kwargs):
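      # GaussianSumQuery clips each microbatch gradient to l2_norm_clip and
      # adds Gaussian noise with stddev l2_norm_clip * noise_multiplier to the
      # sum; DPOptimizerClass then divides the noised sum by num_microbatches.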
      dp_sum_query = gaussian_query.GaussianSumQuery(
          l2_norm_clip, l2_norm_clip * noise_multiplier)

      if ledger:
        dp_sum_query = privacy_ledger.QueryWithLedger(dp_sum_query,
                                                      ledger=ledger)

      super(DPGaussianOptimizerClass, self).__init__(
          dp_sum_query,
          num_microbatches,
          unroll_microbatches,
          *args,
          **kwargs)

    @property
    def ledger(self):
      return self._dp_sum_query.ledger

  return DPGaussianOptimizerClass

if LooseVersion(tf.__version__) < LooseVersion('2.0.0'):
  AdagradOptimizer = tf.train.AdagradOptimizer
  AdamOptimizer = tf.train.AdamOptimizer
  GradientDescentOptimizer = tf.train.GradientDescentOptimizer
else:
  AdagradOptimizer = tf.optimizers.Adagrad
  AdamOptimizer = tf.optimizers.Adam
  GradientDescentOptimizer = tf.optimizers.SGD  # pylint: disable=invalid-name

DPAdagradOptimizer = make_optimizer_class(AdagradOptimizer)
DPAdamOptimizer = make_optimizer_class(AdamOptimizer)
DPGradientDescentOptimizer = make_optimizer_class(GradientDescentOptimizer)

DPAdagradGaussianOptimizer = make_gaussian_optimizer_class(AdagradOptimizer)
DPAdamGaussianOptimizer = make_gaussian_optimizer_class(AdamOptimizer)
DPGradientDescentGaussianOptimizer = make_gaussian_optimizer_class(
    GradientDescentOptimizer)