# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import argparse
import json
import logging

from apache_beam import (
    CombineFn,
    CombineGlobally,
    DoFn,
    io,
    ParDo,
    Pipeline,
    WindowInto,
)
from apache_beam.error import PipelineError
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from google.cloud import logging_v2

# TODO: Place inspection and de-identification configurations


class PayloadAsJson(DoFn):
    """Convert PubSub message payload to UTF-8 and return as JSON"""

    def process(self, element):
        yield json.loads(element.decode("utf-8"))
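
# A quick illustration of PayloadAsJson (hypothetical payload, not from the
# pipeline): a Pub/Sub message body such as
#   b'{"logName": "projects/my-project/logs/my-log", "textPayload": "hi"}'
# is decoded from UTF-8 and parsed by json.loads, so downstream transforms
# receive a plain dict rather than raw bytes.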


class BatchPayloads(CombineFn):
    """Opinionated way to batch all payloads in the window"""

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = [item for accumulator in accumulators for item in accumulator]
        return merged

    def extract_output(self, accumulator):
        return accumulator
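
# A minimal sketch of how a Beam runner may drive this CombineFn (values are
# illustrative, not part of the pipeline): several accumulators can be built
# in parallel, merged, and reduced to one batched list per window.
#
#   fn = BatchPayloads()
#   acc_a = fn.add_input(fn.create_accumulator(), {"logName": "a"})
#   acc_b = fn.add_input(fn.create_accumulator(), {"logName": "b"})
#   merged = fn.merge_accumulators([acc_a, acc_b])
#   fn.extract_output(merged)  # -> [{"logName": "a"}, {"logName": "b"}]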

# TODO: Placeholder for LogRedaction class


class IngestLogs(DoFn):
    """Ingest payloads into destination log"""

    def __init__(self, destination_log_name):
        self.destination_log_name = destination_log_name
        self.logger = None

    def _replace_log_name(self, entry):
        # update log name in the entry with destination log
        entry["logName"] = self.logger.name
        return entry

    def setup(self):
        # initialize logging client
        if self.logger:
            return

        logging_client = logging_v2.Client()
        if not logging_client:
            logging.error("Cannot create GCP Logging Client")
            raise PipelineError("Cannot create GCP Logging Client")

        self.logger = logging_client.logger(self.destination_log_name)
        if not self.logger:
            logging.error("Google client library cannot create Logger object")
            raise PipelineError("Google client library cannot create Logger object")

    def process(self, element):
        if self.logger:
            logs = list(map(self._replace_log_name, element))
            self.logger.client.logging_api.write_entries(logs)
            yield logs
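
# Note: write_entries() sends the whole batched window in a single API call,
# which is why payloads are combined per window before this DoFn runs. Each
# element is assumed to be a LogEntry-shaped dict, e.g. (illustrative):
#   {"logName": "...", "timestamp": "...", "textPayload": "..."}
# _replace_log_name rewrites only "logName"; every other field passes through.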


def run(
    pubsub_subscription: str,
    destination_log_name: str,
    window_size: float,
    pipeline_args: list[str] | None = None,
) -> None:
    """Runs Dataflow pipeline"""
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )
    # TODO: Read job's deployment region

    pipeline = Pipeline(options=pipeline_options)
    _ = (
        pipeline
        | "Read log entries from Pub/Sub"
        >> io.ReadFromPubSub(subscription=pubsub_subscription)
        | "Convert log entry payload to Json" >> ParDo(PayloadAsJson())
        | "Aggregate payloads in fixed time intervals"
        >> WindowInto(FixedWindows(window_size))
        # Optimize Google API consumption and avoid possible throttling
        # by calling APIs for batched data and not per each element
        | "Batch aggregated payloads"
        >> CombineGlobally(BatchPayloads()).without_defaults()
        # TODO: Placeholder for redaction transformation
        | "Ingest to output log" >> ParDo(IngestLogs(destination_log_name))
    )
    pipeline.run()
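
# Note on windowing: FixedWindows(window_size) slices the unbounded Pub/Sub
# stream into non-overlapping intervals of window_size seconds, and
# .without_defaults() keeps CombineGlobally from emitting an empty batch for
# windows that received no log entries.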

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pubsub_subscription",
        help="The Cloud Pub/Sub subscription to read from in the format "
        '"projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>".',
    )
    parser.add_argument(
        "--destination_log_name",
        help="The log name to ingest log entries in the format "
        '"projects/<PROJECT_ID>/logs/<LOG_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=60.0,
        help="Window size in seconds for batching log entries.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.pubsub_subscription,
        known_args.destination_log_name,
        known_args.window_size,
        pipeline_args,
    )
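
# Example invocation (hypothetical project, subscription, and log names;
# Dataflow-specific flags are passed through to pipeline_args by
# parse_known_args):
#
#   python log_redaction.py \
#       --pubsub_subscription=projects/my-project/subscriptions/log-sub \
#       --destination_log_name=projects/my-project/logs/redacted-log \
#       --window_size=60 \
#       --runner=DataflowRunner \
#       --project=my-project \
#       --region=us-central1 \
#       --temp_location=gs://my-bucket/tmp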