Skip to content

Commit

Permalink
transaction manager fix. Support for DBConnection.transaction modes :…
Browse files Browse the repository at this point in the history
…transaction ans :savepoint
  • Loading branch information
mjaric committed May 4, 2018
1 parent 4e78fe5 commit 5b3db0f
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 73 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# v1.1.1
# v1.1.2
* Bugfix
- Float loses precision. Fix will force floats to be encoded as 64bit floats and param type as float(53) in order to keep all bits

- Fix for: Rollback is called twice on failure in explicit transaction. (DBConnection [mode: :savepoint] support)

# v1.1.0
## Breaking Changes
UUID/UNIQUEIDENTIFER column is now stored AS IS in database, meaning that compatibility with Ecto.UUID is broken,
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Add Tds as a dependency in your `mix.exs` file.

```elixir
def deps do
[{:tds, "~> 1.0"} ]
[{:tds, "~> 1.1"} ]
end
```

Expand Down
8 changes: 3 additions & 5 deletions lib/tds.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Tds do

def prepare(pid, statement, opts \\ []) do
query = %Query{statement: statement}

case DBConnection.prepare(pid, query, opts) do
{:ok, query} -> {:ok, query}
{:error, err} -> {:error, err}
Expand Down Expand Up @@ -74,12 +74,10 @@ defmodule Tds do
end

def transaction(pid, fun, opts \\ []) do
case DBConnection.transaction(pid, fun, opts) do
{:ok, result} -> result
err -> err
end
DBConnection.transaction(pid, fun, opts)
end

@spec rollback(DBConnection.t, reason :: any) :: no_return
defdelegate rollback(conn, any), to: DBConnection

def child_spec(opts) do
Expand Down
88 changes: 43 additions & 45 deletions lib/tds/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Tds.Messages do
defrecord :msg_ready, [:status]
defrecord :msg_sql, [:query]
defrecord :msg_trans, [:trans]
defrecord :msg_transmgr, [:command]
defrecord :msg_transmgr, [:command, :name]
defrecord :msg_sql_result, [:columns, :rows, :done]
defrecord :msg_sql_empty, []
defrecord :msg_rpc, [:proc, :query, :params]
Expand Down Expand Up @@ -325,34 +325,20 @@ defmodule Tds.Messages do
# pak
end

defp encode(msg_transmgr(command: "TM_BEGIN_XACT"), %{trans: trans}) do
encode_trans(5, trans)
end
defp encode(msg_transmgr(command: "TM_COMMIT_XACT"), %{trans: trans}) do
q_ucs = <<7::little-size(2)-unit(8)>>
_req_type = q_ucs

# Transaction Descriptor header
header_type = <<2::little-size(2)-unit(8)>>
trans_size = byte_size(trans)
padding = 8 - trans_size
transaction_descriptor = trans <> <<0::size(padding)-unit(8)>>
outstanding_request_count = <<1::little-size(4)-unit(8)>>

td_header =
header_type <> transaction_descriptor <> outstanding_request_count

td_header_len = byte_size(td_header) + 4
td_header = <<td_header_len::little-size(4)-unit(8)>> <> td_header

headers = td_header
total_length = byte_size(headers) + 4
all_headers = <<total_length::little-size(32)>> <> headers
data = all_headers <> q_ucs <> <<0::size(2)-unit(8)>>
encode_packets(0x0E, data, [])
encode_trans(7, trans)
end
defp encode(msg_transmgr(command: "TM_ROLLBACK_XACT", name: name), %{trans: trans}) do
encode_trans(8, trans, name)
end
defp encode(msg_transmgr(command: "TM_SAVE_XACT", name: name), %{trans: trans}) do
encode_trans(9, trans, name)
end

defp encode(msg_transmgr(command: "TM_BEGIN_XACT"), %{trans: trans}) do
q_ucs = <<5::little-size(2)-unit(8)>>
_req_type = q_ucs

