Skip to content

Commit

Permalink
Parse & Sync and Close & Sync
Browse files Browse the repository at this point in the history
  • Loading branch information
marat-gainullin committed Jan 21, 2019
1 parent 5c4b37f commit 0e06eb2
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/main/java/com/github/pgasync/netty/NettyPgProtocolStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public CompletableFuture<Message> send(Message message) {
return offerRoundTrip(() -> {
lastSentMessage = message;
write(message);
if (message instanceof ExtendedQueryMessage) {
write(FIndicators.SYNC);
}
});
}

Expand All @@ -154,8 +157,9 @@ public CompletableFuture<Integer> send(Bind bind, Describe describe, Consumer<Ro
this.onRow = onRow;
this.onAffected = null;
return offerRoundTrip(() -> {
lastSentMessage = new Execute();
write(bind, describe, lastSentMessage, FIndicators.SYNC);
Execute execute;
lastSentMessage = execute = new Execute();
write(bind, describe, execute, FIndicators.SYNC);
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
}

Expand All @@ -165,8 +169,9 @@ public CompletableFuture<Integer> send(Bind bind, Consumer<DataRow> onRow) {
this.onRow = onRow;
this.onAffected = null;
return offerRoundTrip(() -> {
lastSentMessage = new Execute();
write(bind, lastSentMessage, FIndicators.SYNC);
Execute execute;
lastSentMessage = execute = new Execute();
write(bind, execute, FIndicators.SYNC);
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
}

Expand Down Expand Up @@ -230,6 +235,8 @@ private void respondWithMessage(Message message) {
Logger.getLogger(NettyPgProtocolStream.class.getName()).log(Level.WARNING, message.toString());
} else if (message == BIndicators.BIND_COMPLETE) {
// op op since bulk message sequence
} else if (message == BIndicators.PARSE_COMPLETE || message == BIndicators.CLOSE_COMPLETE) {
readyForQueryPendingMessage = message;
} else if (message instanceof RowDescription) {
onColumns.accept(((RowDescription) message).getColumns());
} else if (message == BIndicators.NO_DATA) {
Expand Down

0 comments on commit 0e06eb2

Please sign in to comment.