pre_aggregation.py
# Copyright 2022 OpenMined.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pipeline_dp
import analysis.contribution_bounders as utility_contribution_bounders


def preaggregate(col,
                 backend: pipeline_dp.PipelineBackend,
                 data_extractors: pipeline_dp.DataExtractors,
                 partitions_sampling_prob: float = 1):
"""Preaggregates data from a collection.
The output is a collection with elements
(partition_key, (count, sum, n_partitions)).
Each element corresponds to each (privacy_id, partition_key) which is
present in the dataset. count and sum correspond to count and sum of values
contributed by the privacy_id to the partition_key. n_partitions is the
number of distinct partitions contributed by the privacy_id.
If partitions_sampling_prob < 1, the output partitions will be sampled.
Args:
col: collection where all elements are of the same type.
backend: PipelineBackend for performing transformations on collections.
data_extractors: functions that extract needed pieces of information
from elements of 'col'.
partitions_sampling_prob: the probability with which each partition
will be sampled. It is useful for speed-up computations on the large
datasets.
Returns:
a collection with elements (partition_key, (count, sum, n_partitions)).
"""
    col = backend.map(
        col, lambda row: (data_extractors.privacy_id_extractor(row),
                          data_extractors.partition_extractor(row),
                          data_extractors.value_extractor(row)),
        "Extract (privacy_id, partition_key, value)")
    # col: (privacy_id, partition_key, value)
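
    # AnalysisContributionBounder performs the partition sampling with the
    # given probability; the identity aggregate_fn keeps its
    # per-(privacy_id, partition_key) tuples unchanged.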
    bounder = utility_contribution_bounders.AnalysisContributionBounder(
        partitions_sampling_prob)
    col = bounder.bound_contributions(col,
                                      params=None,
                                      backend=backend,
                                      report_generator=None,
                                      aggregate_fn=lambda x: x)
    # col: ((privacy_id, partition_key), (count, sum, n_partitions, n_contributions))
    return backend.map(col, lambda row: (row[0][1], row[1]), "Drop privacy id")
    # (partition_key, (count, sum, n_partitions, n_contributions))
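
For illustration, a minimal sketch of driving preaggregate end to end, assuming pipeline_dp's LocalBackend and DataExtractors API as used above; the rows, extractor lambdas, and import path are illustrative assumptions, not part of this module.

import pipeline_dp

from analysis.pre_aggregation import preaggregate

# Illustrative input: (user, partition, value) rows. Made-up data.
rows = [("u1", "A", 2.0), ("u1", "A", 3.0), ("u1", "B", 1.0), ("u2", "A", 5.0)]

extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

# LocalBackend operates on plain Python iterables and evaluates lazily;
# list() forces the computation.
result = list(preaggregate(rows, pipeline_dp.LocalBackend(), extractors))

# Each element of result has the shape
# (partition_key, (count, sum, n_partitions, n_contributions)),
# one per (privacy_id, partition_key) pair present in rows.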