def encode_trans(request_type, trans, savepoint \\ nil) do
# Transaction Descriptor header
header_type = <<2::little-size(2)-unit(8)>>
trans_size = byte_size(trans)
Expand All @@ -369,32 +355,44 @@ defmodule Tds.Messages do
headers = td_header
total_length = byte_size(headers) + 4
all_headers = <<total_length::little-size(32)>> <> headers
data = all_headers <> q_ucs <> <<0::size(2)-unit(8)>>
request_payload = encode_trans_request(request_type, savepoint)
data = all_headers <> <<request_type::little-size(2)-unit(8)>> <> request_payload
encode_packets(0x0E, data, [])
end

defp encode(msg_transmgr(command: "TM_ROLLBACK_XACT"), %{trans: trans}) do
q_ucs = <<8::little-size(2)-unit(8)>>
_req_type = q_ucs
@trans_iso_no_isolation_level 0x00
# @trans_iso_read_uncommited 0x01
# @trans_iso_read_commited 0x02
# @trans_iso_repeatable_read 0x03
# @trans_iso_serializable 0x03
# @trans_iso_snapshot 0x05

# Transaction Descriptor header
header_type = <<2::little-size(2)-unit(8)>>
trans_size = byte_size(trans)
padding = 8 - trans_size
transaction_descriptor = trans <> <<0::size(padding)-unit(8)>>
outstanding_request_count = <<1::little-size(4)-unit(8)>>
@trans_iso_level @trans_iso_no_isolation_level

td_header =
header_type <> transaction_descriptor <> outstanding_request_count

td_header_len = byte_size(td_header) + 4
td_header = <<td_header_len::little-size(4)-unit(8)>> <> td_header
# begin transaction
defp encode_trans_request(5, _) do
<<@trans_iso_level::size(1)-unit(8), 0x0::size(1)-unit(8)>>
end
# commit transaction
defp encode_trans_request(7, _) do
<<0x00::size(2)-unit(8)>>
end
# rollback transaction
defp encode_trans_request(8, savepoint) when savepoint > 0 do
# rollback to save point

headers = td_header
total_length = byte_size(headers) + 4
all_headers = <<total_length::little-size(32)>> <> headers
data = all_headers <> q_ucs <> <<0::size(2)-unit(8)>>
encode_packets(0x0E, data, [])
<<
2::unsigned-8, savepoint::little-size(2)-unit(8),
0x0::size(1)-unit(8)
>>
end
defp encode_trans_request(8, _) do
<<0x00::size(2)-unit(8)>>
end
# save trans [name]
defp encode_trans_request(9, savepoint) when is_number(savepoint) do
<<2::unsigned-8, savepoint::little-size(2)-unit(8)>>
end

defp encode_rpc(:sp_executesql, params) do
Expand Down
73 changes: 56 additions & 17 deletions lib/tds/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Tds.Protocol do
result: nil,
query: nil,
transaction: nil,
env: %{trans: <<0x00>>}
env: %{trans: <<0x00>>, savepoint: 0}

def connect(opts) do
opts =
Expand Down Expand Up @@ -131,22 +131,49 @@ defmodule Tds.Protocol do
send_close(query, params, s)
end

def handle_begin(_opts, %{sock: _sock} = s) do
send_transaction("TM_BEGIN_XACT", %{s | transaction: :started})
def handle_begin(opts, %{sock: _, env: env}=s) do
case Keyword.get(opts, :mode, :transaction) do
:transaction ->
send_transaction("TM_BEGIN_XACT", nil, %{s|transaction: :started})

:savepoint ->
savepoint = env.savepoint + 1
env = %{env| savepoint: savepoint}
s = %{s | transaction: :started, env: env}
send_transaction("TM_SAVE_XACT", savepoint, s)
end
end

def handle_commit(_opts, %{transaction: status} = s) do
case status do
:failed ->
handle_rollback([], s)
def handle_commit(opts, %{transaction: transaction} = s) do
case Keyword.get(opts, :mode, :transaction) do
:transaction when transaction == :failed ->
handle_rollback(opts, s)

