forked from confluentinc/kafka-streams-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
CMSStore.scala
271 lines (249 loc) · 11.9 KB
/
CMSStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams.algebird
import com.twitter.algebird.{CMSHasher, TopCMS, TopPctCMS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.processor.{ProcessorContext, StateStore}
import org.apache.kafka.streams.state.StateSerdes
/**
* An in-memory store that leverages the Count-Min Sketch implementation of
* [[https://github.com/twitter/algebird Twitter Algebird]].
*
* This store allows you to probabilistically count items of type T with a
* [[https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch Count-Min Sketch]] data structure.
* Here, the counts returned by the store will be approximate counts, i.e. estimations, because a
* Count-Min Sketch trades slightly inaccurate counts for greatly reduced space utilization
* (however, the estimation error is mathematically proven to be bounded).
* With probability at least `1 - delta`, this estimate is within `eps * N` of the true frequency
* (i.e., `true frequency <= estimate <= true frequency + eps * N`), where `N` is the total number
* of items counted ("seen" in the input) so far (cf. [[CMSStore#totalCount]]).
*
* A traditional Count-Min Sketch is a fixed-size data structure that is essentially an array of
* counters of a particular width (derived from the parameter `eps`) and depth (derived from the
* parameter `delta`). The CMS variant used in this store, [[TopPctCMS]], additionally tracks the
* so-called "heavy hitters" among the counted items (i.e. the items with the largest counts) based
* on a percentage threshold; the size of heavy hitters is still bounded, however, hence the total
* size of the [[TopPctCMS]] data structure is still fixed.
*
* =Fault-tolerance=
*
* This store supports changelogging its state to Kafka and is thus fault-tolerant. Every time the
* store is flushed (cf. [[org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG]]) the
* underlying CMS data structure is written to the store's changelog topic. For many use cases
* this approach should be sufficiently efficient because the absolute size of a CMS is typically
* rather small (a few KBs up to a megabyte, depending on the CMS settings, which are determined by
* e.g. your error bound requirements for approximate counts).
*
* =Usage=
*
* Note: Twitter Algebird is best used with Scala, so all the examples below are in Scala, too.
*
* In a Kafka Streams application, you'd typically create this store as such:
*
* {{{
* val builder: StreamsBuilder = new StreamsBuilder()
*
* // In this example, we create a store for type [[String]].
* // It's recommended to reduce Kafka's log segment size for the changelogs of CMS stores, which
* // you can do by passing the respective Kafka setting to the CMSStoreBuilder via `withLoggingEnabled()`.
* builder.addStateStore(new CMSStoreBuilder[String]("my-cms-store-name", Serdes.String()))
* }}}
*
* Then you'd use the store within a [[org.apache.kafka.streams.processor.Processor]] or a
* [[org.apache.kafka.streams.kstream.Transformer]] similar to:
*
* {{{
* class ProbabilisticCounter extends Transformer[Array[Byte], String, KeyValue[String, Long]] {
*
* private var cmsState: CMSStore[String] = _
* private var processorContext: ProcessorContext = _
*
* override def init(processorContext: ProcessorContext): Unit = {
* this.processorContext = processorContext
* cmsState = this.processorContext.getStateStore("my-cms-store-name").asInstanceOf[CMSStore[String]]
* }
*
* override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = {
* // Count the record value, think: "+ 1"
* cmsState.put(value)
*
* // Emit the latest count estimate for the record value
* KeyValue.pair[String, Long](value, cmsState.get(value))
* }
*
* override def punctuate(l: Long): KeyValue[String, Long] = null
*
* override def close(): Unit = {}
* }
* }}}
*
* @param name The name of this store instance
* @param loggingEnabled Whether or not changelogging (fault-tolerance) is enabled for this store.
* @param delta CMS parameter: A bound on the probability that a query estimate does not
* lie within some small interval (an interval that depends on `eps`) around
* the truth.
* See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]].
* @param eps CMS parameter: One-sided error bound on the error of each point query,
* i.e. frequency estimate.
* See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]].
* @param seed CMS parameter: A seed to initialize the random number generator used to
* create the pairwise independent hash functions. Typically you do not
* need to change this.
* See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]].
* @param heavyHittersPct CMS parameter: A threshold for finding heavy hitters, i.e., items that
* appear at least (heavyHittersPct * totalCount) times in the stream.
* Every item that appears at least `(heavyHittersPct * totalCount)` times
* is included, and with probability `p >= 1 - delta`, no item whose count
* is less than `(heavyHittersPct - eps) * totalCount` is included.
* This also means that this parameter is an upper bound on the number of
* heavy hitters that will be tracked: the set of heavy hitters contains at
* most `1 / heavyHittersPct` elements. For example, if
* `heavyHittersPct=0.01` (or 0.25), then at most `1 / 0.01 = 100` items
* or `1 / 0.25 = 4` items) will be tracked/returned as heavy hitters.
* This parameter can thus control the memory footprint required for
* tracking heavy hitters.
* See [[TopPctCMS]] and [[com.twitter.algebird.TopPctCMSMonoid]].
* @tparam T The type used to identify the items to be counted with the CMS. For example, if
* you want to count the occurrence of user names, you could use count user names
* directly with `T=String`; alternatively, you could map each username to a unique
* numeric ID expressed as a `Long`, and then count the occurrences of those `Long`s with
* a CMS of type `T=Long`. Note that such a mapping between the items of your problem
* domain and their identifiers used for counting via CMS should be bijective.
* We require a [[CMSHasher]] context bound for `K`, see [[CMSHasher]] for available
* implicits that can be imported.
* See [[com.twitter.algebird.CMSMonoid]] for further information.
*/
class CMSStore[T: CMSHasher](override val name: String,
val loggingEnabled: Boolean = true,
val delta: Double = 1E-10,
val eps: Double = 0.001,
val seed: Int = 1,
val heavyHittersPct: Double = 0.01)
extends StateStore {
private val cmsMonoid = TopPctCMS.monoid[T](eps, delta, seed, heavyHittersPct)
/**
* The "storage backend" of this store.
*
* Needs proper initializing in case the store's changelog is empty.
*/
private var cms: TopCMS[T] = cmsMonoid.zero
private var timestampOfLastStateStoreUpdate: Long = 0L
private var changeLogger: CMSStoreChangeLogger[Integer, TopCMS[T]] = _
/**
* The record key used to write to the state's changelog.
*
* This key can be a constant because:
*
* 1. We always write the full CMS when writing to the changelog.
* 2. A CMS does not retain information about which items were counted, i.e. it does not track
* information about the keyspace (in the case of this store, the only information about the
* keyspace are the heavy hitters); so, unless we opted for a different approach than (1)
* above, we cannot leverage keyspace information anyways.
* 3. We use a [[CMSStoreChangeLogger]] that uses a stream task's
* [[org.apache.kafka.streams.processor.TaskId]] to identify the changelog partition to write to.
* Thus only one particular stream task will ever be writing to that changelog partition.
* 4. When restoring from the changelog, a stream task will read only its own (one) changelog
* partition.
*
* In other words, we can hardcode the changelog key because only the "right" stream task will be
* (a) writing to AND (b) reading from the respective partition of the changelog.
*/
private[algebird] val changelogKey = 42
/**
* For unit testing
*/
private[algebird] def cmsFrom(items: Seq[T]): TopCMS[T] = cmsMonoid.create(items)
/**
* For unit testing
*/
private[algebird] def cmsFrom(item: T): TopCMS[T] = cmsMonoid.create(item)
@volatile private var open: Boolean = false
/**
* Initializes this store, including restoring the store's state from its changelog.
*/
override def init(context: ProcessorContext, root: StateStore) {
val serdes = new StateSerdes[Integer, TopCMS[T]](
name,
Serdes.Integer(),
TopCMSSerde[T])
changeLogger = new CMSStoreChangeLogger[Integer, TopCMS[T]](name, context, serdes)
// Note: We must manually guard with `loggingEnabled` here because `context.register()` ignores
// that parameter.
if (root != null && loggingEnabled) {
context.register(root, (_, value) => {
if (value == null) {
cms = cmsMonoid.zero
}
else {
cms = serdes.valueFrom(value)
}
})
}
open = true
}
/**
* Returns the estimated count of the item.
*
* @param item item to be counted
* @return estimated count
*/
def get(item: T): Long = cms.frequency(item).estimate
/**
* Counts the item.
*
* @param item item to be counted
*/
def put(item: T, timestamp: Long): Unit = {
cms = cms + item
timestampOfLastStateStoreUpdate = timestamp
}
/**
* The top items counted so far, with the percentage-based cut-off being defined by the CMS
* parameter `heavyHittersPct`.
*
* @return the top items counted so far
*/
def heavyHitters: Set[T] = cms.heavyHitters
/**
* Returns the total number of items counted ("seen" in the input) so far.
*
* This number is not the same as the total number of <em>unique</em> items counted so far, i.e.
* it is not the cardinality of the set of items.
*
* Example: After having counted the input "foo", "bar", "foo", the return value would be 3.
*
* @return number of count operations so far
*/
def totalCount: Long = cms.totalCount
override val persistent: Boolean = false
override def isOpen: Boolean = open
/**
* Periodically saves the latest CMS state to Kafka.
*
* =Implementation detail=
*
* The changelog records have the form: (hardcodedKey, CMS). That is, we are backing up the
* underlying CMS data structure in its entirety to Kafka.
*/
override def flush() {
if (loggingEnabled) {
changeLogger.logChange(changelogKey, cms, timestampOfLastStateStoreUpdate)
}
}
override def close() {
open = false
}
}