Enforce checkstyle in the pulsar sql module (apache#4882)
### Modifications

The checkstyle plugin was added to the pulsar-sql module to enforce the defined code style. All existing violations were fixed:

- Ordering of imports.
- Formatting of the code.
- Missing Javadoc comments.
- Other small issues.
vzhikserg authored and sijie committed Aug 5, 2019
1 parent ce681df commit f6fee1c
Showing 29 changed files with 499 additions and 356 deletions.
22 changes: 22 additions & 0 deletions pulsar-sql/pom.xml
@@ -114,4 +114,26 @@
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>check-style</id>
<phase>verify</phase>
<configuration>
<configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
<suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
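
The plugin binds the `check` goal to the `verify` phase, so running `mvn verify` on the module executes Checkstyle and, with the plugin's defaults, fails the build when violations are found. The actual rules live in the referenced `buildtools/src/main/resources/pulsar/checkstyle.xml`; as an indicative sketch only (the real file may define different checks and properties), a configuration covering the categories of fixes listed above could look like this:

```xml
<!-- Hypothetical sketch: the real rules are defined in
     buildtools/src/main/resources/pulsar/checkstyle.xml and may differ. -->
<!DOCTYPE module PUBLIC
    "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
    "https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
  <module name="TreeWalker">
    <module name="UnusedImports"/>     <!-- unused and redundant imports -->
    <module name="CustomImportOrder"/> <!-- ordering of imports -->
    <module name="JavadocType"/>       <!-- missing class-level Javadoc -->
    <module name="WhitespaceAround"/>  <!-- spaces around operators such as == -->
    <module name="NeedBraces"/>        <!-- braces on single-statement if/for -->
    <module name="OperatorWrap"/>      <!-- wrapped operators start the next line -->
  </module>
</module>
```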
AvroSchemaHandler.java
@@ -19,21 +19,22 @@
package org.apache.pulsar.sql.presto;

import io.airlift.log.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;
import java.util.List;

/**
* Schema handler for payload in the Avro format.
*/
public class AvroSchemaHandler implements SchemaHandler {

private final DatumReader<GenericRecord> datumReader;
@@ -64,7 +65,7 @@ public Object deserialize(ByteBuf payload) {

BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache==null) {
if (decoderFromCache == null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
@@ -87,7 +88,7 @@ public Object extractField(int index, Object currentRecord) {
return null;
}
if (positionIndices.length > 0) {
for (int i = 1 ; i < positionIndices.length; i++) {
for (int i = 1; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
@@ -96,7 +97,7 @@ public Object extractField(int index, Object currentRecord) {
}
return curr;
} catch (Exception ex) {
log.debug(ex,"%s", ex);
log.debug(ex, "%s", ex);
}
return null;
}
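
Beyond the whitespace fixes, the hunks above show the decoder-reuse pattern `AvroSchemaHandler` relies on: a per-thread `BinaryDecoder` is cached in a Netty `FastThreadLocal` and passed back to Avro's `DecoderFactory` so repeated deserializations do not allocate a new decoder. A condensed, hypothetical sketch of that pattern (the class and method shape below are illustrative, not the actual Pulsar sources):

```java
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

// Illustrative sketch of the decoder-caching pattern, not the Pulsar class itself.
public class AvroDecodeSketch {

    // One decoder per thread, lazily created on first use.
    private static final FastThreadLocal<BinaryDecoder> DECODERS = new FastThreadLocal<>();

    private final DatumReader<GenericRecord> datumReader;

    public AvroDecodeSketch(Schema schema) {
        this.datumReader = new GenericDatumReader<>(schema);
    }

    public GenericRecord deserialize(byte[] bytes, int offset, int length) throws IOException {
        BinaryDecoder cached = DECODERS.get();
        // Passing the cached decoder lets DecoderFactory reuse it instead of allocating a new one.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, cached);
        if (cached == null) {
            DECODERS.set(decoder);
        }
        return datumReader.read(null, decoder);
    }
}
```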
JSONSchemaHandler.java
@@ -20,17 +20,17 @@

import com.dslplatform.json.DslJson;
import com.facebook.presto.spi.type.Type;

import io.airlift.log.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;

/**
* Schema handler for payload in the JSON format.
*/
public class JSONSchemaHandler implements SchemaHandler {

private static final Logger log = Logger.get(JSONSchemaHandler.class);
@@ -84,7 +84,7 @@ public Object extractField(int index, Object currentRecord) {
if (field == null) {
return null;
}
for (int i = 1; i < fieldNames.length ; i++) {
for (int i = 1; i < fieldNames.length; i++) {
field = ((Map) field).get(fieldNames[i]);
if (field == null) {
return null;
@@ -101,7 +101,7 @@

return field;
} catch (Exception ex) {
log.debug(ex,"%s", ex);
log.debug(ex, "%s", ex);
}
return null;
}
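
For context on the `extractField` loop being touched above, here is a minimal, self-contained sketch (a hypothetical helper, not Pulsar code) of the same nested-field traversal: the column's field-name path is applied one map level at a time, returning null as soon as any level is missing.

```java
import java.util.Map;

// Hypothetical helper illustrating the nested-field lookup used in extractField.
public final class JsonFieldLookup {

    private JsonFieldLookup() {
    }

    @SuppressWarnings("unchecked")
    public static Object extract(Map<String, Object> record, String[] fieldNames) {
        Object field = record.get(fieldNames[0]);
        if (field == null) {
            return null;
        }
        for (int i = 1; i < fieldNames.length; i++) {
            // Each intermediate level of a JSON object deserializes to another map.
            field = ((Map<String, Object>) field).get(fieldNames[i]);
            if (field == null) {
                return null;
            }
        }
        return field;
    }
}
```

For a record deserialized into a map, `extract(record, new String[]{"address", "city"})` would return the nested value, or null if either level is absent.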
PulsarColumnHandle.java
@@ -18,27 +18,29 @@
*/
package org.apache.pulsar.sql.presto;

import static java.util.Objects.requireNonNull;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Arrays;

import static java.util.Objects.requireNonNull;

/**
* This class represents the basic information about a presto column.
*/
public class PulsarColumnHandle implements ColumnHandle {

private final String connectorId;

/**
* Column Name
* Column Name.
*/
private final String name;

/**
* Column type
* Column type.
*/
private final Type type;

@@ -116,17 +118,33 @@ ColumnMetadata getColumnMetadata() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

PulsarColumnHandle that = (PulsarColumnHandle) o;

if (hidden != that.hidden) return false;
if (internal != that.internal) return false;
if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
if (hidden != that.hidden) {
return false;
}
if (internal != that.internal) {
return false;
}
if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
if (type != null ? !type.equals(that.type) : that.type != null) {
return false;
}
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
return false;
}
return Arrays.deepEquals(positionIndices, that.positionIndices);
}

@@ -144,14 +162,14 @@ public int hashCode() {

@Override
public String toString() {
return "PulsarColumnHandle{" +
"connectorId='" + connectorId + '\'' +
", name='" + name + '\'' +
", type=" + type +
", hidden=" + hidden +
", internal=" + internal +
", fieldNames=" + Arrays.toString(fieldNames) +
", positionIndices=" + Arrays.toString(positionIndices) +
'}';
return "PulsarColumnHandle{"
+ "connectorId='" + connectorId + '\''
+ ", name='" + name + '\''
+ ", type=" + type
+ ", hidden=" + hidden
+ ", internal=" + internal
+ ", fieldNames=" + Arrays.toString(fieldNames)
+ ", positionIndices=" + Arrays.toString(positionIndices)
+ '}';
}
}
PulsarColumnMetadata.java
@@ -20,10 +20,11 @@

import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;

import java.util.Arrays;
import java.util.List;

/**
* Description of the column metadata.
*/
public class PulsarColumnMetadata extends ColumnMetadata {

private boolean isInternal;
@@ -60,25 +61,37 @@ public Integer[] getPositionIndices() {

@Override
public String toString() {
return "PulsarColumnMetadata{" +
"isInternal=" + isInternal +
", nameWithCase='" + nameWithCase + '\'' +
", fieldNames=" + Arrays.toString(fieldNames) +
", positionIndices=" + Arrays.toString(positionIndices) +
'}';
return "PulsarColumnMetadata{"
+ "isInternal=" + isInternal
+ ", nameWithCase='" + nameWithCase + '\''
+ ", fieldNames=" + Arrays.toString(fieldNames)
+ ", positionIndices=" + Arrays.toString(positionIndices)
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}

PulsarColumnMetadata that = (PulsarColumnMetadata) o;

if (isInternal != that.isInternal) return false;
if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
if (isInternal != that.isInternal) {
return false;
}
if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
return false;
}
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
return false;
}
return Arrays.deepEquals(positionIndices, that.positionIndices);
}

PulsarConnector.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.sql.presto;

import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;

import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
@@ -26,13 +30,11 @@
import com.facebook.presto.spi.transaction.IsolationLevel;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;

import javax.inject.Inject;

import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;

/**
* This file contains implementation of the connector to the Presto engine.
*/
public class PulsarConnector implements Connector {

private static final Logger log = Logger.get(PulsarConnector.class);
PulsarConnectorCache.java
@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.sql.presto;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.io.IOException;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -34,13 +39,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;

import java.io.IOException;
import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;

/**
* Implementation of a cache for the Pulsar connector.
*/
public class PulsarConnectorCache {

private static final Logger log = Logger.get(PulsarConnectorCache.class);
@@ -68,7 +69,7 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws
// start stats provider
ClientConfiguration clientConfiguration = new ClientConfiguration();

pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);

this.statsProvider.start(clientConfiguration);

@@ -84,7 +85,8 @@ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsa
return instance;
}

private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
@@ -123,16 +125,17 @@ private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) {
Map<String, String> offloaderProperties = conf.getOffloaderProperties();
offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
offloaderProperties
.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));

try {
return offloaderFactory.create(
PulsarConnectorUtils.getProperties(offloaderProperties),
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
PulsarConnectorUtils.getProperties(offloaderProperties),
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
} catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
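
One simplification in `PulsarConnectorCache` goes beyond formatting: the `(key, value) -> clientConfiguration.setProperty(key, value)` lambda is replaced with the method reference `clientConfiguration::setProperty`. A small, self-contained illustration of the same substitution, using `java.util.Properties` as a stand-in for the BookKeeper `ClientConfiguration` (an assumption made only to keep the example dependency-free):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public final class MethodReferenceExample {

    public static void main(String[] args) {
        Map<String, String> statsProviderConfigs = new HashMap<>();
        statsProviderConfigs.put("exampleKey", "exampleValue"); // placeholder entry

        Properties config = new Properties();
        // Before: statsProviderConfigs.forEach((key, value) -> config.setProperty(key, value));
        // After, as in the diff: the lambda collapses to a method reference.
        statsProviderConfigs.forEach(config::setProperty);

        System.out.println(config); // {exampleKey=exampleValue}
    }
}
```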