Skip to content

Commit

Permalink
cosmetic
Browse files Browse the repository at this point in the history
  • Loading branch information
exe-dealer committed May 26, 2023
1 parent a8c06ba commit 8b9f182
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions mod.js
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,7 @@ class MessageReader extends BinaryReader {
}
}
_readDataRow() {
const nfields = this._readInt16()
const nfields = this._readInt16();
const row = Array(nfields);
for (let i = 0; i < nfields; i++) {
const valsize = this._readInt32();
Expand All @@ -1460,6 +1460,7 @@ class MessageReader extends BinaryReader {
_readCopyResponse() {
const binary = this._readUint8();
const columns = this._array(this._readInt16(), _ => ({
// TODO make consistent with _readParameterDescription, object[] or number[]
binary: this._readInt16(),
}));
return { binary, columns };
Expand Down Expand Up @@ -1520,10 +1521,12 @@ class MessageReader extends BinaryReader {
const readbuf = new Uint8Array(1);
// Deno doc says that nread never equals 0, so don't need to repeat
const nread = await _net.read(socket, readbuf);
if (nread == null) throw new PgError({
message: 'Postgres unexpectedly closed connection, ssl response expected',
code: 'protocol_violation',
});
if (nread == null) {
throw new PgError({
message: 'Postgres unexpectedly closed connection, ssl response expected',
code: 'protocol_violation',
});
}
const [resp] = readbuf;
if (resp == 0x53 /*S*/) return true;
if (resp == 0x4e /*N*/) return false;
Expand Down Expand Up @@ -1571,7 +1574,8 @@ class ReplicationStream extends BinaryReader {
.join(',')
.replace(/.+/, '($&)')
);
// TODO get wal_sender_timeout
// TODO get wal_sender_timeout,
// but user may have no access to wal_sender_timeout setting
const startReplSql = `START_REPLICATION SLOT ${pgident(slot)} LOGICAL ${startLsn} ${optionsSql}`;
const rx = conn.stream(startReplSql, { stdin: this._tx });

Expand Down Expand Up @@ -1620,7 +1624,7 @@ class ReplicationStream extends BinaryReader {
if (lsn > this._ackingLsn) {
this._ackingLsn = lsn;
}
let nlsn = BigInt('0x' + this._ackingLsn.replace('/', ''));
const nlsn = BigInt('0x' + this._ackingLsn.replace('/', ''));
// https://github.com/postgres/postgres/blob/0526f2f4c38cb50d3e2a6e0aa5d51354158df6e3/src/backend/replication/logical/worker.c#L2473-L2478
// https://github.com/postgres/postgres/blob/0526f2f4c38cb50d3e2a6e0aa5d51354158df6e3/src/backend/replication/walsender.c#L2021-L2023
// TODO accept { written, flushed, applied, immediate }
Expand Down Expand Up @@ -2602,7 +2606,7 @@ function md5(/** @type {Uint8Array} */ input) {
}

export const _net = {
async connect(options) {
connect(options) {
return Deno.connect(options);
},
reconnectable(err) {
Expand All @@ -2627,7 +2631,7 @@ export const _net = {
nwritten += await socket.write(arr.subarray(nwritten));
}
},
async read(socket, buf) {
read(socket, buf) {
return socket.read(buf);
},
closeNullable(socket) {
Expand Down

0 comments on commit 8b9f182

Please sign in to comment.