Skip to content

Commit

Permalink
Clean up: Remove all unused imports (apache#8766)
Browse files Browse the repository at this point in the history
### Motivation
Clean up project

### Modifications
- Search All unused imports, and remove them.
- Fixed typo and chunked
  • Loading branch information
hezhangjian authored Dec 4, 2020
1 parent 75af6c2 commit 18fd15b
Show file tree
Hide file tree
Showing 33 changed files with 34 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Range;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import javax.servlet.http.HttpServletResponse;

import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl.auth;

import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.annotations.VisibleForTesting;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Clock;
import java.util.Random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s",
format("Failed to parse partition-response redirect=%s, topic=%s, partitions with %s, error message %s",
r.redirect, topicName.toString(), r.partitions,
e.getMessage())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@

public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {

enum ConsumerType {
PARTITIONED, NON_PARTITIONED
}

protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
protected final String consumerName;
Expand All @@ -71,7 +67,7 @@ enum ConsumerType {
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorService listenerExecutor;
final BlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap;
protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected final Schema<T> schema;
Expand All @@ -97,7 +93,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();

this.listenerExecutor = listenerExecutor;
this.pendingReceives = Queues.newConcurrentLinkedQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
private volatile MessageId lastMessageIdInBroker = MessageId.earliest;

private long subscribeTimeout;
private final long subscribeTimeout;
private final int partitionIndex;
private final boolean hasParentConsumer;

Expand Down Expand Up @@ -177,15 +177,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected volatile boolean paused;

protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
private int pendingChunckedMessageCount = 0;
private int pendingChunkedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
private boolean expireChunkMessageTaskScheduled = false;
private int maxPendingChuckedMessage;
private final int maxPendingChunkedMessage;
// if queue size is reasonable (most of the time equal to number of producers try to publish messages concurrently on
// the topic) then it guards against broken chuncked message which was not fully published
private boolean autoAckOldestChunkedMessageOnQueueFull;
// it will be used to manage N outstanding chunked mesage buffers
private final BlockingQueue<String> pendingChunckedMessageUuidQueue;
// the topic) then it guards against broken chunked message which was not fully published
private final boolean autoAckOldestChunkedMessageOnQueueFull;
// it will be used to manage N outstanding chunked message buffers
private final BlockingQueue<String> pendingChunkedMessageUuidQueue;

private final boolean createTopicIfDoesNotExist;

Expand Down Expand Up @@ -254,8 +254,8 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
this.maxPendingChuckedMessage = conf.getMaxPendingChuckedMessage();
this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.maxPendingChunkedMessage = conf.getMaxPendingChuckedMessage();
this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();

Expand Down Expand Up @@ -568,9 +568,6 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack

if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
if (ackType == AckType.Cumulative && txn != null) {
return sendAcknowledge(messageId, ackType, properties, txn);
}
if (markAckForBatchMessage(batchMessageId, ackType, properties, txn)) {
// all messages in batch have been acked so broker can be acked via sendAcknowledge()
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1246,11 +1243,11 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
int totalChunks = msgMetadata.getNumChunksFromMsg();
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
(key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
pendingChunckedMessageCount++;
if (maxPendingChuckedMessage > 0 && pendingChunckedMessageCount > maxPendingChuckedMessage) {
pendingChunkedMessageCount++;
if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) {
removeOldestPendingChunkedMessage();
}
pendingChunckedMessageUuidQueue.add(msgMetadata.getUuid());
pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
}

ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
Expand Down Expand Up @@ -1300,8 +1297,8 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
}
// remove buffer from the map, add chucked messageId to unack-message tracker, and reduce pending-chunked-message count
chunkedMessagesMap.remove(msgMetadata.getUuid());
unAckedChunckedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunckedMessageCount--;
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunkedMessageCount--;
compressedPayload.release();
compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
chunkedMsgCtx.recycle();
Expand Down Expand Up @@ -2352,9 +2349,9 @@ public void recycle() {
private void removeOldestPendingChunkedMessage() {
ChunkedMessageCtx chunkedMsgCtx = null;
String firstPendingMsgUuid = null;
while (chunkedMsgCtx == null && !pendingChunckedMessageUuidQueue.isEmpty()) {
while (chunkedMsgCtx == null && !pendingChunkedMessageUuidQueue.isEmpty()) {
// remove oldest pending chunked-message group and free memory
firstPendingMsgUuid = pendingChunckedMessageUuidQueue.poll();
firstPendingMsgUuid = pendingChunkedMessageUuidQueue.poll();
chunkedMsgCtx = StringUtils.isNotBlank(firstPendingMsgUuid) ? chunkedMessagesMap.get(firstPendingMsgUuid)
: null;
}
Expand All @@ -2367,11 +2364,11 @@ protected void removeExpireIncompleteChunkedMessages() {
}
ChunkedMessageCtx chunkedMsgCtx = null;
String messageUUID;
while ((messageUUID = pendingChunckedMessageUuidQueue.peek()) != null) {
while ((messageUUID = pendingChunkedMessageUuidQueue.peek()) != null) {
chunkedMsgCtx = StringUtils.isNotBlank(messageUUID) ? chunkedMessagesMap.get(messageUUID) : null;
if (chunkedMsgCtx != null && System
.currentTimeMillis() > (chunkedMsgCtx.receivedTime + expireTimeOfIncompleteChunkedMessageMillis)) {
pendingChunckedMessageUuidQueue.remove(messageUUID);
pendingChunkedMessageUuidQueue.remove(messageUUID);
removeChunkMessage(messageUUID, chunkedMsgCtx, true);
} else {
return;
Expand Down Expand Up @@ -2402,7 +2399,7 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx,
chunkedMsgCtx.chunkedMsgBuffer.release();
}
chunkedMsgCtx.recycle();
pendingChunckedMessageCount--;
pendingChunkedMessageCount--;
}

private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
});
}

@Override
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
PartitionedTopicMetadata.class);
}

@Override
public String getServiceUrl() {
return httpClient.getServiceUrl();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -622,7 +621,7 @@ public void redeliverUnacknowledgedMessages() {
try {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunckedMessageIdSequenceMap.clear();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,7 +40,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;

public class MultiTopicsReaderImpl<T> implements Reader<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,15 @@ public void flush() {

// if messageId is checked then all the chunked related to that msg also processed so, ack all of
// them
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
if (chunkMsgIds != null && chunkMsgIds.length > 1) {
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
}
}
// messages will be acked so, remove checked message sequence
this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
} else {
entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
}
Expand Down Expand Up @@ -402,7 +402,7 @@ public void flush() {

// if messageId is checked then all the chunked related to that msg also processed so, ack all of
// them
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(entry.getRight());
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(entry.getRight());
long mostSigBits = entry.getLeft();
long leastSigBits = entry.getMiddle();
MessageIdImpl messageId = entry.getRight();
Expand All @@ -413,7 +413,7 @@ public void flush() {
}
}
// messages will be acked so, remove checked message sequence
this.consumer.unAckedChunckedMessageIdSequenceMap.remove(messageId);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
} else {
newAckCommand(consumer.consumerId, messageId, null, AckType.Individual, null, Collections.emptyMap(), cnx, false, mostSigBits, leastSigBits);
}
Expand Down Expand Up @@ -504,7 +504,7 @@ private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclabl
AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
boolean flush, long txnidMostBits, long txnidLeastBits) {

MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunckedMessageIdSequenceMap.get(msgId);
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
&& ackType != AckType.Cumulative) {
Expand All @@ -531,7 +531,7 @@ private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclabl
}
}
}
this.consumer.unAckedChunckedMessageIdSequenceMap.remove(msgId);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
} else {
ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet,
ackType, validationError, map, txnidLeastBits, txnidMostBits, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ public void run(Timeout t) throws Exception {
public static void addChunkedMessageIdsAndRemoveFromSequnceMap(MessageId messageId, Set<MessageId> messageIds,
ConsumerBase<?> consumerBase) {
if (messageId instanceof MessageIdImpl) {
MessageIdImpl[] chunkedMsgIds = consumerBase.unAckedChunckedMessageIdSequenceMap.get((MessageIdImpl) messageId);
MessageIdImpl[] chunkedMsgIds = consumerBase.unAckedChunkedMessageIdSequenceMap.get((MessageIdImpl) messageId);
if (chunkedMsgIds != null && chunkedMsgIds.length > 0) {
for (MessageIdImpl msgId : chunkedMsgIds) {
messageIds.add(msgId);
}
}
consumerBase.unAckedChunckedMessageIdSequenceMap.remove((MessageIdImpl) messageId);
consumerBase.unAckedChunkedMessageIdSequenceMap.remove((MessageIdImpl) messageId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.ProtobufNativeReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufNativeWriter;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AcknowledgementsGroupingTrackerTest {
public void setup() {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
cnx = mock(ClientCnx.class);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(cnx.ctx()).thenReturn(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand All @@ -56,7 +55,6 @@

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;

Expand Down
Loading

0 comments on commit 18fd15b

Please sign in to comment.