Skip to content

Commit

Permalink
python/tests: add simple dcerpc co_cancel tests
Browse files Browse the repository at this point in the history
CO_CANCEL is mostly ignored. It's up to the application server
implementation to install a cancel handler.

The only implementation I found so far is the witness server
(see [MS-SWN] WitnessrAsyncNotify), which triggers a FAULT
with DCERPC_FAULT_SERVER_UNAVAILABLE.

Signed-off-by: Stefan Metzmacher <[email protected]>
Reviewed-by: Andreas Schneider <[email protected]>
  • Loading branch information
metze-samba authored and cryptomilk committed Oct 26, 2016
1 parent 04c9343 commit 3c474cd
Showing 1 changed file with 74 additions and 0 deletions.
74 changes: 74 additions & 0 deletions python/samba/tests/dcerpc/raw_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -2317,6 +2317,80 @@ def test_mix_requests(self):
self.assertEquals(rep.u.reserved, 0)
self.assertEquals(len(rep.u.error_and_verifier), 0)

def test_co_cancel_no_request(self):
ndr32 = base.transfer_syntax_ndr()
abstract = samba.dcerpc.mgmt.abstract_syntax()
ctx = self.prepare_presentation(abstract, ndr32, context_id=0xff)

req = self.generate_co_cancel(call_id = 3)
self.send_pdu(req)
rep = self.recv_pdu(timeout=0.01)
self.assertIsNone(rep)
self.assertIsConnected()

# And now try a request
req = self.generate_request(call_id = 1,
context_id=ctx.context_id,
opnum=0,
stub="")
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
auth_length=0)
self.assertNotEquals(rep.u.alloc_hint, 0)
self.assertEquals(rep.u.context_id, req.u.context_id)
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)

def test_co_cancel_request_after_first(self):
ndr32 = base.transfer_syntax_ndr()
abstract = samba.dcerpc.mgmt.abstract_syntax()
ctx = self.prepare_presentation(abstract, ndr32, context_id=0xff)

req = self.generate_request(call_id = 1,
pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
context_id=ctx.context_id,
opnum=0,
stub="")
self.send_pdu(req)
rep = self.recv_pdu(timeout=0.01)
self.assertIsNone(rep)
self.assertIsConnected()

req = self.generate_co_cancel(call_id = 1)
self.send_pdu(req)
rep = self.recv_pdu(timeout=0.01)
self.assertIsNone(rep)
self.assertIsConnected()

req = self.generate_request(call_id = 1,
pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST,
context_id=ctx.context_id,
opnum=0,
stub="")
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
auth_length=0)
self.assertNotEquals(rep.u.alloc_hint, 0)
self.assertEquals(rep.u.context_id, req.u.context_id)
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)

# And now try a request
req = self.generate_request(call_id = 2,
context_id=ctx.context_id,
opnum=0,
stub="")
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
auth_length=0)
self.assertNotEquals(rep.u.alloc_hint, 0)
self.assertEquals(rep.u.context_id, req.u.context_id)
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)

def test_spnego_connect_request(self):
ndr32 = base.transfer_syntax_ndr()

Expand Down

0 comments on commit 3c474cd

Please sign in to comment.