Skip to content

Commit

Permalink
[Transaction][Buffer] move the transaction buffer code to the broker …
Browse files Browse the repository at this point in the history
…module (apache#4919)


*Motivation*

Currently, all transaction buffer code in the transaction module. A transaction buffer inherits `PersistentTopic`, it depends on the pulsar-broker module. When we need to create a transaction buffer at the pulsar-broker, the pulsar-broker depends on the transaction module. So it will cause cyclic reference.

*Modifications*

- move the code under transaction buffer module to the broker module
  • Loading branch information
zymap authored and sijie committed Aug 10, 2019
1 parent 18d21ac commit 1ad632d
Show file tree
Hide file tree
Showing 30 changed files with 87 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -52,7 +52,7 @@ public interface TransactionBuffer {
*
* @param txnID the transaction id
* @return a future represents the result of the operation
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* is not in the buffer.
*/
CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID);
Expand All @@ -66,7 +66,7 @@ public interface TransactionBuffer {
* @param sequenceId the sequence id of the entry in this transaction buffer.
* @param buffer the entry buffer
* @return a future represents the result of the operation.
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionSealedException if the transaction
* @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException if the transaction
* has been sealed.
*/
CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer);
Expand All @@ -78,7 +78,7 @@ public interface TransactionBuffer {
* @param txnID transaction id
* @param startSequenceId the sequence id to start read
* @return a future represents the result of open operation.
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* is not in the buffer.
*/
CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId);
Expand All @@ -92,7 +92,7 @@ public interface TransactionBuffer {
* @param committedAtLedgerId the data ledger id where the commit marker of the transaction was appended to.
* @param committedAtEntryId the data ledger id where the commit marker of the transaction was appended to.
* @return a future represents the result of commit operation.
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* is not in the buffer.
*/
CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId);
Expand All @@ -103,7 +103,7 @@ public interface TransactionBuffer {
*
* @param txnID the transaction id
* @return a future represents the result of abort operation.
* @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* @throws org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
* is not in the buffer.
*/
CompletableFuture<Void> abortTxn(TxnID txnID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import static com.google.common.base.Preconditions.checkArgument;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.annotations.Beta;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;

/**
* A reader to read entries of a given transaction from transaction buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer;
package org.apache.pulsar.broker.transaction.buffer;

import com.google.common.annotations.Beta;
import java.util.SortedMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* Exception thrown when reaching end of a transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* Exception is thrown when no transactions found committed at a given ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* The base exception class for the errors thrown from Transaction Buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* Exception is thrown when the transaction is not found in the transaction buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* Exception is thrown when opening a reader on a transaction that is not sealed yet.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

/**
* Exception thrown if a transaction is already sealed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;

import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
/**
* Exceptions thrown when encountering errors in transaction buffer.
*/
package org.apache.pulsar.transaction.buffer.exceptions;
package org.apache.pulsar.broker.transaction.buffer.exceptions;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
Expand All @@ -32,13 +32,13 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnexpectedTxnStatusException;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;

/**
* A provider that provides in-memory implementations of {@link TransactionBuffer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.transaction.buffer.TransactionEntry;
import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.transaction.impl.common.TxnID;

/**
* A {@link TransactionBufferReader} implementation that reads entries from {@link InMemTransactionBuffer}.
*/
class InMemTransactionBufferReader implements TransactionBufferReader {
public class InMemTransactionBufferReader implements TransactionBufferReader {

private final TxnID txnId;
private final Iterator<Entry<Long, ByteBuf>> entries;
Expand All @@ -41,10 +41,8 @@ class InMemTransactionBufferReader implements TransactionBufferReader {

// the iterator should hold the references to the entries
// so when the reader is closed, all the entries can be released.
InMemTransactionBufferReader(TxnID txnId,
Iterator<Entry<Long, ByteBuf>> entries,
long committedAtLedgerId,
long committedAtEntryId) {
public InMemTransactionBufferReader(TxnID txnId, Iterator<Entry<Long, ByteBuf>> entries, long committedAtLedgerId,
long committedAtEntryId) {
this.txnId = txnId;
this.entries = entries;
this.committedAtLedgerId = committedAtLedgerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.List;
Expand All @@ -38,11 +38,11 @@
import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.transaction.buffer.TransactionCursor;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.transaction.impl.common.TxnID;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -34,10 +34,10 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.transaction.buffer.TransactionEntry;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.transaction.impl.common.TxnStatus;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import java.util.HashSet;
import java.util.Map;
Expand All @@ -26,10 +26,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.transaction.buffer.TransactionCursor;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
import org.apache.pulsar.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.broker.transaction.buffer.TransactionCursor;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotFoundException;
import org.apache.pulsar.transaction.impl.common.TxnID;

public class TransactionCursorImpl implements TransactionCursor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.transaction.impl.common.TxnID;

/**
* A simple implementation of {@link TransactionEntry}.
*/
class TransactionEntryImpl implements TransactionEntry {
public class TransactionEntryImpl implements TransactionEntry {

private final TxnID txnId;
private final long sequenceId;
private final long committedAtLedgerId;
private final long committedAtEntryId;
private final ByteBuf entryBuf;

TransactionEntryImpl(TxnID txnId,
public TransactionEntryImpl(TxnID txnId,
long sequenceId,
ByteBuf entryBuf,
long committedAtLedgerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.transaction.buffer.TransactionMeta;
import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnexpectedTxnStatusException;
import org.apache.pulsar.transaction.impl.common.TxnID;
import org.apache.pulsar.transaction.impl.common.TxnStatus;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
/**
* The implementation of a transaction buffer.
*/
package org.apache.pulsar.transaction.buffer.impl;
package org.apache.pulsar.broker.transaction.buffer.impl;
Loading

0 comments on commit 1ad632d

Please sign in to comment.