Skip to content

Commit

Permalink
Merge pull request apache#10964: Make logger naming consistent with A…
Browse files Browse the repository at this point in the history
…pache Beam LOG standard
  • Loading branch information
iemejia authored Feb 26, 2020
2 parents 4e35fb6 + 67ad5ce commit d577e41
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@

public class Log {

private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final Logger LOG = LoggerFactory.getLogger(Log.class);

private Log() {
}
private Log() {}

public static <T> PTransform<PCollection<T>, PCollection<T>> ofElements() {
return new LoggingTransform<>();
Expand All @@ -56,26 +55,25 @@ private LoggingTransform(String prefix) {

@Override
public PCollection<T> expand(PCollection<T> input) {
return input.apply(ParDo.of(new DoFn<T, T>() {

@ProcessElement
public void processElement(@Element T element, OutputReceiver<T> out,
BoundedWindow window) {
return input.apply(
ParDo.of(
new DoFn<T, T>() {

String message = prefix + element.toString();
@ProcessElement
public void processElement(
@Element T element, OutputReceiver<T> out, BoundedWindow window) {

if (!(window instanceof GlobalWindow)) {
message = message + " Window:" + window.toString();
}
String message = prefix + element.toString();

LOGGER.info(message);
if (!(window instanceof GlobalWindow)) {
message = message + " Window:" + window.toString();
}

out.output(element);
}
LOG.info(message);

}));
out.output(element);
}
}));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public class BeamFnControlService extends BeamFnControlGrpc.BeamFnControlImplBase
implements BeamFnService, Supplier<FnApiControlClient> {
private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnControlService.class);
private static final Logger LOG = LoggerFactory.getLogger(BeamFnControlService.class);
private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
private final Function<
StreamObserver<BeamFnApi.InstructionRequest>,
Expand All @@ -57,7 +57,7 @@ public BeamFnControlService(
this.newClients = new SynchronousQueue<>(true /* fair */);
this.streamObserverFactory = streamObserverFactory;
this.apiServiceDescriptor = serviceDescriptor;
LOGGER.info("Launched Beam Fn Control service {}", this.apiServiceDescriptor);
LOG.info("Launched Beam Fn Control service {}", this.apiServiceDescriptor);
}

@Override
Expand All @@ -68,7 +68,7 @@ public Endpoints.ApiServiceDescriptor getApiServiceDescriptor() {
@Override
public StreamObserver<BeamFnApi.InstructionResponse> control(
StreamObserver<BeamFnApi.InstructionRequest> outboundObserver) {
LOGGER.info("Beam Fn Control client connected with id {}", headerAccessor.getSdkWorkerId());
LOG.info("Beam Fn Control client connected with id {}", headerAccessor.getSdkWorkerId());
FnApiControlClient newClient =
FnApiControlClient.forRequestObserver(
headerAccessor.getSdkWorkerId(), streamObserverFactory.apply(outboundObserver));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*/
public class BeamFnLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase
implements BeamFnService {
private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnLoggingService.class);
private static final Logger LOG = LoggerFactory.getLogger(BeamFnLoggingService.class);
private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
private final Consumer<BeamFnApi.LogEntry> clientLogger;
private final Function<StreamObserver<BeamFnApi.LogControl>, StreamObserver<BeamFnApi.LogControl>>
Expand All @@ -64,7 +64,7 @@ public BeamFnLoggingService(
this.headerAccessor = headerAccessor;
this.connectedClients = new ConcurrentHashMap<>();
this.apiServiceDescriptor = apiServiceDescriptor;
LOGGER.info("Launched Beam Fn Logging service {}", this.apiServiceDescriptor);
LOG.info("Launched Beam Fn Logging service {}", this.apiServiceDescriptor);
}

@Override
Expand All @@ -76,7 +76,7 @@ public Endpoints.ApiServiceDescriptor getApiServiceDescriptor() {
public void close() throws Exception {
Set<InboundObserver> remainingClients = ImmutableSet.copyOf(connectedClients.keySet());
if (!remainingClients.isEmpty()) {
LOGGER.info(
LOG.info(
"{} Beam Fn Logging clients still connected during shutdown.", remainingClients.size());

// Signal server shutting down to all remaining connected clients.
Expand All @@ -92,7 +92,7 @@ public void close() throws Exception {
@Override
public StreamObserver<BeamFnApi.LogEntry.List> logging(
StreamObserver<BeamFnApi.LogControl> outboundObserver) {
LOGGER.info("Beam Fn Logging client connected for client {}", headerAccessor.getSdkWorkerId());
LOG.info("Beam Fn Logging client connected for client {}", headerAccessor.getSdkWorkerId());
InboundObserver inboundObserver = new InboundObserver(headerAccessor.getSdkWorkerId());
connectedClients.put(inboundObserver, streamObserverFactory.apply(outboundObserver));
return inboundObserver;
Expand All @@ -104,7 +104,7 @@ private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> outboundObse
outboundObserver.onCompleted();
} catch (RuntimeException ignored) {
// Completing outbound observer failed, ignoring failure and continuing
LOGGER.warn("Beam Fn Logging client failed to be complete.", ignored);
LOG.warn("Beam Fn Logging client failed to be complete.", ignored);
}
}
}
Expand Down Expand Up @@ -133,7 +133,7 @@ public void onNext(BeamFnApi.LogEntry.List value) {

@Override
public void onError(Throwable t) {
LOGGER.warn("Logging client failed unexpectedly. ClientId: {}", t, sdkWorkerId);
LOG.warn("Logging client failed unexpectedly. ClientId: {}", t, sdkWorkerId);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
Expand All @@ -142,7 +142,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
LOGGER.info("Logging client hanged up. ClientId: {}", sdkWorkerId);
LOG.info("Logging client hanged up. ClientId: {}", sdkWorkerId);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/** A Fn API control service which adds incoming SDK harness connections to a sink. */
public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase
implements FnService {
private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClientPoolService.class);

private final Object lock = new Object();
private final ControlClientPool.Sink clientSink;
Expand Down Expand Up @@ -78,10 +78,10 @@ public StreamObserver<BeamFnApi.InstructionResponse> control(
final String workerId = headerAccessor.getSdkWorkerId();
if (Strings.isNullOrEmpty(workerId)) {
// TODO(BEAM-4149): Enforce proper worker id.
LOGGER.warn("No worker_id header provided in control request");
LOG.warn("No worker_id header provided in control request");
}

LOGGER.info("Beam Fn Control client connected with id {}", workerId);
LOG.info("Beam Fn Control client connected with id {}", workerId);
FnApiControlClient newClient = FnApiControlClient.forRequestObserver(workerId, requestObserver);
try {
// Add the client to the pool of vended clients before making it available - we should close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/** An implementation of the Beam Fn Logging Service over gRPC. */
public class GrpcLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase
implements FnService {
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcLoggingService.class);
private static final Logger LOG = LoggerFactory.getLogger(GrpcLoggingService.class);

public static GrpcLoggingService forWriter(LogWriter writer) {
return new GrpcLoggingService(writer);
Expand All @@ -50,7 +50,7 @@ private GrpcLoggingService(LogWriter logWriter) {
public void close() throws Exception {
Set<InboundObserver> remainingClients = ImmutableSet.copyOf(connectedClients.keySet());
if (!remainingClients.isEmpty()) {
LOGGER.info(
LOG.info(
"{} Beam Fn Logging clients still connected during shutdown.", remainingClients.size());

// Signal server shutting down to all remaining connected clients.
Expand All @@ -66,7 +66,7 @@ public void close() throws Exception {
@Override
public StreamObserver<BeamFnApi.LogEntry.List> logging(
StreamObserver<BeamFnApi.LogControl> outboundObserver) {
LOGGER.info("Beam Fn Logging client connected.");
LOG.info("Beam Fn Logging client connected.");
InboundObserver inboundObserver = new InboundObserver();
connectedClients.put(inboundObserver, outboundObserver);
return inboundObserver;
Expand All @@ -78,7 +78,7 @@ private void completeIfNotNull(StreamObserver<BeamFnApi.LogControl> outboundObse
outboundObserver.onCompleted();
} catch (RuntimeException ignored) {
// Completing outbound observer failed, ignoring failure and continuing
LOGGER.warn("Beam Fn Logging client failed to be complete.", ignored);
LOG.warn("Beam Fn Logging client failed to be complete.", ignored);
}
}
}
Expand All @@ -98,7 +98,7 @@ public void onNext(BeamFnApi.LogEntry.List value) {

@Override
public void onError(Throwable t) {
LOGGER.warn("Logging client failed unexpectedly.", t);
LOG.warn("Logging client failed unexpectedly.", t);
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
Expand All @@ -107,7 +107,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
LOGGER.info("Logging client hanged up.");
LOG.info("Logging client hanged up.");
// We remove these from the connected clients map to prevent a race between
// the close method and this InboundObserver calling a terminal method on the
// StreamObserver. If we removed it, then we are responsible for the terminal call.
Expand Down
8 changes: 8 additions & 0 deletions sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ page at http://checkstyle.sourceforge.net/config.html -->
<property name="message" value="You should not use org.spark_project classes in Beam."/>
</module>

<!-- Forbid calling Loggers different than LOG. -->
<module name="RegexpSinglelineJava">
<property name="id" value="LoggerNamingStyle"/>
<property name="format" value="(\sprivate static final Logger )(?!LOG )"/>
<property name="severity" value="error"/>
<property name="message" value="You should name sfl4j Loggers as LOG."/>
</module>

<!-- Forbid TestNG imports that may leak because of dependencies. -->
<module name="RegexpSinglelineJava">
<property name="id" value="ForbidTestNG"/>
Expand Down
4 changes: 2 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
* allows us to split the sub-source over and over yet still receive "source" objects as inputs.
*/
static class BoundedSourceAsSDFWrapperFn<T> extends DoFn<BoundedSource<T>, T> {
private static final Logger LOGGER = LoggerFactory.getLogger(BoundedSourceAsSDFWrapperFn.class);
private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceAsSDFWrapperFn.class);
private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20);

@GetInitialRestriction
Expand Down Expand Up @@ -338,7 +338,7 @@ protected void finalize() throws Throwable {
try {
currentReader.close();
} catch (IOException e) {
LOGGER.error("Failed to close BoundedReader due to failure processing bundle.", e);
LOG.error("Failed to close BoundedReader due to failure processing bundle.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BigQueryTable extends SchemaBaseBeamTable implements Serializable {
@VisibleForTesting final String bqLocation;
private final ConversionOptions conversionOptions;
private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);
private static final Logger LOG = LoggerFactory.getLogger(BigQueryTable.class);
@VisibleForTesting final Method method;

BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
Expand Down Expand Up @@ -98,7 +98,7 @@ class BigQueryTable extends SchemaBaseBeamTable implements Serializable {
method = Method.DEFAULT;
}

LOGGER.info("BigQuery method is set to: " + method.toString());
LOG.info("BigQuery method is set to: " + method.toString());
}

@Override
Expand All @@ -125,7 +125,7 @@ public PCollection<Row> buildIOReader(PBegin begin) {
public PCollection<Row> buildIOReader(
PBegin begin, BeamSqlTableFilter filters, List<String> fieldNames) {
if (!method.equals(Method.DIRECT_READ)) {
LOGGER.info("Predicate/project push-down only available for `DIRECT_READ` method, skipping.");
LOG.info("Predicate/project push-down only available for `DIRECT_READ` method, skipping.");
return buildIOReader(begin);
}

Expand All @@ -140,7 +140,7 @@ public PCollection<Row> buildIOReader(
if (!bigQueryFilter.getSupported().isEmpty()) {
String rowRestriction = generateRowRestrictions(getSchema(), bigQueryFilter.getSupported());
if (!rowRestriction.isEmpty()) {
LOGGER.info("Pushing down the following filter: " + rowRestriction);
LOG.info("Pushing down the following filter: " + rowRestriction);
typedRead = typedRead.withRowRestriction(rowRestriction);
}
}
Expand Down Expand Up @@ -225,7 +225,7 @@ private static BeamTableStatistics getRowCountFromBQ(PipelineOptions o, String b
return BeamTableStatistics.createBoundedTableStatistics(rowCount.doubleValue());

} catch (IOException | InterruptedException e) {
LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
LOG.warn("Could not get the row count for the table " + bqLocation, e);
}

return BeamTableStatistics.BOUNDED_UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable {
public static final String KEY_FIELD_PROPERTY = "keyField";
@VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__";
private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreV1Table.class);
private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1Table.class);
// Should match: `projectId/kind`.
private static final Pattern locationPattern = Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
@VisibleForTesting final String keyField;
Expand Down Expand Up @@ -166,7 +166,7 @@ private EntityToRow(Schema schema, String keyField) {
+ "` should of type `VARBINARY`. Please change the type or specify a field to"
+ " store the KEY value.");
}
LOGGER.info("Entity KEY will be stored under `" + keyField + "` field.");
LOG.info("Entity KEY will be stored under `" + keyField + "` field.");
}
}

Expand Down Expand Up @@ -300,7 +300,7 @@ public PCollection<Entity> expand(PCollection<Row> input) {
+ "` should of type `VARBINARY`. Please change the type or specify a field to"
+ " write the KEY value from via TableProperties.");
}
LOGGER.info("Field to use as Entity KEY is set to: `" + keyField + "`.");
LOG.info("Field to use as Entity KEY is set to: `" + keyField + "`.");
}
return input.apply(ParDo.of(new RowToEntityConverter(isFieldPresent)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public abstract class BeamKafkaTable extends SchemaBaseBeamTable {
private List<TopicPartition> topicPartitions;
private Map<String, Object> configUpdates;
private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER = LoggerFactory.getLogger(BeamKafkaTable.class);
private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaTable.class);
// This is the number of records looked from each partition when the rate is estimated
protected int numberOfRecordsForRate = 50;

Expand Down Expand Up @@ -163,7 +163,7 @@ public BeamTableStatistics getTableStatistics(PipelineOptions options) {
BeamTableStatistics.createUnboundedTableStatistics(
this.computeRate(numberOfRecordsForRate));
} catch (Exception e) {
LOGGER.warn("Could not get the row count for the topics " + getTopics(), e);
LOG.warn("Could not get the row count for the topics " + getTopics(), e);
rowCountStatistics = BeamTableStatistics.UNBOUNDED_UNKNOWN;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@

@Experimental
public class MongoDbTable extends SchemaBaseBeamTable implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbTable.class);
private static final Logger LOG = LoggerFactory.getLogger(MongoDbTable.class);
// Should match: mongodb://username:password@localhost:27017/database/collection
@VisibleForTesting
final Pattern locationPattern =
Expand Down Expand Up @@ -134,7 +134,7 @@ public PCollection<Row> buildIOReader(
MongoDbFilter mongoFilter = (MongoDbFilter) filters;
if (!mongoFilter.getSupported().isEmpty()) {
Bson filter = constructPredicate(mongoFilter.getSupported());
LOGGER.info("Pushing down the following filter: " + filter.toString());
LOG.info("Pushing down the following filter: " + filter.toString());
findQuery = findQuery.withFilters(filter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TextTable extends SchemaBaseBeamTable {
new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024L);
private final String filePattern;
private BeamTableStatistics rowCountStatistics = null;
private static final Logger LOGGER = LoggerFactory.getLogger(TextTable.class);
private static final Logger LOG = LoggerFactory.getLogger(TextTable.class);

/** Text table with the specified read and write transforms. */
public TextTable(
Expand Down Expand Up @@ -90,7 +90,7 @@ private static BeamTableStatistics getTextRowEstimate(
Double rows = textRowCountEstimator.estimateRowCount(options);
return BeamTableStatistics.createBoundedTableStatistics(rows);
} catch (IOException | TextRowCountEstimator.NoEstimationException e) {
LOGGER.warn("Could not get the row count for the text table " + filePattern, e);
LOG.warn("Could not get the row count for the text table " + filePattern, e);
}
return BeamTableStatistics.BOUNDED_UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

/** ZetaSQLPlannerImpl. */
public class ZetaSQLPlannerImpl implements Planner {
private static final Logger logger = Logger.getLogger(ZetaSQLPlannerImpl.class.getName());
private static final Logger LOG = Logger.getLogger(ZetaSQLPlannerImpl.class.getName());

private final SchemaPlus defaultSchemaPlus;

Expand Down
Loading

0 comments on commit d577e41

Please sign in to comment.