Skip to content

Commit

Permalink
Merge pull request yahoo#580 from sapphirew/thetaSketchEstimate
Browse files Browse the repository at this point in the history
Adding ThetaSketchEstimateWrapper
  • Loading branch information
sapphirew authored Nov 26, 2019
2 parents b910ebe + e9b749b commit 995aac1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
15 changes: 15 additions & 0 deletions core/src/main/scala/com/yahoo/maha/core/DerivedExpression.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,21 @@ object DruidExpression {
)))*/
}

/**
 * Druid expression that wraps the expression referenced by `name` in a
 * `SketchEstimatePostAggregator`.
 *
 * @param name expression string handed to `fromString`; the same string with
 *             every `{` and `}` removed is used as the post aggregator name
 */
case class ThetaSketchEstimateWrapper(name: String) extends BaseDruidExpression {

  /**
   * Builds the renderer for this expression.
   *
   * The first argument of the returned function is not read by this expression;
   * the alias map is forwarded unchanged to the wrapped expression's renderer.
   *
   * @param insideDerived forwarded as-is to the wrapped expression's `render`
   */
  def render(insideDerived: Boolean) = {
    (s: String, aggregatorNameAliasMap: Map[String, String]) => {
      // Name of the post aggregator: `name` without its curly braces.
      val outputName = name.replaceAll("[}{]","")
      // Expression obtained from the raw (still braced) name.
      val wrapped = fromString(name)
      val renderWrapped = wrapped.render(insideDerived)
      val sketchField = renderWrapped(wrapped.fieldNamePlaceHolder, aggregatorNameAliasMap)
      // NOTE(review): the third constructor argument is presumably Druid's optional
      // errorBoundsStdDev; null leaves it unset - confirm against the Druid version in use.
      new SketchEstimatePostAggregator(outputName, sketchField, null)
    }
  }

  val hasRollupExpression: Boolean = false
  val hasNumericOperation: Boolean = false

  /** The expression string exactly as it was supplied. */
  def asString: String = name
}

case class ThetaSketchEstimator(fn: ThetaSketchSetOp, aggregators: List[String]) extends BaseDruidExpression {
import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class BaseDruidQueryGeneratorTest extends FunSuite with Matchers with BeforeAndA
, FactCol("segments_unique_users", DecType(), DruidFilteredRollup(InFilter("segments", List("1234")), "uniqueUserCount", DruidThetaSketchRollup))
, FactCol("conv_unique_users", DecType(), DruidFilteredRollup(JavaScriptFilter("segments", "function(x) { return x > 0; }"), "uniqueUserCount", DruidThetaSketchRollup))
, DruidDerFactCol("Total Unique User Count", DecType(), ThetaSketchEstimator(INTERSECT, List("{ageBucket_unique_users}", "{woeids_unique_users}", "{segments_unique_users}")))
, DruidDerFactCol("Conv Segments Unique User Count", DecType(), ThetaSketchEstimateWrapper("{conv_unique_users}") ++ ThetaSketchEstimateWrapper("{segments_unique_users}"))
),
annotations = annotations
)
Expand Down Expand Up @@ -346,7 +347,8 @@ class BaseDruidQueryGeneratorTest extends FunSuite with Matchers with BeforeAndA
PublicFactCol("segments_unique_users", "segments_unique_users", InBetweenEquality),
PublicFactCol("conv_unique_users", "Conversion User Count", InBetweenEquality),
PublicFactCol("Total Unique User Count", "Total Unique User Count", InBetweenEquality),
PublicFactCol("variance", "Variance", InBetweenEquality)
PublicFactCol("variance", "Variance", InBetweenEquality),
PublicFactCol("Conv Segments Unique User Count", "Conv Segments Unique User Count", InBetweenEquality)
),
//Set(EqualityFilter("Source", "2")),
Set(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3130,4 +3130,40 @@ class DruidQueryGeneratorTest extends BaseDruidQueryGeneratorTest {

assert(result.contains(json), result)
}

test("Test generating a query for a request with derived fact column using ThetaSketchEstimateWrapper") {
  // Requests the derived column "Conv Segments Unique User Count" together with the two
  // sketch columns it adds up, and checks the post aggregation emitted in the Druid query.
  val jsonString =
    s"""{
"cube": "k_stats",
"selectFields": [
{"field": "Day"},
{"field": "Impressions"},
{"field": "Clicks"},
{"field": "segments_unique_users"},
{"field": "Conversion User Count"},
{"field": "Conv Segments Unique User Count"},

{"field": "Advertiser ID"}
],
"filterExpressions": [
{"field": "Day", "operator": "between", "from": "$fromDate", "to": "$toDate"},
{"field": "Advertiser ID", "operator": "=", "value": "12345"}
],
"paginationStartIndex":20,
"rowsPerPage":100
}"""

  val request: ReportingRequest = ReportingRequest.forceDruid(getReportingRequestSync(jsonString))
  val registry = defaultRegistry
  val requestModel = RequestModel.from(request, registry)
  assert(requestModel.isSuccess, requestModel.errorMessage("Building request model failed"))

  val queryPipelineTry = generatePipeline(requestModel.toOption.get)
  assert(queryPipelineTry.isSuccess, queryPipelineTry.errorMessage("Fail to get the query pipeline"))

  // Success was asserted just above, so unwrapping the pipeline here cannot fail silently.
  val queryPipeline = queryPipelineTry.toOption.get
  val query = queryPipeline.queryChain.drivingQuery.asInstanceOf[DruidQuery[_]].asString

  // Expected fragment: an arithmetic "+" over two thetaSketchEstimate post aggregators.
  val expectedPostAggregations =
    """"postAggregations":[{"type":"arithmetic","name":"Conv Segments Unique User Count","fn":"+","fields":[{"type":"thetaSketchEstimate","name":"conv_unique_users","field":{"type":"fieldAccess","name":"conv_unique_users","fieldName":"Conversion User Count"}},{"type":"thetaSketchEstimate","name":"segments_unique_users","field":{"type":"fieldAccess","name":"segments_unique_users","fieldName":"segments_unique_users"}}]}]"""

  // The generated query is passed as the clue so a mismatch prints it in the failure report.
  assert(query.contains(expectedPostAggregations), query)
}
}

0 comments on commit 995aac1

Please sign in to comment.