Skip to content

Commit

Permalink
Do not sample per partition when min/max_sum_per_partitions is set (#481)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Aug 25, 2023
1 parent b5c90bb commit 2c76461
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 31 deletions.
3 changes: 2 additions & 1 deletion analysis/utility_analysis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def analyze(self,
return result

def _create_contribution_bounder(
self, params: pipeline_dp.AggregateParams
self, params: pipeline_dp.AggregateParams,
expects_per_partition_sampling: bool
) -> contribution_bounders.ContributionBounder:
"""Creates ContributionBounder for utility analysis."""
if self._options.pre_aggregated_data:
Expand Down
2 changes: 1 addition & 1 deletion pipeline_dp/aggregate_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import math
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable, Sequence, Callable, Union, Optional, List
from typing import Any, Sequence, Callable, Optional, List

import numpy as np

Expand Down
20 changes: 20 additions & 0 deletions pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ def metrics_names(self) -> List[str]:
def explain_computation(self) -> ExplainComputationReport:
pass

def expects_per_partition_sampling(self) -> bool:
    """Whether this combiner expects sampled data per partition.

    If this method returns True, the framework samples data per partition
    up to aggregate_params.max_contributions_per_partition elements before
    calling self.create_accumulator(). Otherwise all elements per
    (privacy_id, partition_key) are passed to create_accumulator() and this
    combiner has the full responsibility to bound sensitivity.

    Returns:
        True. This is the default answer; other definitions of this method
        on this page return a different value.
    """
    return True


class CustomCombiner(Combiner, abc.ABC):
"""Base class for custom combiners.
Expand Down Expand Up @@ -309,6 +320,9 @@ def mechanism_spec(self) -> budget_accounting.MechanismSpec:
def sensitivities(self) -> dp_computations.Sensitivities:
    """Returns the sensitivities stored on this object in self._sensitivities."""
    return self._sensitivities

def expects_per_partition_sampling(self) -> bool:
    """Returns False: this object does not expect per-partition sampling."""
    return False


class SumCombiner(Combiner, AdditiveMechanismMixin):
"""Combiner for computing dp sum.
Expand Down Expand Up @@ -347,6 +361,9 @@ def compute_metrics(self, sum_: AccumulatorType) -> dict:
def metrics_names(self) -> List[str]:
    """Returns the names of the computed metrics, here the single name 'sum'."""
    return ['sum']

def expects_per_partition_sampling(self) -> bool:
    """Tells whether per-partition sampling is expected before accumulation.

    Returns:
        False when self._bounding_per_partition is truthy, True otherwise.
    """
    return not self._bounding_per_partition

def explain_computation(self) -> ExplainComputationReport:

def explain_computation_fn():
Expand Down Expand Up @@ -718,6 +735,9 @@ def metrics_names(self) -> List[str]:
def explain_computation(self) -> ExplainComputationReport:
    """Collects explain_computation() of every combiner in self._combiners.

    Returns:
        A list with one entry per combiner, in the order of self._combiners.
    """
    return [combiner.explain_computation() for combiner in self._combiners]

def expects_per_partition_sampling(self) -> bool:
    """Tells whether any wrapped combiner expects per-partition sampling.

    Returns:
        True if at least one combiner in self._combiners returns True from
        expects_per_partition_sampling(); False otherwise (including when
        self._combiners is empty).
    """
    return any(c.expects_per_partition_sampling() for c in self._combiners)


class VectorSumCombiner(Combiner):
"""Combiner for computing dp vector sum.
Expand Down
3 changes: 2 additions & 1 deletion pipeline_dp/contribution_bounders.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def bound_contributions(self, col, params, backend, report_generator,
# Bound cross partition contributions with sampling.
sample = sampling_utils.choose_from_list_without_replacement
sample_size = params.max_partitions_contributed
col = backend.map_values(col, lambda a: sample(a, sample_size))
col = backend.map_values(col, lambda a: sample(a, sample_size),
"Sample")

# (privacy_id, [partition_key, [value]])

Expand Down
14 changes: 8 additions & 6 deletions pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def _aggregate(self, col, params: pipeline_dp.AggregateParams,
self._add_report_stage(
f"Public partition selection: dropped non public partitions")
if not params.contribution_bounds_already_enforced:
contribution_bounder = self._create_contribution_bounder(params)
contribution_bounder = self._create_contribution_bounder(
params, combiner.expects_per_partition_sampling())
col = contribution_bounder.bound_contributions(
col, params, self._backend, self._current_report_generator,
combiner.create_accumulator)
Expand Down Expand Up @@ -367,17 +368,18 @@ def _create_compound_combiner(
self._budget_accountant)

def _create_contribution_bounder(
    self, params: pipeline_dp.AggregateParams,
    expects_per_partition_sampling: bool
) -> contribution_bounders.ContributionBounder:
    """Creates ContributionBounder based on aggregation parameters.

    Args:
        params: aggregation parameters. A truthy params.max_contributions
            selects SamplingPerPrivacyIdContributionBounder.
        expects_per_partition_sampling: consulted only when
            params.max_contributions is falsy. True selects
            SamplingCrossAndPerPartitionContributionBounder, False selects
            SamplingCrossPartitionContributionBounder.

    Returns:
        A newly constructed contribution bounder.
    """
    if params.max_contributions:
        return \
            contribution_bounders.SamplingPerPrivacyIdContributionBounder(
            )
    if expects_per_partition_sampling:
        return contribution_bounders.SamplingCrossAndPerPartitionContributionBounder(
        )
    return contribution_bounders.SamplingCrossPartitionContributionBounder()

def _extract_columns(self, col,
data_extractors: pipeline_dp.DataExtractors):
Expand Down
44 changes: 44 additions & 0 deletions tests/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,33 @@
from absl.testing import absltest
from absl.testing import parameterized
import typing
from typing import List
import pipeline_dp
import pipeline_dp.combiners as dp_combiners
import pipeline_dp.budget_accounting as ba

import numpy as np


class EmptyCombiner(dp_combiners.Combiner):
    """Empty combiner implementation for mocking.

    Every method returns a trivial value (None or an empty list).
    """

    def create_accumulator(self, values):
        return None

    def merge_accumulators(self, accumulator1, accumulator2):
        return None

    def compute_metrics(self, accumulator):
        return None

    def metrics_names(self) -> List[str]:
        return []

    def explain_computation(self):
        return None


def _create_mechanism_spec(
no_noise: bool,
mechanism_type: pipeline_dp.MechanismType = pipeline_dp.MechanismType.
Expand Down Expand Up @@ -354,6 +374,7 @@ def test_create_accumulator_per_contribution_bounding(self, no_noise):
# Bounding on values.
self.assertEqual(2, combiner.create_accumulator([1, 3]))
self.assertEqual(1, combiner.create_accumulator([0, 3]))
self.assertTrue(combiner.expects_per_partition_sampling())

def test_create_accumulator_per_partition_bound(self):
combiner = self._create_combiner(no_noise=False,
Expand All @@ -363,6 +384,7 @@ def test_create_accumulator_per_partition_bound(self):
# Clipping sum to [0, 3].
self.assertEqual(3, combiner.create_accumulator([4, 1]))
self.assertEqual(0, combiner.create_accumulator([-10, 5, 3]))
self.assertFalse(combiner.expects_per_partition_sampling())

@parameterized.named_parameters(
dict(testcase_name='no_noise', no_noise=True, per_partition_bound=True),
Expand Down Expand Up @@ -667,6 +689,28 @@ def test_compute_metrics_with_noise(self):
self.assertTrue(np.var(noised_count) > 1) # check that noise is added
self.assertTrue(np.var(noised_sum) > 1) # check that noise is added

def test_expects_per_partition_sampling(self):
    """Checks the compound combiner answers True iff any child does."""

    class MockCombiner(EmptyCombiner):
        """Combiner whose expects_per_partition_sampling() is configurable."""

        def __init__(self, return_value: bool):
            self._return_value = return_value

        def expects_per_partition_sampling(self) -> bool:
            return self._return_value

    def create_combiner(return_values: List[bool]):
        # One MockCombiner per requested return value.
        combiners = [MockCombiner(v) for v in return_values]
        return dp_combiners.CompoundCombiner(combiners,
                                             return_named_tuple=True)

    self.assertTrue(
        create_combiner([True]).expects_per_partition_sampling())
    self.assertTrue(
        create_combiner([True, False]).expects_per_partition_sampling())
    self.assertFalse(
        create_combiner([False, False]).expects_per_partition_sampling())


class VectorSumCombinerTest(parameterized.TestCase):

Expand Down
81 changes: 59 additions & 22 deletions tests/dp_engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,37 +509,50 @@ def test_aggregate_report(self):
@patch(
    'pipeline_dp.contribution_bounders'
    '.SamplingCrossAndPerPartitionContributionBounder.bound_contributions')
def test_aggregate_computation_graph_per_contribution_bounding(
        self, mock_bound_contributions):
    """Checks that COUNT goes through the cross-and-per-partition bounder."""
    # Arrange
    aggregate_params, _ = self._create_params_default()
    aggregate_params.metrics = [pipeline_dp.Metrics.COUNT]

    engine = self._create_dp_engine_default()
    mock_bound_contributions.return_value = []

    # Act
    engine.aggregate(col=[0],
                     params=aggregate_params,
                     data_extractors=self._get_default_extractors())

    # Assert
    mock_bound_contributions.assert_called_with(unittest.mock.ANY,
                                                aggregate_params,
                                                unittest.mock.ANY,
                                                unittest.mock.ANY,
                                                unittest.mock.ANY)

@patch('pipeline_dp.contribution_bounders'
       '.SamplingCrossPartitionContributionBounder.bound_contributions')
def test_aggregate_computation_graph_per_partition_bounding(
        self, mock_bound_contributions):
    """Checks that SUM with min/max_sum_per_partition uses the cross-partition bounder."""
    # Arrange
    aggregate_params = pipeline_dp.AggregateParams(
        noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
        metrics=[pipeline_dp.Metrics.SUM],
        min_sum_per_partition=0,
        max_sum_per_partition=1,
        max_partitions_contributed=1,
        max_contributions_per_partition=1)

    engine = self._create_dp_engine_default()
    mock_bound_contributions.return_value = []

    # Act
    engine.aggregate(col=[0],
                     params=aggregate_params,
                     data_extractors=self._get_default_extractors())

    # Assert
    mock_bound_contributions.assert_called_with(unittest.mock.ANY,
                                                aggregate_params,
                                                unittest.mock.ANY,
                                                unittest.mock.ANY,
                                                unittest.mock.ANY)

Expand Down Expand Up @@ -1184,6 +1197,30 @@ def test_annotate_call(self, mock_annotate_fn):
self.assertEqual(total_epsilon / 3, budget.epsilon)
self.assertEqual(total_delta / 3, budget.delta)

def test_min_max_sum_per_partition(self):
    """Checks that SUM is clipped per partition instead of being sampled."""
    # NOTE(review): epsilon=1000 presumably keeps the noise small enough for
    # the delta=0.1 comparison below - confirm.
    dp_engine, budget_accountant = self._create_dp_engine_default(
        epsilon=1000, return_accountant=True)
    # The raw sum is 1000 - 1005 = -5, below min_sum_per_partition=-3.
    data = [1] * 1000 + [-1] * 1005
    params = pipeline_dp.AggregateParams(metrics=[pipeline_dp.Metrics.SUM],
                                         max_partitions_contributed=1,
                                         min_sum_per_partition=-3,
                                         max_sum_per_partition=1,
                                         max_contributions_per_partition=1)
    # Every record maps to the same privacy id (0) and partition (0).
    extractors = pipeline_dp.DataExtractors(
        privacy_id_extractor=lambda _: 0,
        partition_extractor=lambda _: 0,
        value_extractor=lambda x: x)

    output = dp_engine.aggregate(data,
                                 params,
                                 extractors,
                                 public_partitions=[0])

    budget_accountant.compute_budgets()
    output = list(output)
    self.assertLen(output, 1)
    # The whole per-partition sum (-5) clipped to [-3, 1] gives -3.
    self.assertAlmostEqual(output[0][1].sum, -3, delta=0.1)

def test_pld_not_supported_metrics(self):
with self.assertRaisesRegex(
NotImplementedError,
Expand Down

0 comments on commit 2c76461

Please sign in to comment.