Skip to content

Commit

Permalink
Atomic test wip
Browse files Browse the repository at this point in the history
  • Loading branch information
waxzce committed Nov 20, 2013
1 parent d2eae59 commit 8f9e482
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,29 @@ import org.ancelin.play2.couchbase.CouchbaseBucket
import scala.concurrent.{ Future, ExecutionContext }
import net.spy.memcached.ops.OperationStatus
import org.ancelin.play2.couchbase.client.CouchbaseFutures._
import net.spy.memcached.transcoders.Transcoder
import net.spy.memcached.CASValue
import collection.JavaConversions._
import play.api.libs.json.Reads


//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Counter Operations
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
trait Atomic {

def getAndLock(key: String, exp: Int)(implicit bucket: CouchbaseBucket, ec: ExecutionContext): Future[OperationStatus] = {
waitForOperationStatus(bucket.couchbaseClient.asyncGetAndLock(key, exp), ec)
def getAndLock[T](key: String, exp: Int)(implicit r:Reads[T], bucket: CouchbaseBucket, ec: ExecutionContext): Future[Option[CASValue[T]]] = {
// waitForOperationStatus(bucket.couchbaseClient.asyncGetAndLock(key, exp), ec)
waitForGetAndCas[T](bucket.couchbaseClient.asyncGetAndLock(key, exp), ec, r) map {
case value: CASValue[T] =>
println(value); Some[CASValue[T]](value)
case _ => None
}
}

def unlock(key: String, casId: Long)(implicit bucket: CouchbaseBucket, ec: ExecutionContext): Future[OperationStatus] = {
waitForOperationStatus(bucket.couchbaseClient.asyncUnlock(key, casId), ec)
}

}

Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package org.ancelin.play2.couchbase.client

import play.api.libs.json._
import scala.concurrent.{Future, ExecutionContext}
import com.couchbase.client.protocol.views.{DesignDocument, SpatialView, View, Query}
import scala.concurrent.{ Future, ExecutionContext }
import com.couchbase.client.protocol.views.{ DesignDocument, SpatialView, View, Query }
import play.api.libs.iteratee.Enumerator
import net.spy.memcached.ops.OperationStatus
import play.api.libs.json.JsObject
import net.spy.memcached.{PersistTo, ReplicateTo}
import org.ancelin.play2.couchbase.{Couchbase, CouchbaseBucket}
import net.spy.memcached.{ PersistTo, ReplicateTo }
import org.ancelin.play2.couchbase.{ Couchbase, CouchbaseBucket }
import java.util.concurrent.TimeUnit
import net.spy.memcached.CASValue

trait BucketAPI {
self: CouchbaseBucket =>
self: CouchbaseBucket =>

def find[T](docName:String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): Future[List[T]] = {
def find[T](docName: String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): Future[List[T]] = {
Couchbase.find[T](docName, viewName)(query)(self, r, ec)
}

Expand All @@ -39,11 +40,11 @@ trait BucketAPI {
def fetch[T](keys: Seq[String])(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[(String, T)] = Couchbase.fetch[T](keys)(this, r, ec)
def fetchValues[T](keys: Seq[String])(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[T] = Couchbase.fetchValues[T](keys)(this, r, ec)
def getWithKey[T](key: String)(implicit r: Reads[T], ec: ExecutionContext): Future[Option[(String, T)]] = Couchbase.getWithKey[T](key)(this, r, ec)
def rawSearch(docName:String, viewName: String)(query: Query)(implicit ec: ExecutionContext): QueryEnumerator[RawRow] = Couchbase.rawSearch(docName, viewName)(query)(this, ec)
def rawSearch(docName: String, viewName: String)(query: Query)(implicit ec: ExecutionContext): QueryEnumerator[RawRow] = Couchbase.rawSearch(docName, viewName)(query)(this, ec)
def rawSearch(view: View)(query: Query)(implicit ec: ExecutionContext): QueryEnumerator[RawRow] = Couchbase.rawSearch(view)(query)(this, ec)
def search[T](docName:String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[TypedRow[T]] = Couchbase.search[T](docName, viewName)(query)(this, r, ec)
def search[T](docName: String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[TypedRow[T]] = Couchbase.search[T](docName, viewName)(query)(this, r, ec)
def search[T](view: View)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[TypedRow[T]] = Couchbase.search[T](view)(query)(this, r, ec)
def searchValues[T](docName:String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[T] = Couchbase.searchValues[T](docName, viewName)(query)(this, r, ec)
def searchValues[T](docName: String, viewName: String)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[T] = Couchbase.searchValues[T](docName, viewName)(query)(this, r, ec)
def searchValues[T](view: View)(query: Query)(implicit r: Reads[T], ec: ExecutionContext): QueryEnumerator[T] = Couchbase.searchValues[T](view)(query)(this, r, ec)

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -112,7 +113,7 @@ trait BucketAPI {
Couchbase.setWithKey[T](key, value, exp, persistTo, replicateTo)(self, w, ec)
}

def setWithId[T <: {def id: String}](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
def setWithId[T <: { def id: String }](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
Couchbase.set[T](value.id, value, exp, persistTo, replicateTo)(self, w, ec)
}

Expand All @@ -136,7 +137,7 @@ trait BucketAPI {
Couchbase.addWithKey[T](key, value, exp, persistTo, replicateTo)(self, w, ec)
}

def addWithId[T <: {def id: String}](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
def addWithId[T <: { def id: String }](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
Couchbase.add[T](value.id, value, exp, persistTo, replicateTo)(self, w, ec)
}

Expand All @@ -160,7 +161,7 @@ trait BucketAPI {
Couchbase.replaceWithKey[T](key, value, exp, persistTo, replicateTo)(self, w, ec)
}

def replaceWithId[T <: {def id: String}](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
def replaceWithId[T <: { def id: String }](value: T, exp: Int = Constants.expiration, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit w: Writes[T], ec: ExecutionContext): Future[OperationStatus] = {
Couchbase.replace[T](value.id, value, exp, persistTo, replicateTo)(self, w, ec)
}

Expand All @@ -176,7 +177,7 @@ trait BucketAPI {
Couchbase.delete(key, persistTo, replicateTo)(self, ec)
}

def deleteWithId[T <: {def id: String}](value: T, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit ec: ExecutionContext): Future[OperationStatus] = {
def deleteWithId[T <: { def id: String }](value: T, persistTo: PersistTo = PersistTo.ZERO, replicateTo: ReplicateTo = ReplicateTo.ZERO)(implicit ec: ExecutionContext): Future[OperationStatus] = {
Couchbase.delete(value.id, persistTo, replicateTo)(self, ec)
}

Expand All @@ -190,4 +191,8 @@ trait BucketAPI {

def flush(delay: Int)(implicit ec: ExecutionContext): Future[OperationStatus] = Couchbase.flush(delay)(self, ec)
def flush()(implicit ec: ExecutionContext): Future[OperationStatus] = Couchbase.flush()(self, ec)

def getAndLock[T](key: String, exp: Int)(implicit r: Reads[T], ec: ExecutionContext): Future[Option[CASValue[T]]] = {
Couchbase.getAndLock(key, exp)(r, self, ec)
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.ancelin.play2.couchbase.client

import net.spy.memcached.internal._
import scala.concurrent.{Promise, Future, ExecutionContext}
import com.couchbase.client.internal.{HttpCompletionListener, HttpFuture}
import net.spy.memcached.CASValue
import scala.concurrent.{ Promise, Future, ExecutionContext }
import com.couchbase.client.internal.{ HttpCompletionListener, HttpFuture }
import net.spy.memcached.ops.OperationStatus
import play.api.Logger
import play.api.libs.json.Reads

object CouchbaseFutures {

Expand Down Expand Up @@ -57,6 +59,30 @@ object CouchbaseFutures {
promise.future
}

def waitForGetAndCas[T](future: OperationFuture[CASValue[Object]], ec: ExecutionContext, r:Reads[T]): Future[CASValue[T]] = {
val promise = Promise[CASValue[T]]()
future.addListener(new OperationCompletionListener() {
def onComplete(f: OperationFuture[_]) = {
if (Constants.failWithOpStatus && (!f.getStatus.isSuccess)) {
promise.failure(new OperationFailedException(f.getStatus))
} else {
if (!f.getStatus.isSuccess) logger.error(f.getStatus.getMessage)
if (f.isDone || f.isCancelled) {
//val g =
promise.success(f.get().asInstanceOf[CASValue[T]])
} else {
if (checkFutures) promise.failure(new Throwable(s"GetFuture epic fail !!! ${f.isDone} : ${f.isCancelled}"))
else {
logger.info(s"GetFuture not completed yet, success anyway : ${f.isDone} : ${f.isCancelled}")
promise.success(f.get().asInstanceOf[CASValue[T]])
}
}
}
}
})
promise.future
}

def waitForHttpStatus[T](future: HttpFuture[T], ec: ExecutionContext): Future[OperationStatus] = {
val promise = Promise[OperationStatus]()
future.addListener(new HttpCompletionListener() {
Expand Down
38 changes: 36 additions & 2 deletions plugin/src/test/scala/AtomicTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,55 @@ import play.api.test._
import play.api.test.Helpers._
import org.ancelin.play2.couchbase.{ Couchbase, CouchbaseController }
import play.GlobalSettings
import play.api.libs.json.{ Reads, Json }
import org.specs2.execute.AsResult
import net.spy.memcached.ops.OperationException
import scala.concurrent.Await
import scala.concurrent.duration.Duration


@RunWith(classOf[JUnitRunner])
class AtomicTest extends Specification {

case class TestValue(value: String, number: Int, some: List[TestValue])

implicit val fmt = Json.format[TestValue]
implicit val urlReader = Json.reads[TestValue]
implicit val urlWriter = Json.writes[TestValue]

val fakeapp = FakeApplication(
additionalPlugins = Seq("org.ancelin.play2.couchbase.CouchbasePlugin"))

val tk = "mylocktestkey2"

"This test assume you have a default configuration of couchbase 2.1 running on your localhost computer" in ok

"Couchbase" should {

"connect" in new WithApplication(fakeapp) {
Couchbase.buckets must not empty
}

"set key \"" + tk + "\" in default bucket" in new WithApplication(fakeapp) {
implicit val e = Couchbase.couchbaseExecutor

val tv = new TestValue("testValue", 42, List())
val s = Couchbase.defaultBucket.set[TestValue](tk, tv)
Await.result(s, Duration(20000, "millis")).isSuccess must equalTo(true)

}

"lock the key \"" + tk + "\" in default bucket" in new WithApplication(fakeapp) {

implicit val e = Couchbase.couchbaseExecutor

val s = Couchbase.defaultBucket.getAndLock(tk, 3600)

Await.result(s, Duration(20000, "millis")).fold(assert(false))(x => {
println(x.getCas())
x.getValue().value in ok
assert(true)
})
}

}

}

0 comments on commit 8f9e482

Please sign in to comment.