Skip to content

Commit

Permalink
fix #71 - pool a wrapped vibe.d TCPConnection, as there's no way to c…
Browse files Browse the repository at this point in the history
…leanup the pool if PostgreSQL restarts
  • Loading branch information
pinver committed Aug 24, 2017
1 parent f816288 commit 34ff74a
Showing 1 changed file with 80 additions and 21 deletions.
101 changes: 80 additions & 21 deletions source/ddb/postgres.d
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,14 @@ class PGStream
{
version (Have_vibe_d_core)
{
private TCPConnection m_socket;
private TCPConnectionWrapper m_socket;

@property TCPConnection socket()
@property TCPConnectionWrapper socket()
{
return m_socket;
}

this(TCPConnection socket)
this(TCPConnectionWrapper socket)
{
m_socket = socket;
}
Expand Down Expand Up @@ -1664,7 +1664,7 @@ class PGConnection

version(Have_vibe_d_core)
{
stream = new PGStream(connectTCP(params["host"], port));
stream = new PGStream(new TCPConnectionWrapper(params["host"], port));
}
else
{
Expand Down Expand Up @@ -2335,27 +2335,86 @@ class PGResultSet(Specs...)

version(Have_vibe_d_core)
{
import vibe.core.connectionpool;
import vibe.core.connectionpool;

class PostgresDB {
private {
string[string] m_params;
ConnectionPool!PGConnection m_pool;
}
// wrap vibe.d TCPConnection class with the scope of reopening the tcp connection if closed
// by PostgreSQL it for some reason.
// see https://forum.rejectedsoftware.com/groups/rejectedsoftware.vibed/thread/44097/
private class TCPConnectionWrapper
{
this(string host, ushort port, string bindInterface = null, ushort bindPort = cast(ushort)0u)
{
this.host = host;
this.port = port;
this.bindInterface = bindInterface;
this.bindPort = bindPort;

this(string[string] conn_params)
{
m_params = conn_params.dup;
m_pool = new ConnectionPool!PGConnection(&createConnection);
}
connect();
}

auto lockConnection() { return m_pool.lockConnection(); }
void close(){ tcpConnection.close(); }

private PGConnection createConnection()
{
return new PGConnection(m_params);
}
}
void write(const(ubyte[]) bytes)
{
// Vibe: "... If connected is false, writing to the connection will trigger an exception ..."
if (!tcpConnection.connected)
{
// Vibe: " ... Note that close must always be called, even if the remote has already closed the
// connection. Failure to do so will result in resource and memory leakage.
tcpConnection.close();
connect();
}
tcpConnection.write(bytes);
}

void read(ubyte[] dst)
{
if (!tcpConnection.connected)
{
tcpConnection.close();
connect();
}
if (!tcpConnection.empty)
{
tcpConnection.read(dst);
}
}

private
{
void connect()
{
tcpConnection = connectTCP(host, port, bindInterface, bindPort);
}

string host;
string bindInterface;
ushort port;
ushort bindPort;

TCPConnection tcpConnection;
}
}

class PostgresDB {
private {
string[string] m_params;
ConnectionPool!PGConnection m_pool;
}

this(string[string] conn_params)
{
m_params = conn_params.dup;
m_pool = new ConnectionPool!PGConnection(&createConnection);
}

auto lockConnection() { return m_pool.lockConnection(); }

private PGConnection createConnection()
{
return new PGConnection(m_params);
}
}
}
else
{
Expand Down

0 comments on commit 34ff74a

Please sign in to comment.