Skip to content

Commit

Permalink
Change type of publish_time to timestamp (apache#4757)
Browse files Browse the repository at this point in the history
Fixes apache#4734

### Motivation

"publish_time" is Pulsar SQL internal column, as Pulsar only stores timestamps, it doesn’t store the timezone information. Use timestamp as "publish_time" type is more correct way in Pulsar SQL.

### Modifications

Change type of publish_time to timestamp.

### Verifying this change

predicate of publish_time is pushdown

Use `__publish_time__` to trim messages:
```
SELECT COUNT(*)
FROM "sql-test-1" 
WHERE "__publish_time__" >= TIMESTAMP '2019-07-18 17:26:50.119' 
AND  "__publish_time__" < TIMESTAMP '2019-07-18 17:26:51.119';
```
![image](https://user-images.githubusercontent.com/12592133/61447301-43835080-a983-11e9-814b-bc2b378f02b9.png)

Without `__publish_time__` predicate:
```
SELECT COUNT(*)
FROM "sql-test-1";
```
![image](https://user-images.githubusercontent.com/12592133/61447427-82190b00-a983-11e9-8d3f-3bf2a4798047.png)
  • Loading branch information
codelipenghui authored and sijie committed Jul 19, 2019
1 parent f5b20cd commit 6f5416e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ public Object getData(RawMessage message) {
.TIMESTAMP, "Application defined timestamp in milliseconds of when the event occurred");

public static final PulsarInternalColumn PUBLISH_TIME = new PublishTimeColumn("__publish_time__",
TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE,
"The timestamp in milliseconds of when event as published");
TimestampType.TIMESTAMP, "The timestamp in milliseconds of when event as published");

public static final PulsarInternalColumn MESSAGE_ID = new MessageIdColumn("__message_id__", VarcharType.VARCHAR,
"The message ID of the message used to generate this row");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.bookkeeper.conf.ClientConfiguration;

import javax.inject.Inject;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
Expand Down Expand Up @@ -321,29 +322,34 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId,
Range range = domain.getValues().getRanges().getOrderedRanges().get(0);

if (!range.getHigh().isUpperUnbounded()) {
upperBoundTs = new SqlTimestampWithTimeZone(range.getHigh().getValueBlock().get()
.getLong(0, 0)).getMillisUtc();
upperBoundTs = new Timestamp(range.getHigh().getValueBlock().get()
.getLong(0, 0)).getTime();
}

if (!range.getLow().isLowerUnbounded()) {
lowerBoundTs = new SqlTimestampWithTimeZone(range.getLow().getValueBlock().get()
.getLong(0, 0)).getMillisUtc();
lowerBoundTs = new Timestamp(range.getLow().getValueBlock().get()
.getLong(0, 0)).getTime();
}

PositionImpl overallStartPos;
if (lowerBoundTs == null) {
overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
} else {
overallStartPos = findPosition(readOnlyCursor, lowerBoundTs);
if (overallStartPos == null) {
overallStartPos = (PositionImpl) readOnlyCursor.getReadPosition();
}
}

PositionImpl overallEndPos;
if (upperBoundTs == null) {

readOnlyCursor.skipEntries(Math.toIntExact(totalNumEntries));
overallEndPos = (PositionImpl) readOnlyCursor.getReadPosition();
} else {
overallEndPos = findPosition(readOnlyCursor, upperBoundTs);
if (overallEndPos == null) {
overallEndPos = overallStartPos;
}
}

// Just use a close bound since presto can always filter out the extra entries even if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.predicate.ValueSet;
import com.facebook.presto.spi.type.TimeZoneKey;
import io.airlift.log.Logger;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -40,8 +39,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -185,9 +183,8 @@ public void testPublishTimePredicatePushdown() throws Exception {


Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
(currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
TimeZoneKey.UTC_KEY), true)), false);
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);

Expand Down Expand Up @@ -243,9 +240,8 @@ public void testPublishTimePredicatePushdownPartitionedTopic() throws Exception


Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, packDateTimeWithZone
(currentTimeMs + 1L, TimeZoneKey.UTC_KEY), true, packDateTimeWithZone(currentTimeMs + 50L,
TimeZoneKey.UTC_KEY), true)), false);
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);

Expand Down

0 comments on commit 6f5416e

Please sign in to comment.