Skip to content

Commit

Permalink
feat(platform): timeseries - Server & Client side changes to support …
Browse files Browse the repository at this point in the history
…timeseries aspect deletion & rollback. (datahub-project#4756)
  • Loading branch information
rslanka authored Sep 11, 2022
1 parent e556bcb commit 386719f
Show file tree
Hide file tree
Showing 26 changed files with 1,101 additions and 86 deletions.
22 changes: 21 additions & 1 deletion docs/how/delete-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,28 @@ This physically deletes all rows for all aspects of the entity. This action cann
datahub delete --urn "<my urn>" --hard
```

As of datahub v.0.8.35 doing a hard delete by urn will also provide you with a way to remove references to the urn being deleted across the metadata graph. This is important to use if you don't want to have ghost references in your metadata model and want to save space in the graph database.
As of datahub v0.8.35 doing a hard delete by urn will also provide you with a way to remove references to the urn being deleted across the metadata graph. This is important to use if you don't want to have ghost references in your metadata model and want to save space in the graph database.
For now, this behaviour must be opted into by a prompt that will appear for you to manually accept or deny.

Starting with v0.8.44.2, this also supports deletion of a specific `timeseries` aspect associated with the entity, optionally for a specific time range.

_Note: Deletion by a specific aspect and time range is currently supported only for timeseries aspects._

```bash
# Delete all of the aspect values for a given entity and a timeseries aspect.
datahub delete --urn "<entity urn>" -a "<timeseries aspect>" --hard
Eg: datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset,TEST)" -a "datasetProfile" --hard

# Delete all of the aspect values for a given platform and a timeseries aspect.
datahub delete -p "<platform>" -a "<timeseries aspect>" --hard
Eg: datahub delete -p "snowflake" -a "datasetProfile" --hard

# Delete the aspect values for a given platform and a timeseries aspect corresponding to a specific time range.
datahub delete -p "<platform>" -a "<timeseries aspect>" --start-time '<start_time>' --end-time '<end_time>' --hard
Eg: datahub delete -p "snowflake" -a "datasetProfile" --start-time '2022-05-29 00:00:00' --end-time '2022-05-31 00:00:00' --hard
```


You can optionally add `-n` or `--dry-run` to execute a dry run before issuing the final delete command.
You can optionally add `-f` or `--force` to skip confirmations
You can optionally add `--only-soft-deleted` flag to remove soft-deleted items only.
Expand Down Expand Up @@ -119,6 +138,7 @@ datahub ingest rollback --run-id <run-id>
```

to roll back all aspects added with this run and all entities created by this run.
This deletes both the versioned and the timeseries aspects associated with these entities.

### Unsafe Entities and Rollback

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.metadata.models;

import com.linkedin.metadata.models.registry.EntityRegistry;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
 * Static helper methods for querying {@link EntitySpec} metadata from an
 * {@link EntityRegistry}.
 */
public class EntitySpecUtils {
  // Utility class; not meant to be instantiated.
  private EntitySpecUtils() {
  }

  /**
   * Returns the names of all timeseries aspects declared for the given entity.
   *
   * @param entityRegistry the registry used to resolve the entity's spec
   * @param entityName the name of the entity whose aspects are inspected
   * @return the (possibly empty) list of timeseries aspect names
   */
  public static List<String> getEntityTimeseriesAspectNames(@Nonnull EntityRegistry entityRegistry,
      @Nonnull String entityName) {
    return entityRegistry.getEntitySpec(entityName)
        .getAspectSpecs()
        .stream()
        .filter(spec -> spec.isTimeseries())
        .map(spec -> spec.getName())
        .collect(Collectors.toList());
  }
}
26 changes: 11 additions & 15 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def post_delete_endpoint(
payload_obj: dict,
path: str,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> typing.Tuple[str, int]:
) -> typing.Tuple[str, int, int]:
session, gms_host = cached_session_host or get_session_and_host()
url = gms_host + path

def post_delete_endpoint_with_session_and_url(
    session: Session,
    url: str,
    payload_obj: dict,
) -> typing.Tuple[str, int, int]:
    """POST a JSON-encoded delete payload to ``url`` using ``session``.

    Parses the restli response summary and returns a tuple of
    (urn, versioned rows affected, timeseries rows affected).
    """
    response = session.post(url, json.dumps(payload_obj))
    summary = parse_run_restli_response(response)

    # Missing keys default to an empty urn / zero affected rows.
    return (
        summary.get("urn", ""),
        summary.get("rows", 0),
        summary.get("timeseriesRows", 0),
    )


