
Module 1005 - API Custom Batching - Tests #90

Open
GeekSheikh opened this issue Apr 1, 2021 · 3 comments · Fixed by #381
Labels
Testing Tests Needed Or Part of Testing Suite

Comments


GeekSheikh commented Apr 1, 2021

API batching for Module 1005, Bronze_ClusterEventLogs. This is such an important component because the ONLY place to get this information (which drives cost, cluster utilization, everything) is the API. Some customers have hundreds of thousands of cluster event states per day. That is too much data to pull to the driver all at once, so I batch it (I'm not talking about API pagination; that's working). I built a custom batcher that queries the number of cluster events between dates x and y and then splits them into batches of 500K: pull 500K events, write to the table, repeat. This data expires after 60 days (so I'm told), and once the data expires in the API it is not accessible programmatically anywhere else.

Historical loads and large customers will hit this upper bound, and it's critical to ensure the process is working. I envision several tests. I believe unit tests can be used to simulate a small upper bound (set 20 as the batching upper bound for testing) and verify that the data is pulled, written, and that the process then continues with the next batch.
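The count-then-batch idea above can be sketched in isolation. This is a hedged illustration, not the actual Overwatch implementation: `planBatches` and its `(clusterId, eventCount)` input are hypothetical stand-ins for querying per-cluster event counts from the API before pulling.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchPlanner {
  // Greedily pack clusters into batches whose summed event counts stay at or
  // under maxEventsPerBatch. A single cluster whose count alone exceeds the
  // limit still gets its own batch rather than being dropped.
  def planBatches(counts: Seq[(String, Long)], maxEventsPerBatch: Long): Seq[Seq[String]] = {
    val batches = ArrayBuffer(ArrayBuffer.empty[String])
    var running = 0L
    counts.foreach { case (clusterId, n) =>
      if (running + n > maxEventsPerBatch && batches.last.nonEmpty) {
        batches += ArrayBuffer.empty[String] // start a new batch
        running = 0L
      }
      batches.last += clusterId
      running += n
    }
    batches.map(_.toSeq).toSeq
  }
}
```

With a 500-event limit, clusters counting 300/300/100 events would plan as two batches: the first cluster alone, then the second and third together.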

This test isn't as important as the higher-level tests you're working on now, since errors here would likely present themselves later, but in some cases they may not. I wanted to put this on the radar as a required bronze test set.
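As a sketch of what such a unit test could drive, here is the pull-write-repeat loop at a tiny scale with a fake event source and an in-memory sink. All names here (`FakeEventSource`, `runBatchedIngest`) are hypothetical test doubles, not Overwatch APIs; the sink stands in for the Delta append.

```scala
import scala.collection.mutable.ArrayBuffer

// Fake API: serves `total` integer event IDs, up to `limit` per pull.
class FakeEventSource(total: Int) {
  private var cursor = 0
  def pull(limit: Int): Seq[Int] = {
    val batch = (cursor until math.min(cursor + limit, total)).toSeq
    cursor += batch.size
    batch
  }
  def exhausted: Boolean = cursor >= total
}

// Pull up to `upperBound` events, "write" them, repeat until exhausted.
// Returns the batch count and everything written, so a test can assert
// that no events were dropped or duplicated across batch boundaries.
def runBatchedIngest(source: FakeEventSource, upperBound: Int): (Int, ArrayBuffer[Int]) = {
  val sink = ArrayBuffer.empty[Int]
  var batches = 0
  while (!source.exhausted) {
    sink ++= source.pull(upperBound)
    batches += 1
  }
  (batches, sink)
}
```

With 45 events and an upper bound of 20, the test would expect exactly 3 batches and all 45 events written once.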

Relevant code is below.
The clusterEvents API call requires a cluster_id, so a list of all cluster IDs with new events must be built and passed as part of the API query. The section below is responsible for ensuring that all clusters with new events are accounted for; this is a VERY CRITICAL component of Overwatch.

```scala
protected def prepClusterEventLogs(auditLogsTable: PipelineTable,
                                   start_time: TimeTypes, end_time: TimeTypes,
                                   apiEnv: ApiEnv,
                                   organizationId: String): DataFrame = {
  val extraQuery = Map(
    "start_time" -> start_time.asUnixTimeMilli, // e.g. 1588935326000L
    "end_time" -> end_time.asUnixTimeMilli, // e.g. 1589021726000L
    "limit" -> 500
  )
  // TODO -- upgrade to incrementalDF
  val auditDFBase = auditLogsTable.asDF
    .filter(
      'date.between(start_time.asColumnTS.cast("date"), end_time.asColumnTS.cast("date")) &&
        'timestamp.between(lit(start_time.asUnixTimeMilli), lit(end_time.asUnixTimeMilli))
    )
  // Existing clusters with any *Result action in the window
  val existingClusterIds = auditDFBase
    .filter('serviceName === "clusters" && 'actionName.like("%Result"))
    .select($"requestParams.clusterId".alias("cluster_id"))
    .filter('cluster_id.isNotNull)
    .distinct
  // Clusters created within the window (cluster_id lives in the create response)
  val newClusterIds = auditDFBase
    .filter('serviceName === "clusters" && 'actionName === "create")
    .select(get_json_object($"response.result", "$.cluster_id").alias("cluster_id"))
    .filter('cluster_id.isNotNull)
    .distinct
  val clusterIDs = existingClusterIds
    .unionByName(newClusterIds)
    .distinct
    .as[String]
    .collect()
```

Batching Code:

```scala
  val batchSize = 500000D
  // TODO -- remove hard-coded path
  val tmpClusterEventsPath = "/tmp/overwatch/bronze/clusterEventsBatches"
  val clusterEventsBuffer = buildClusterEventBatches(apiEnv, batchSize,
    start_time.asUnixTimeMilli, end_time.asUnixTimeMilli, clusterIDs)
  logger.log(Level.INFO, s"NUMBER OF BATCHES: ${clusterEventsBuffer.length} \n" +
    s"ESTIMATED EVENTS: ${clusterEventsBuffer.length * batchSize.toInt}")
  var batchCounter = 0
  clusterEventsBuffer.foreach(clusterIdsBatch => {
    batchCounter += 1
    logger.log(Level.INFO, s"BEGINNING BATCH ${batchCounter} of ${clusterEventsBuffer.length}")
    val clusterEvents = apiByID("clusters/events", apiEnv, "post",
      clusterIdsBatch, "cluster_id", Some(extraQuery))
    try {
      val tdf = SchemaTools.scrubSchema(
        spark.read.json(Seq(clusterEvents: _*).toDS()).select(explode('events).alias("events"))
          .select(col("events.*"))
      )
      val changeInventory = Map[String, Column](
        "details.attributes.custom_tags" -> SchemaTools.structToMap(tdf, "details.attributes.custom_tags"),
        "details.attributes.spark_conf" -> SchemaTools.structToMap(tdf, "details.attributes.spark_conf"),
        "details.attributes.spark_env_vars" -> SchemaTools.structToMap(tdf, "details.attributes.spark_env_vars"),
        "details.previous_attributes.custom_tags" -> SchemaTools.structToMap(tdf, "details.previous_attributes.custom_tags"),
        "details.previous_attributes.spark_conf" -> SchemaTools.structToMap(tdf, "details.previous_attributes.spark_conf"),
        "details.previous_attributes.spark_env_vars" -> SchemaTools.structToMap(tdf, "details.previous_attributes.spark_env_vars")
      )
      SchemaTools.scrubSchema(tdf.select(SchemaTools.modifyStruct(tdf.schema, changeInventory): _*))
        .withColumn("organization_id", lit(organizationId))
        .write.mode("append").format("delta")
        .option("mergeSchema", "true")
        .save(tmpClusterEventsPath)
    } catch {
      case e: Throwable =>
        // mkString (not foreach(println)) so the failing IDs land in the log message
        logger.log(Level.WARN, "While attempting to grab events data for the clusters below, an error occurred" +
          s"\n${clusterIdsBatch.mkString("\n")}", e)
    }
  })
```

@GeekSheikh GeekSheikh added the Testing Tests Needed Or Part of Testing Suite label Apr 1, 2021
GeekSheikh commented:
Also, while you're deep in the weeds, if you have time, it'd be AWESOME to figure out why I'm having issues parallelizing the API calls. I don't want to serialize the calls and send them to the workers; that is unnecessary. The API concurrency limit is 32, so I would like to parallelize this to 12 or so (and add configs later). When I try to parallelize it on the driver, I get errors (as I recall). It'd be great to understand what the issue is if you have time.

If you don't get time, please break this out into another issue and we'll take it later.
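One common way to cap driver-side parallelism without Spark's parallel collections (which share a global ForkJoinPool and make the concurrency limit hard to control) is a fixed thread pool behind `scala.concurrent.Future`. This is a hedged sketch, not a diagnosis of the error above, and `callClusterEvents` is a hypothetical stand-in for the real API call:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Run callClusterEvents for every cluster ID with at most `parallelism`
// concurrent calls, entirely on the driver. Results come back in input order.
def fetchAllInParallel[A](clusterIds: Seq[String], parallelism: Int)(
    callClusterEvents: String => A
): Seq[A] = {
  val pool = Executors.newFixedThreadPool(parallelism)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val futures = clusterIds.map(id => Future(callClusterEvents(id)))
    Await.result(Future.sequence(futures), Duration.Inf)
  } finally pool.shutdown()
}
```

The pool size (e.g. 12) could later come from config, keeping well under the stated API concurrency limit of 32.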

GeekSheikh commented:
UPDATE: I found the issue that started this thought process.

The issue was that I made a change a while back to simplify the "cluster IDs with new states" logic, and my logic was wrong. I forgot to account for clusters that may have changed states but were not edited, started, restarted, or terminated. So while I get most of the cluster events, I miss events for long-running clusters; that's a big miss for customers with large, long-running clusters used for streaming or shared interactive work. Oops.

GeekSheikh commented:
@Sriram-databricks -- it looks like most of this will be removed and so won't be needed. Please review and close this with the new APICall implementation for clusterEvents.

@GeekSheikh GeekSheikh modified the milestones: 0.6.1.2, 0.6.1.3 Aug 9, 2022
@GeekSheikh GeekSheikh modified the milestones: 0.7.0.1, backlog Oct 5, 2022
@gueniai gueniai removed this from the backlog milestone Sep 26, 2024