Skip to content

Commit

Permalink
(Polishing) Dropping return_per_partition argument (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Jun 16, 2023
1 parent f70cce7 commit 8e63bee
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 60 deletions.
27 changes: 8 additions & 19 deletions analysis/parameter_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ def tune(col,
options: TuneOptions,
data_extractors: Union[pipeline_dp.DataExtractors,
pipeline_dp.PreAggregateExtractors],
public_partitions=None,
return_utility_analysis_per_partition: bool = False):
public_partitions=None):
"""Tunes parameters.
It works in the following way:
Expand All @@ -243,12 +242,8 @@ def tune(col,
public_partitions: A collection of partition keys that will be present
in the result. If not provided, tuning will be performed assuming
private partition selection is used.
return_per_partition: if true, it returns tuple, with the 2nd element
utility analysis per partitions.
Returns:
if return_per_partition == False:
returns 1 element collection which contains TuneResult
else returns tuple (1 element collection which contains TuneResult,
returns a tuple: (a 1-element collection containing a TuneResult,
a collection containing per-partition utility analysis results).
"""
_check_tune_args(options)
Expand All @@ -265,30 +260,24 @@ def tune(col,
multi_param_configuration=candidates,
partitions_sampling_prob=options.partitions_sampling_prob,
pre_aggregated_data=options.pre_aggregated_data)
result = utility_analysis.perform_utility_analysis(

utility_result, per_partition_utility_result = utility_analysis.perform_utility_analysis(
col, backend, utility_analysis_options, data_extractors,
public_partitions, return_utility_analysis_per_partition)
if return_utility_analysis_per_partition:
utility_result, per_partition_utility_result = result
else:
utility_result = result
public_partitions)
# utility_result: (UtilityReport)
# per_partition_utility_result: (pk, (PerPartitionMetrics))
use_public_partitions = public_partitions is not None

utility_result = backend.to_list(utility_result, "To list")
# 1 element collection with list[UtilityReport]
utility_result = backend.map(
utility_result,
lambda result: _convert_utility_analysis_to_tune_result_new(
utility_result, lambda result: _convert_utility_analysis_to_tune_result(
result, options, candidates, use_public_partitions,
contribution_histograms), "To Tune result")
if return_utility_analysis_per_partition:
return utility_result, per_partition_utility_result
return utility_result
return utility_result, per_partition_utility_result


def _convert_utility_analysis_to_tune_result_new(
def _convert_utility_analysis_to_tune_result(
utility_reports: Tuple[metrics.UtilityReport],
tune_options: TuneOptions,
run_configurations: analysis.MultiParameterConfiguration,
Expand Down
21 changes: 9 additions & 12 deletions analysis/tests/parameter_tuning_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ def test_find_candidate_parameters_constant_relative_step_strategy_number_of_can
self.assertEqual(list(range(1, 51)),
candidates.max_contributions_per_partition)

@parameterized.parameters(False, True)
def test_tune_count_new(self, return_utility_analysis_per_partition: bool):
def test_tune_count_new(self):
# Arrange.
input = [(i % 10, f"pk{i/10}") for i in range(10)]
public_partitions = [f"pk{i}" for i in range(10)]
Expand All @@ -206,15 +205,13 @@ def test_tune_count_new(self, return_utility_analysis_per_partition: bool):
# Act.
result = parameter_tuning.tune(input, pipeline_dp.LocalBackend(),
contribution_histograms, tune_options,
data_extractors, public_partitions,
return_utility_analysis_per_partition)
data_extractors, public_partitions)

# Assert.
if return_utility_analysis_per_partition:
tune_result, per_partition_utility_analysis = result
self.assertLen(per_partition_utility_analysis, 10)
else:
tune_result = result
tune_result, per_partition_utility_analysis = result
per_partition_utility_analysis = list(per_partition_utility_analysis)
self.assertLen(per_partition_utility_analysis, 40)

tune_result = list(tune_result)[0]

self.assertEqual(tune_options, tune_result.options)
Expand Down Expand Up @@ -246,9 +243,9 @@ def test_tune_privacy_id_count_new(self):
]

# Act.
result = parameter_tuning.tune(input, pipeline_dp.LocalBackend(),
contribution_histograms, tune_options,
data_extractors, public_partitions)
result, _ = parameter_tuning.tune(input, pipeline_dp.LocalBackend(),
contribution_histograms, tune_options,
data_extractors, public_partitions)

# Assert.
result = list(result)[0]
Expand Down
8 changes: 5 additions & 3 deletions analysis/tests/utility_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_wo_public_partitions(self, pre_aggregated: bool):
partition_extractor=lambda x: f"pk{x[0]}",
preaggregate_extractor=lambda x: x[1])

