Skip to content

Commit

Permalink
[Baseline] Apply Baseline plugin to iceberg-hive (apache#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored and rdblue committed Jun 28, 2019
1 parent 0047bb6 commit d6d9e4d
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 187 deletions.
2 changes: 2 additions & 0 deletions .baseline/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@

<!-- Generated code should not be subjected to checkstyle. -->
<suppress files="[/\\].*[/\\]generated.*[/\\]" checks="." />

<suppress files="org.apache.iceberg.hive.ScriptRunner.java" checks="RegexpHeader"/>
</suppressions>
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ task deploySite(type: Exec) {
// We enable baseline-idea everywhere so that everyone can use IntelliJ to build code against the
// Baseline style guide.
def baselineProjects = [ project("iceberg-api"), project("iceberg-common"), project("iceberg-core"),
project("iceberg-data"), project("iceberg-orc"), project("iceberg-spark") ]
project("iceberg-data"), project("iceberg-orc"), project("iceberg-spark"),
project("iceberg-hive") ]


configure(subprojects - baselineProjects) {
Expand Down
60 changes: 30 additions & 30 deletions hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.iceberg.hive;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class ClientPool<C, E extends Exception> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ClientPool.class);
Expand Down Expand Up @@ -80,6 +80,34 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {

protected abstract void close(C client);

@Override
public void close() {
this.closed = true;
try {
while (currentSize > 0) {
if (!clients.isEmpty()) {
synchronized (this) {
if (!clients.isEmpty()) {
C client = clients.removeFirst();
close(client);
currentSize -= 1;
}
}
}
if (clients.isEmpty() && currentSize > 0) {
// wake every second in case this missed the signal
synchronized (signal) {
signal.wait(1000);
}
}
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);
}
}

private C get() throws InterruptedException {
Preconditions.checkState(!closed, "Cannot get a client from a closed pool");
while (true) {
Expand Down Expand Up @@ -108,32 +136,4 @@ private void release(C client) {
signal.notify();
}
}

@Override
public void close() {
this.closed = true;
try {
while (currentSize > 0) {
if (!clients.isEmpty()) {
synchronized (this) {
if (!clients.isEmpty()) {
C client = clients.removeFirst();
close(client);
currentSize -= 1;
}
}
}
if (clients.isEmpty() && currentSize > 0) {
// wake every second in case this missed the signal
synchronized (signal) {
signal.wait(1000);
}
}
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.");
}
}
}
81 changes: 44 additions & 37 deletions hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -47,8 +51,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.String.format;

/**
* TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
* avoid code duplication between this class and Metacat Tables.
Expand All @@ -75,21 +77,23 @@ public TableMetadata refresh() {
String tableType = table.getParameters().get(TABLE_TYPE_PROP);

if (tableType == null || !tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE)) {
throw new IllegalArgumentException(format("Invalid tableName, not Iceberg: %s.%s", database, table));
throw new IllegalArgumentException(String.format("Invalid tableName, not Iceberg: %s.%s", database, table));
}

metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
if (metadataLocation == null) {
throw new IllegalArgumentException(format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP));
String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP);
throw new IllegalArgumentException(errMsg);
}

} catch (NoSuchObjectException e) {
if (currentMetadataLocation() != null) {
throw new NoSuchTableException(format("No such table: %s.%s", database, tableName));
throw new NoSuchTableException(String.format("No such table: %s.%s", database, tableName));
}

} catch (TException e) {
throw new RuntimeException(format("Failed to get table info from metastore %s.%s", database, tableName), e);
String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName);
throw new RuntimeException(errMsg, e);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -127,24 +131,25 @@ public void commit(TableMetadata base, TableMetadata metadata) {
} else {
final long currentTimeMillis = System.currentTimeMillis();
tbl = new Table(tableName,
database,
System.getProperty("user.name"),
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
storageDescriptor(metadata),
Collections.emptyList(),
new HashMap<>(),
null,
null,
ICEBERG_TABLE_TYPE_VALUE);
database,
System.getProperty("user.name"),
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
storageDescriptor(metadata),
Collections.emptyList(),
new HashMap<>(),
null,
null,
ICEBERG_TABLE_TYPE_VALUE);
}

tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
throw new CommitFailedException(format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
currentMetadataLocation(), metadataLocation, database, tableName));
String errMsg = String.format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
currentMetadataLocation(), metadataLocation, database, tableName);
throw new CommitFailedException(errMsg);
}

setParameters(newMetadataLocation, tbl);
Expand All @@ -162,7 +167,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}
threw = false;
} catch (TException | UnknownHostException e) {
throw new RuntimeException(format("Metastore operation failed for %s.%s", database, tableName), e);
throw new RuntimeException(String.format("Metastore operation failed for %s.%s", database, tableName), e);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -209,16 +214,18 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata) {
return storageDescriptor;
}

private final List<FieldSchema> columns(Schema schema) {
return schema.columns().stream().map(col -> new FieldSchema(col.name(), HiveTypeConverter.convert(col.type()), "")).collect(Collectors.toList());
private List<FieldSchema> columns(Schema schema) {
return schema.columns().stream()
.map(col -> new FieldSchema(col.name(), HiveTypeConverter.convert(col.type()), ""))
.collect(Collectors.toList());
}

private long acquireLock() throws UnknownHostException, TException, InterruptedException {
final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));
LockState state = lockResponse.getState();
long lockId = lockResponse.getLockid();
Expand All @@ -229,8 +236,8 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
}

if (!state.equals(LockState.ACQUIRED)) {
throw new CommitFailedException(format("Could not acquire the lock on %s.%s, " +
"lock request ended in state %s", database, tableName, state));
throw new CommitFailedException(String.format("Could not acquire the lock on %s.%s, " +
"lock request ended in state %s", database, tableName, state));
}
return lockId;
}
Expand All @@ -243,7 +250,7 @@ private void unlock(Optional<Long> lockId) {
return null;
});
} catch (Exception e) {
throw new RuntimeException(format("Failed to unlock %s.%s", database, tableName) , e);
throw new RuntimeException(String.format("Failed to unlock %s.%s", database, tableName), e);
}
}
}
Expand Down
30 changes: 17 additions & 13 deletions hive/src/main/java/org/apache/iceberg/hive/HiveTables.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import com.google.common.base.Splitter;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseMetastoreTables;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import java.io.Closeable;
import java.util.List;
import java.util.Map;

public class HiveTables extends BaseMetastoreTables implements Closeable {
private static final Splitter DOT = Splitter.on('.').limit(2);
Expand Down
43 changes: 23 additions & 20 deletions hive/src/main/java/org/apache/iceberg/hive/HiveTypeConverter.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive;

import java.util.stream.Collectors;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import static java.lang.String.format;


public final class HiveTypeConverter {

Expand Down Expand Up @@ -55,21 +57,22 @@ public static String convert(Type type) {
return "binary";
case DECIMAL:
final Types.DecimalType decimalType = (Types.DecimalType) type;
return format("decimal(%s,%s)", decimalType.precision(), decimalType.scale()); //TODO may be just decimal?
// TODO may be just decimal?
return String.format("decimal(%s,%s)", decimalType.precision(), decimalType.scale());
case STRUCT:
final Types.StructType structType = type.asStructType();
final String nameToType = structType.fields().stream().map(
f -> format("%s:%s", f.name(), convert(f.type()))
).collect(Collectors.joining(","));
return format("struct<%s>", nameToType);
final String nameToType = structType.fields().stream()
.map(f -> String.format("%s:%s", f.name(), convert(f.type())))
.collect(Collectors.joining(","));
return String.format("struct<%s>", nameToType);
case LIST:
final Types.ListType listType = type.asListType();
return format("array<%s>", convert(listType.elementType()));
return String.format("array<%s>", convert(listType.elementType()));
case MAP:
final Types.MapType mapType = type.asMapType();
return format("map<%s,%s>", convert(mapType.keyType()), convert(mapType.valueType()));
return String.format("map<%s,%s>", convert(mapType.keyType()), convert(mapType.valueType()));
default:
throw new UnsupportedOperationException(type +" is not supported");
throw new UnsupportedOperationException(type + " is not supported");
}
}
}
Loading

0 comments on commit d6d9e4d

Please sign in to comment.