Skip to content

Commit

Permalink
Update WriteStream implementations with callback removals
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 15, 2023
1 parent 0229fae commit f601576
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -808,15 +808,16 @@ public void testResponseBodyStream(boolean close) throws Exception {
});
}
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
public Future<Void> write(Buffer data) {
size.addAndGet(data.length());
super.write(data, handler);
return super.write(data);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {
ended.set(true);
super.end(handler);
return super.end();
}

@Override
public boolean writeQueueFull() {
return paused;
Expand Down Expand Up @@ -849,9 +850,9 @@ public void testResponseBodyStreamError() throws Exception {
AtomicInteger received = new AtomicInteger();
WriteStream<Buffer> stream = new WriteStreamBase() {
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
public Future<Void> write(Buffer data) {
received.addAndGet(data.length());
super.write(data, handler);
return super.write(data);
}
};
HttpRequest<Buffer> get = webClient.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
Expand Down Expand Up @@ -880,9 +881,9 @@ public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
public Future<Void> write(Buffer data) {
exceptionHandler.handle(cause);
handler.handle(Future.failedFuture(cause));
return Future.failedFuture(cause);
}
};
HttpRequest<Buffer> get = webClient.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
Expand Down Expand Up @@ -1015,14 +1016,14 @@ public void testResponseWriteStreamMissingBody() throws Exception {
AtomicBoolean ended = new AtomicBoolean();
WriteStream<Buffer> stream = new WriteStreamBase() {
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
public Future<Void> write(Buffer data) {
length.addAndGet(data.length());
super.write(data, handler);
return super.write(data);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {
ended.set(true);
super.end(handler);
return super.end();
}
};
testResponseMissingBody(BodyCodec.pipe(stream));
Expand Down Expand Up @@ -2059,19 +2060,12 @@ public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {

@Override
public Future<Void> write(Buffer data) {
Promise<Void> promise = Promise.promise();
write(data, promise);
return promise.future();
}

@Override
public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
handler.handle(Future.succeededFuture());
return Future.succeededFuture();
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
handler.handle(Future.succeededFuture());
public Future<Void> end() {
return Future.succeededFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,39 +103,31 @@ public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}

@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
buffer.appendBuffer(data);
handler.handle(Future.succeededFuture());
}

@Override
public Future<Void> write(Buffer data) {
buffer.appendBuffer(data);
return Future.succeededFuture();
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {

if (!state.future().isComplete()) {
T result;
if (buffer.length() > 0) {
try {
result = decoder.apply(buffer);
} catch (Throwable t) {
state.fail(t);
if (handler != null) {
handler.handle(Future.failedFuture(t));
}
return;
return Future.failedFuture(t);
}
} else {
result = null;
}
state.complete(result);
if (handler != null) {
handler.handle(Future.succeededFuture());
}
return Future.succeededFuture();
} else {
return Future.failedFuture("Already ended");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,9 @@ public Future<Void> write(Buffer buffer) {
}

@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
parser.handle(data);
if (handler != null) {
handler.handle(Future.succeededFuture());
}
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {
parser.end();
if (handler != null) {
handler.handle(Future.succeededFuture());
}
return Future.succeededFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,26 @@ public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}

@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
stream.write(data).onComplete(handler);
}

@Override
public Future<Void> write(Buffer data) {
Promise<Void> promise = Promise.promise();
write(data, promise);
return promise.future();
return stream.write(data);
}

@Override
public void end(Handler<AsyncResult<Void>> handler) {
public Future<Void> end() {
if (close) {
stream.end().onComplete(ar -> {
if (ar.succeeded()) {
promise.tryComplete();
} else {
promise.tryFail(ar.cause());
}
if (handler != null) {
handler.handle(ar);
}
});
return stream
.end()
.onComplete(ar -> {
if (ar.succeeded()) {
promise.tryComplete();
} else {
promise.tryFail(ar.cause());
}
});
} else {
promise.tryComplete();
if (handler != null) {
handler.handle(Future.succeededFuture());
}
return Future.succeededFuture();
}
}

Expand Down

0 comments on commit f601576

Please sign in to comment.