col = analysis.perform_utility_analysis(
col, per_partition_result = analysis.perform_utility_analysis(
col=col,
backend=pipeline_dp.LocalBackend(),
options=analysis.UtilityAnalysisOptions(
Expand All @@ -94,6 +94,7 @@ def test_wo_public_partitions(self, pre_aggregated: bool):
data_extractors=data_extractors)

col = list(col)
per_partition_result = list(per_partition_result)

# Assert
self.assertLen(col, 1)
Expand Down Expand Up @@ -182,6 +183,7 @@ def test_wo_public_partitions(self, pre_aggregated: bool):
report=expected_copy)
]
common.assert_dataclasses_are_equal(self, report, expected)
self.assertLen(per_partition_result, 10)

@parameterized.named_parameters(
dict(testcase_name="Gaussian noise",
Expand Down Expand Up @@ -211,7 +213,7 @@ def test_w_public_partitions(self, noise_kind, expected_noise_std):
partition_extractor=lambda x: f"pk{x}",
value_extractor=lambda x: 0)

col = analysis.perform_utility_analysis(
col, _ = analysis.perform_utility_analysis(
col=col,
backend=pipeline_dp.LocalBackend(),
options=analysis.UtilityAnalysisOptions(
Expand Down Expand Up @@ -252,7 +254,7 @@ def test_multi_parameters(self):

public_partitions = ["pk0", "pk1"]

output = analysis.perform_utility_analysis(
output, _ = analysis.perform_utility_analysis(
col=input,
backend=pipeline_dp.LocalBackend(),
options=analysis.UtilityAnalysisOptions(
Expand Down
26 changes: 12 additions & 14 deletions analysis/utility_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def perform_utility_analysis(
options: analysis.UtilityAnalysisOptions,
data_extractors: Union[pipeline_dp.DataExtractors,
pipeline_dp.PreAggregateExtractors],
public_partitions=None,
return_per_partition: bool = False):
public_partitions=None):
"""Performs utility analysis for DP aggregations.
Args:
Expand All @@ -59,16 +58,11 @@ def perform_utility_analysis(
public_partitions: A collection of partition keys that will be present
in the result. If not provided, the utility analysis with private
partition selection will be performed.
return_per_partition: if true, in addition it returns utility analysis per
partitions as a 2nd element.
Returns:
if return_per_partition == False:
returns collections which contains metrics.UtilityReport with
one report per each input configuration
else:
returns a tuple, with the 1st element as in first 'if' clause and
the 2nd a collection with elements
(partition_key, [metrics.PerPartitionMetrics]).
returns a tuple. Its first element is a collection which contains
metrics.UtilityReport, with one report for each input configuration.
The second element of the tuple is a collection with elements
((partition_key, configuration_index), metrics.PerPartitionMetrics).
"""
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=options.epsilon, total_delta=options.delta)
Expand Down Expand Up @@ -98,6 +92,12 @@ def perform_utility_analysis(
col = backend.flat_map(col, _unnest_metrics, "Unnest metrics")
# ((configuration_index, bucket), metrics.PerPartitionMetrics)

per_partition_result = backend.flat_map(
per_partition_result, lambda kv: (
((kv[0], i), result) for i, result in enumerate(kv[1])),
"Unpack PerPartitionMetrics from list")
# ((partition_key, configuration_index), metrics.PerPartitionMetrics)

combiner = cross_partition_combiners.CrossPartitionCombiner(
options.aggregate_params.metrics, public_partitions is not None)

Expand Down Expand Up @@ -126,9 +126,7 @@ def perform_utility_analysis(
"Group utility reports")
# result: (UtilityReport)

if return_per_partition:
return result, per_partition_result
return result
return result, per_partition_result


def _pack_per_partition_metrics(
Expand Down
15 changes: 3 additions & 12 deletions examples/restaurant_visits/run_without_frameworks_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,11 @@ def tune_parameters():
parameters_to_tune=parameters_to_tune,
pre_aggregated_data=FLAGS.run_on_preaggregated_data)

result, per_partition = parameter_tuning.tune(input, backend, hist,
tune_options, data_extractors,
public_partitions)
if FLAGS.output_file_per_partition_analysis:
result, per_partition = parameter_tuning.tune(
input,
backend,
hist,
tune_options,
data_extractors,
public_partitions,
return_utility_analysis_per_partition=True)
write_to_file(per_partition, FLAGS.output_file_per_partition_analysis)
else:
result = parameter_tuning.tune(restaurant_visits_rows, backend, hist,
tune_options, data_extractors,
public_partitions, False)

# Here's where the lazy iterator initiates computations and gets transformed
# into actual results
Expand Down

0 comments on commit 8e63bee

Please sign in to comment.