:transaction ->
send_transaction("TM_COMMIT_XACT", nil, %{s | transaction: :successful})

:savepoint when transaction == :failed ->
handle_rollback(opts, s)

:savepoint ->
# we don't need to call release savepoint as in postgresql for instance,
# when transaction DIDN'T failed. SQL will wait for
{:ok, %Tds.Result{rows: [], num_rows: 0}, s}

_ ->
send_transaction("TM_COMMIT_XACT", %{s | transaction: :successful})
end
end

def handle_rollback(_opts, %{sock: _sock} = s) do
send_transaction("TM_ROLLBACK_XACT", %{s | transaction: :failed})
def handle_rollback(opts, %{sock: _sock, env: env} = s) do
case Keyword.get(opts, :mode, :transaction) do
:transaction ->
env = %{env| savepoint: 0}
s = %{s | transaction: :failed, env: env}
send_transaction("TM_ROLLBACK_XACT", 0, s)

:savepoint ->
send_transaction("TM_ROLLBACK_XACT", env.savepoint, %{s | transaction: :failed})

end
end

def handle_first(_query, _cursor, _opts, state) do
Expand Down Expand Up @@ -454,8 +481,8 @@ defmodule Tds.Protocol do
end
end

def send_transaction(command, s) do
msg = msg_transmgr(command: command)
def send_transaction(command, name, s) do
msg = msg_transmgr(command: command, name: name)

case msg_send(msg, s) do
{:ok, %{result: result} = s} ->
Expand Down Expand Up @@ -667,10 +694,10 @@ defmodule Tds.Protocol do
{:ok, %{s | state: :executing, result: result}}
end

def message(:executing, msg_trans(trans: trans), %{} = s) do
def message(:executing, msg_trans(trans: trans), %{env: env} = s) do
result = %Tds.Result{columns: [], rows: [], num_rows: 0}

{:ok, %{s | state: :ready, result: result, env: %{trans: trans}}}
{:ok, %{s | state: :ready, result: result, env: %{trans: trans, savepoint: env.savepoint}}}
end

def message(:executing, msg_prepared(params: params), %{} = s) do
Expand Down Expand Up @@ -756,8 +783,20 @@ defmodule Tds.Protocol do
(buffer <> header)
|> package_recv(s, length - 8)

{:ok, _} ->
raise("Other statuses todo!")
{:ok, <<
_type::int8,
status::int8,
length::int16,
_spid::int16,
_package::int8,
_window::int8
>> = header } ->
(buffer <> header)
|> package_recv(s, length - 8)
raise "Status #{inspect(status)} of tds package is not yer supported!"

{:error, :closed} ->
raise DBConnection.ConnectionError, "connection is closed"

{:error, exception} ->
{:disconnect, Tds.Error.exception(exception), s}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Tds.Mixfile do
def project do
[
app: :tds,
version: "1.1.1",
version: "1.1.2",
elixir: "~> 1.0",
deps: deps(),
test_coverage: [tool: ExCoveralls],
Expand Down
5 changes: 3 additions & 2 deletions test/transaction_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Tds.TransactionTest do
use ExUnit.Case, async: true
# import ExUnit.CaptureLog
import ExUnit.CaptureLog
import Tds.TestHelper

setup context do
Expand Down Expand Up @@ -31,7 +31,7 @@ defmodule Tds.TransactionTest do
assert {:ok, %Tds.Result{columns: [""], num_rows: 1, rows: ['*']}} =
Tds.query(conn, "SELECT 42", [])
:hi
end) == :hi
end) == {:ok, :hi}
assert [[42]] = query("SELECT 42", [])
end

Expand Down Expand Up @@ -66,6 +66,7 @@ defmodule Tds.TransactionTest do
# end
# end

# @tag :transaction_disconnec
# @tag mode: :transaction
# test "idle status during transaction returns error and disconnects", context do
# Process.flag(:trap_exit, true)
Expand Down

0 comments on commit 5b3db0f

Please sign in to comment.