Skip to content

Commit

Permalink
Removed client-admin dependency from function-utils (apache#2739)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored Oct 6, 2018
1 parent c3c7f0d commit f8e7dfe
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Utils used for instance.
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class Utils {

public static final long getSequenceId(MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
? ((TopicMessageIdImpl) messageId).getInnerMessageId()
: messageId);
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();

// Combine ledger id and entry id to form offset
// Use less than 32 bits to represent entry id since it will get
// rolled over way before overflowing the max int range
long offset = (ledgerId << 28) | entryId;
return offset;
}

public static final MessageId getMessageId(long sequenceId) {
// Demultiplex ledgerId and entryId from offset
long ledgerId = sequenceId >>> 28;
long entryId = sequenceId & 0x0F_FF_FF_FFL;

return new MessageIdImpl(ledgerId, entryId, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.instance.Utils;

@Builder
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;
package org.apache.pulsar.functions.instance;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down
14 changes: 10 additions & 4 deletions pulsar-functions/utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,20 @@
<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-admin-original</artifactId>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;

import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import lombok.*;

import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import java.net.ServerSocket;
import java.util.Collection;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.io.core.Sink;
Expand All @@ -57,28 +54,6 @@ public class Utils {
public static String FILE = "file";
public static String BUILTIN = "builtin";

public static final long getSequenceId(MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
? ((TopicMessageIdImpl) messageId).getInnerMessageId()
: messageId);
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();

// Combine ledger id and entry id to form offset
// Use less than 32 bits to represent entry id since it will get
// rolled over way before overflowing the max int range
long offset = (ledgerId << 28) | entryId;
return offset;
}

public static final MessageId getMessageId(long sequenceId) {
// Demultiplex ledgerId and entryId from offset
long ledgerId = sequenceId >>> 28;
long entryId = sequenceId & 0x0F_FF_FF_FFL;

return new MessageIdImpl(ledgerId, entryId, -1);
}

public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.functions.utils.validation;

import org.apache.pulsar.functions.utils.FunctionConfig;

import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.pulsar.io.jdbc;

import static jersey.repackaged.com.google.common.base.Preconditions.checkState;

import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -139,8 +137,9 @@ private void flush() {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
checkState(swapList.isEmpty(),
"swapList should be empty since last flush. swapList.size: " + swapList.size());
if (!swapList.isEmpty()) {
throw new IllegalStateException("swapList should be empty since last flush. swapList.size: " + swapList.size());
}

synchronized (incomingList) {
List<Record<T>> tmpList;
Expand Down

0 comments on commit f8e7dfe

Please sign in to comment.