Skip to content

Commit

Permalink
GEODE-5485: Introduce InternalAsyncEventQueue interface
Browse files Browse the repository at this point in the history
Cleanup AsyncEventQueueImpl and implement InternalAsyncEventQueue.
  • Loading branch information
kirklund committed Aug 9, 2018
1 parent 3f7c777 commit 3f8b7c5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
Expand All @@ -29,24 +28,24 @@
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;

public class AsyncEventQueueImpl implements AsyncEventQueue {

private GatewaySender sender = null;

private AsyncEventListener asyncEventListener = null;
public class AsyncEventQueueImpl implements InternalAsyncEventQueue {

public static final String ASYNC_EVENT_QUEUE_PREFIX = "AsyncEventQueue_";

public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener eventListener) {
private final GatewaySender sender;
private final AsyncEventListener asyncEventListener;

public AsyncEventQueueImpl(GatewaySender sender, AsyncEventListener asyncEventListener) {
this.sender = sender;
this.asyncEventListener = eventListener;
this.asyncEventListener = asyncEventListener;
}

@Override
public String getId() {
return getAsyncEventQueueIdFromSenderId(this.sender.getId());
return getAsyncEventQueueIdFromSenderId(sender.getId());
}

@Override
Expand Down Expand Up @@ -133,12 +132,14 @@ public int size() {
return size;
}

public GatewaySender getSender() {
return this.sender;
@Override
public InternalGatewaySender getSender() {
return (InternalGatewaySender) sender;
}

@Override
public AsyncEventQueueStats getStatistics() {
AbstractGatewaySender abstractSender = (AbstractGatewaySender) this.sender;
AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
return ((AsyncEventQueueStats) abstractSender.getStatistics());
}

Expand All @@ -154,7 +155,7 @@ public boolean equals(Object obj) {
return false;
}
AsyncEventQueueImpl asyncEventQueue = (AsyncEventQueueImpl) obj;
if (asyncEventQueue.getId().equals(this.getId())) {
if (asyncEventQueue.getId().equals(getId())) {
return true;
}
return false;
Expand Down Expand Up @@ -193,8 +194,8 @@ public boolean isMetaQueue() {
}

public void stop() {
if (this.sender.isRunning()) {
this.sender.stop();
if (sender.isRunning()) {
sender.stop();
}
}

Expand All @@ -203,8 +204,8 @@ public void destroy() {
}

public void destroy(boolean initiator) {
InternalCache cache = ((AbstractGatewaySender) this.sender).getCache();
((AbstractGatewaySender) this.sender).destroy(initiator);
InternalCache cache = ((AbstractGatewaySender) sender).getCache();
((AbstractGatewaySender) sender).destroy(initiator);
cache.removeAsyncEventQueue(this);
}

Expand All @@ -214,16 +215,16 @@ public boolean isBucketSorted() {
}

public boolean isForwardExpirationDestroy() {
return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy();
return ((AbstractGatewaySender) sender).isForwardExpirationDestroy();
}

public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException {
return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, unit);
return ((AbstractGatewaySender) sender).waitUntilFlushed(timeout, unit);
}

@Override
public String toString() {
return new StringBuffer().append(getClass().getSimpleName()).append("{").append("id=" + getId())
.append(",isRunning=" + this.sender.isRunning()).append("}").toString();
.append(",isRunning=" + sender.isRunning()).append("}").toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.cache.asyncqueue.internal;

import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;

public interface InternalAsyncEventQueue extends AsyncEventQueue {

InternalGatewaySender getSender();

AsyncEventQueueStats getStatistics();
}

0 comments on commit 3f8b7c5

Please sign in to comment.