Skip to content

Commit

Permalink
BanyanDB: stream sort-by time query, use internal time-series rathe…
Browse files Browse the repository at this point in the history
…r than `index` to improve the query performance. (apache#12486)
  • Loading branch information
wankai123 authored Jul 29, 2024
1 parent 6462436 commit 9644f3f
Show file tree
Hide file tree
Showing 17 changed files with 41 additions and 32 deletions.
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
* Replace workaround with Armeria native supported context path.
* Add an http endpoint wrapper for health check.
* Bump up Armeria and transitive dependencies.
* BanyanDB: if the model column is already a `@BanyanDB.TimestampColumn`, set `@BanyanDB.NoIndexing` on it to reduce indexes.
* BanyanDB: stream sort-by `time` query, use internal time-series rather than `index` to improve the query performance.

#### UI

Expand Down
2 changes: 1 addition & 1 deletion oap-server-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<httpcore.version>4.4.13</httpcore.version>
<httpasyncclient.version>4.1.5</httpasyncclient.version>
<commons-compress.version>1.21</commons-compress.version>
<banyandb-java-client.version>0.7.0-rc1</banyandb-java-client.version>
<banyandb-java-client.version>0.7.0-rc2</banyandb-java-client.version>
<kafka-clients.version>3.4.0</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<consul.client.version>1.5.3</consul.client.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class SegmentRecord extends Record {
@Setter
@Getter
@Column(name = START_TIME)
@BanyanDB.NoIndexing
private long startTime;
@Setter
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class SpanAttachedEventRecord extends Record {
@Setter
@Getter
@Column(name = TIMESTAMP)
@BanyanDB.NoIndexing
private long timestamp;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class EBPFProfilingTaskRecord extends NoneStream {
@Column(name = TARGET_TYPE)
private int targetType = EBPFProfilingTargetType.UNKNOWN.value();
@Column(name = CREATE_TIME)
@BanyanDB.NoIndexing
private long createTime;
@Column(name = LAST_UPDATE_TIME)
private long lastUpdateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public StorageID id() {
@Column(name = TASK_ID)
private String taskId;
@Column(name = START_TIME)
@BanyanDB.NoIndexing
private long startTime;
@Column(name = DURATION)
private int duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ProfileThreadSnapshotRecord extends Record {
@BanyanDB.SeriesID(index = 0)
private String segmentId;
@Column(name = DUMP_TIME)
@BanyanDB.NoIndexing
private long dumpTime;
@Column(name = SEQUENCE)
private int sequence;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ public class ZipkinSpanRecord extends Record {
@Setter
@Getter
@Column(name = TIMESTAMP_MILLIS)
@BanyanDB.NoIndexing
private long timestampMillis;
@Setter
@Getter
@Column(name = TIMESTAMP)
@BanyanDB.NoIndexing
private long timestamp;
@Setter
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void apply(final StreamQuery query) {
if (request.maxDuration() != null) {
query.and(lte(ZipkinSpanRecord.DURATION, request.maxDuration()));
}
query.setOrderBy(new StreamQuery.OrderBy(ZipkinSpanRecord.TIMESTAMP_MILLIS, AbstractQuery.Sort.DESC));
query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.DESC));
query.setLimit(limit);
}
};
Expand Down Expand Up @@ -276,7 +276,7 @@ protected void apply(StreamQuery query) {
query.criteria(or(conditions));
}
query.setOrderBy(
new StreamQuery.OrderBy(ZipkinSpanRecord.TIMESTAMP_MILLIS, AbstractQuery.Sort.DESC));
new StreamQuery.OrderBy(AbstractQuery.Sort.DESC));
query.setLimit(QUERY_MAX_SIZE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
Expand Down Expand Up @@ -98,9 +99,9 @@ protected void apply(MeasureQuery query) {
query.limit(page.getLimit());
query.offset(page.getFrom());
if (queryOrder == Order.ASC) {
query.setOrderBy(asc(Event.START_TIME));
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
} else {
query.setOrderBy(desc(Event.START_TIME));
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
}
for (final EventQueryCondition condition : conditionList) {
List<PairQueryCondition<?>> queryConditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli

private static final TimestampRange LARGEST_TIME_RANGE = new TimestampRange(0, UPPER_BOUND.toEpochMilli());

protected static final long UPPER_BOUND_TIME = UPPER_BOUND.toEpochMilli();

protected static final long LOWER_BOUND_TIME = 0;

protected AbstractBanyanDBDAO(BanyanDBStorageClient client) {
super(client);
}
Expand Down Expand Up @@ -305,14 +309,6 @@ protected PairQueryCondition<Long> ne(String name, long value) {
return PairQueryCondition.LongQueryCondition.ne(name, value);
}

protected AbstractQuery.OrderBy desc(String name) {
return new AbstractQuery.OrderBy(name, AbstractQuery.Sort.DESC);
}

protected AbstractQuery.OrderBy asc(String name) {
return new AbstractQuery.OrderBy(name, AbstractQuery.Sort.ASC);
}

protected AbstractCriteria and(List<? extends AbstractCriteria> conditions) {
if (conditions.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void apply(StreamQuery query) {
if (triggerType != null) {
query.and(eq(EBPFProfilingTaskRecord.TRIGGER_TYPE, triggerType.value()));
}
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingTaskRecord.CREATE_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
}
});
tasks.addAll(resp.getElements().stream().map(this::buildTask).collect(Collectors.toList()));
Expand Down Expand Up @@ -100,7 +100,7 @@ protected void apply(StreamQuery query) {
}
query.and(eq(EBPFProfilingTaskRecord.TARGET_TYPE, targetType.value()));
appendTimeQuery(this, query, taskStartTime, latestUpdateTime);
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingTaskRecord.CREATE_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
}
});
tasks.addAll(resp.getElements().stream().map(this::buildTask).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
Expand Down Expand Up @@ -58,7 +59,15 @@ public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client, int queryMaxSiz

@Override
public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, TAGS,
long startTS = LOWER_BOUND_TIME;
long endTS = UPPER_BOUND_TIME;
if (startTimeBucket != null) {
startTS = TimeBucket.getTimestamp(startTimeBucket);
}
if (endTimeBucket != null) {
endTS = TimeBucket.getTimestamp(endTimeBucket);
}
StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, TAGS, new TimestampRange(startTS, endTS),
new QueryBuilder<StreamQuery>() {
@Override
protected void apply(StreamQuery query) {
Expand All @@ -68,19 +77,13 @@ protected void apply(StreamQuery query) {
if (StringUtil.isNotEmpty(endpointName)) {
query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
}
if (startTimeBucket != null) {
query.and(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
}
if (endTimeBucket != null) {
query.and(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
}

if (limit != null) {
query.setLimit(limit);
} else {
query.setLimit(BanyanDBProfileTaskQueryDAO.this.queryMaxSize);
}
query.setOrderBy(new AbstractQuery.OrderBy(ProfileTaskRecord.START_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;

import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
Expand Down Expand Up @@ -84,7 +86,7 @@ public List<String> queryProfiledSegmentIdList(String taskId) throws IOException
public void apply(StreamQuery query) {
query.and(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
.and(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L));
query.setOrderBy(desc(ProfileThreadSnapshotRecord.DUMP_TIME));
query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.DESC));
query.setLimit(querySegmentMaxSize);
}
});
Expand Down Expand Up @@ -135,13 +137,11 @@ public void apply(StreamQuery query) {

private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) throws IOException {
StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
TAGS_ALL,
TAGS_ALL, new TimestampRange(start, end),
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
.and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId));
}
});

Expand Down Expand Up @@ -176,4 +176,4 @@ public void apply(StreamQuery query) {
enum AggType {
MIN, MAX
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public List<SpanAttachedEventRecord> querySpanAttachedEvents(SpanAttachedEventTr
protected void apply(StreamQuery query) {
query.and(in(SpanAttachedEventRecord.RELATED_TRACE_ID, traceIds));
query.and(eq(SpanAttachedEventRecord.TRACE_REF_TYPE, type.value()));
query.setOrderBy(new StreamQuery.OrderBy(SpanAttachedEventRecord.START_TIME_SECOND, AbstractQuery.Sort.ASC));
query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.ASC));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void apply(StreamQuery query) {

switch (queryOrder) {
case BY_START_TIME:
query.setOrderBy(new StreamQuery.OrderBy(SegmentRecord.START_TIME, AbstractQuery.Sort.DESC));
query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.DESC));
break;
case BY_DURATION:
query.setOrderBy(new StreamQuery.OrderBy(SegmentRecord.LATENCY, AbstractQuery.Sort.DESC));
Expand Down
2 changes: 1 addition & 1 deletion test/e2e-v2/script/env
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=1335f15bf821a40a7cd71448fa805f0be265afcc
SW_ROVER_COMMIT=6bbd39aa701984482330d9dfb4dbaaff0527d55c
SW_BANYANDB_COMMIT=6f938ccf812e2183b4bd891fb90b2124aa65c170
SW_BANYANDB_COMMIT=d48a810f8cca8b66d7b3b179f36090d78f46e12c
SW_AGENT_PHP_COMMIT=3192c553002707d344bd6774cfab5bc61f67a1d3

SW_CTL_COMMIT=d5f3597733aa5217373986d776a3ee5ee8b3c468

0 comments on commit 9644f3f

Please sign in to comment.