def get_urns_by_filter(
Expand Down Expand Up @@ -624,7 +625,7 @@ def get_aspects_for_entity(
# Process timeseries aspects & append to aspect_list
timeseries_aspects: List[str] = [a for a in aspects if a in TIMESERIES_ASPECT_MAP]
for timeseries_aspect in timeseries_aspects:
timeseries_response = get_latest_timeseries_aspect_values(
timeseries_response: Dict = get_latest_timeseries_aspect_values(
entity_urn, timeseries_aspect, cached_session_host
)
values: List[Dict] = timeseries_response.get("value", {}).get("values", [])
Expand All @@ -633,18 +634,13 @@ def get_aspects_for_entity(
timeseries_aspect
)
if aspect_cls is not None:
aspect_value = values[0]
ts_aspect = values[0]["aspect"]
# Decode the json-encoded generic aspect value.
aspect_value["aspect"]["value"] = json.loads(
aspect_value["aspect"]["value"]
)
aspect_list[
aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
] = aspect_value
ts_aspect["value"] = json.loads(ts_aspect["value"])
aspect_list[timeseries_aspect] = ts_aspect

aspect_map: Dict[str, Union[dict, _Aspect]] = {}
for a in aspect_list.values():
aspect_name = a["name"]
for aspect_name, a in aspect_list.items():
aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
aspect_name
)
Expand Down
101 changes: 84 additions & 17 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import time
from dataclasses import dataclass
from datetime import datetime
from random import choices
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import click
import progressbar
Expand Down Expand Up @@ -30,25 +31,27 @@

@dataclass
class DeletionResult:
start_time_millis: int = int(time.time() * 1000.0)
end_time_millis: int = 0
start_time: int = int(time.time() * 1000.0)
end_time: int = 0
num_records: int = 0
num_timeseries_records: int = 0
num_entities: int = 0
sample_records: Optional[List[List[str]]] = None

def start(self) -> None:
self.start_time_millis = int(time.time() * 1000.0)
self.start_time = int(time.time() * 1000.0)

def end(self) -> None:
self.end_time_millis = int(time.time() * 1000.0)
self.end_time = int(time.time() * 1000.0)

def merge(self, another_result: "DeletionResult") -> None:
self.end_time_millis = another_result.end_time_millis
self.end_time = another_result.end_time
self.num_records = (
self.num_records + another_result.num_records
if another_result.num_records != UNKNOWN_NUM_RECORDS
else UNKNOWN_NUM_RECORDS
)
self.num_timeseries_records += another_result.num_timeseries_records
self.num_entities += another_result.num_entities
if another_result.sample_records:
if not self.sample_records:
Expand Down Expand Up @@ -82,26 +85,66 @@ def delete_for_registry(


@click.command()
@click.option("--urn", required=False, type=str)
@click.option("-f", "--force", required=False, is_flag=True)
@click.option("--soft/--hard", required=False, is_flag=True, default=True)
@click.option("-e", "--env", required=False, type=str)
@click.option("-p", "--platform", required=False, type=str)
@click.option("--entity_type", required=False, type=str, default="dataset")
@click.option("--urn", required=False, type=str, help="the urn of the entity")
@click.option(
"-a",
"--aspect_name",
required=False,
type=str,
help="the aspect name associated with the entity(only for timeseries aspects)",
)
@click.option(
"-f", "--force", required=False, is_flag=True, help="force the delete if set"
)
@click.option(
"--soft/--hard",
required=False,
is_flag=True,
default=True,
help="specifies soft/hard deletion",
)
@click.option(
"-e", "--env", required=False, type=str, help="the environment of the entity"
)
@click.option(
"-p", "--platform", required=False, type=str, help="the platform of the entity"
)
@click.option(
"--entity_type",
required=False,
type=str,
default="dataset",
help="the entity_type of the entity",
)
@click.option("--query", required=False, type=str)
@click.option(
"--start-time",
required=False,
type=click.DateTime(),
help="the start time(only for timeseries aspects)",
)
@click.option(
"--end-time",
required=False,
type=click.DateTime(),
help="the end time(only for timeseries aspects)",
)
@click.option("--registry-id", required=False, type=str)
@click.option("-n", "--dry-run", required=False, is_flag=True)
@click.option("--only-soft-deleted", required=False, is_flag=True, default=False)
@upgrade.check_upgrade
@telemetry.with_telemetry
def delete(
urn: str,
aspect_name: Optional[str],
force: bool,
soft: bool,
env: str,
platform: str,
entity_type: str,
query: str,
start_time: Optional[datetime],
end_time: Optional[datetime],
registry_id: str,
dry_run: bool,
only_soft_deleted: bool,
Expand Down Expand Up @@ -161,9 +204,12 @@ def delete(

deletion_result: DeletionResult = delete_one_urn_cmd(
urn,
aspect_name=aspect_name,
soft=soft,
dry_run=dry_run,
entity_type=entity_type,
start_time=start_time,
end_time=end_time,
cached_session_host=(session, host),
)

Expand Down Expand Up @@ -201,11 +247,14 @@ def delete(
if not dry_run:
message = "soft delete" if soft else "hard delete"
click.echo(
f"Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to {message} {deletion_result.num_records} rows for {deletion_result.num_entities} entities"
f"Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to {message}"
f" {deletion_result.num_records} versioned rows"
f" and {deletion_result.num_timeseries_records} timeseries aspect rows"
f" for {deletion_result.num_entities} entities."
)
else:
click.echo(
f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to evaluate."
f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to evaluate."
)
if deletion_result.sample_records:
click.echo(
Expand Down Expand Up @@ -276,7 +325,7 @@ def delete_with_filters(
click.echo(
f"No urns to delete. Maybe you want to change entity_type={entity_type} or platform={platform} to be something different?"
)
return DeletionResult(end_time_millis=int(time.time() * 1000.0))
return DeletionResult(end_time=int(time.time() * 1000.0))

if not force and not dry_run:
type_delete = "soft" if soft else "permanently"
Expand Down Expand Up @@ -320,6 +369,9 @@ def _delete_one_urn(
soft: bool = False,
dry_run: bool = False,
entity_type: str = "dataset",
aspect_name: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
cached_session_host: Optional[Tuple[sessions.Session, str]] = None,
cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
run_id: str = "delete-run-id",
Expand Down Expand Up @@ -359,13 +411,22 @@ def _delete_one_urn(
else:
logger.info(f"[Dry-run] Would soft-delete {urn}")
elif not dry_run:
payload_obj = {"urn": urn}
urn, rows_affected = cli_utils.post_delete_endpoint(
payload_obj: Dict[str, Any] = {"urn": urn}
if aspect_name:
payload_obj["aspectName"] = aspect_name
if start_time:
payload_obj["startTimeMillis"] = int(round(start_time.timestamp() * 1000))
if end_time:
payload_obj["endTimeMillis"] = int(round(end_time.timestamp() * 1000))
rows_affected: int
ts_rows_affected: int
urn, rows_affected, ts_rows_affected = cli_utils.post_delete_endpoint(
payload_obj,
"/entities?action=delete",
cached_session_host=cached_session_host,
)
deletion_result.num_records = rows_affected
deletion_result.num_timeseries_records = ts_rows_affected
else:
logger.info(f"[Dry-run] Would hard-delete {urn} {soft_delete_msg}")
deletion_result.num_records = (
Expand All @@ -379,9 +440,12 @@ def _delete_one_urn(
@telemetry.with_telemetry
def delete_one_urn_cmd(
urn: str,
aspect_name: Optional[str] = None,
soft: bool = False,
dry_run: bool = False,
entity_type: str = "dataset",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
cached_session_host: Optional[Tuple[sessions.Session, str]] = None,
cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
) -> DeletionResult:
Expand All @@ -396,6 +460,9 @@ def delete_one_urn_cmd(
soft,
dry_run,
entity_type,
aspect_name,
start_time,
end_time,
cached_session_host,
cached_emitter,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.timeseries.AggregationSpec;
import com.linkedin.timeseries.DeleteAspectValuesResult;
import com.linkedin.timeseries.GenericTable;
import com.linkedin.timeseries.GroupingBucket;
import java.util.List;
Expand All @@ -29,4 +30,23 @@ List<EnvelopedAspect> getAspectValues(@Nonnull final Urn urn, @Nonnull String en
@Nonnull
GenericTable getAggregatedStats(@Nonnull String entityName, @Nonnull String aspectName,
@Nonnull AggregationSpec[] aggregationSpecs, @Nullable Filter filter, @Nullable GroupingBucket[] groupingBuckets);

/**
 * Generic filter based deletion for timeseries aspects.
 * @param entityName - The name of the entity.
 * @param aspectName - The name of the aspect.
 * @param filter - The filter to be used for deletion of the documents on the index.
 * @return - number of documents deleted.
 */
@Nonnull
DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @Nonnull String aspectName,
@Nonnull Filter filter);

/**
 * Rollback the timeseries aspects associated with a runId.
 * @param runId The runId that needs to be rolled back.
 * @return - the result of the rollback deletion.
 */
@Nonnull
DeleteAspectValuesResult rollbackTimeseriesAspects(@Nonnull String runId);
}
Loading

0 comments on commit 386719f

Please sign in to comment.