forked from django/django
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_utils.py
139 lines (124 loc) · 5.43 KB
/
test_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""Tests for django.db.backends.utils"""
from decimal import Decimal, Rounded
from django.db import NotSupportedError, connection
from django.db.backends.utils import (
format_number,
split_identifier,
split_tzname_delta,
truncate_name,
)
from django.test import (
SimpleTestCase,
TransactionTestCase,
skipIfDBFeature,
skipUnlessDBFeature,
)
class TestUtils(SimpleTestCase):
def test_truncate_name(self):
self.assertEqual(truncate_name("some_table", 10), "some_table")
self.assertEqual(truncate_name("some_long_table", 10), "some_la38a")
self.assertEqual(truncate_name("some_long_table", 10, 3), "some_loa38")
self.assertEqual(truncate_name("some_long_table"), "some_long_table")
# "user"."table" syntax
self.assertEqual(
truncate_name('username"."some_table', 10), 'username"."some_table'
)
self.assertEqual(
truncate_name('username"."some_long_table', 10), 'username"."some_la38a'
)
self.assertEqual(
truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38'
)
def test_split_identifier(self):
self.assertEqual(split_identifier("some_table"), ("", "some_table"))
self.assertEqual(split_identifier('"some_table"'), ("", "some_table"))
self.assertEqual(
split_identifier('namespace"."some_table'), ("namespace", "some_table")
)
self.assertEqual(
split_identifier('"namespace"."some_table"'), ("namespace", "some_table")
)
def test_format_number(self):
def equal(value, max_d, places, result):
self.assertEqual(format_number(Decimal(value), max_d, places), result)
equal("0", 12, 3, "0.000")
equal("0", 12, 8, "0.00000000")
equal("1", 12, 9, "1.000000000")
equal("0.00000000", 12, 8, "0.00000000")
equal("0.000000004", 12, 8, "0.00000000")
equal("0.000000008", 12, 8, "0.00000001")
equal("0.000000000000000000999", 10, 8, "0.00000000")
equal("0.1234567890", 12, 10, "0.1234567890")
equal("0.1234567890", 12, 9, "0.123456789")
equal("0.1234567890", 12, 8, "0.12345679")
equal("0.1234567890", 12, 5, "0.12346")
equal("0.1234567890", 12, 3, "0.123")
equal("0.1234567890", 12, 1, "0.1")
equal("0.1234567890", 12, 0, "0")
equal("0.1234567890", None, 0, "0")
equal("1234567890.1234567890", None, 0, "1234567890")
equal("1234567890.1234567890", None, 2, "1234567890.12")
equal("0.1234", 5, None, "0.1234")
equal("123.12", 5, None, "123.12")
with self.assertRaises(Rounded):
equal("0.1234567890", 5, None, "0.12346")
with self.assertRaises(Rounded):
equal("1234567890.1234", 5, None, "1234600000")
def test_split_tzname_delta(self):
tests = [
("Asia/Ust+Nera", ("Asia/Ust+Nera", None, None)),
("Asia/Ust-Nera", ("Asia/Ust-Nera", None, None)),
("Asia/Ust+Nera-02:00", ("Asia/Ust+Nera", "-", "02:00")),
("Asia/Ust-Nera+05:00", ("Asia/Ust-Nera", "+", "05:00")),
("America/Coral_Harbour-01:00", ("America/Coral_Harbour", "-", "01:00")),
("America/Coral_Harbour+02:30", ("America/Coral_Harbour", "+", "02:30")),
("UTC+15:00", ("UTC", "+", "15:00")),
("UTC-04:43", ("UTC", "-", "04:43")),
("UTC", ("UTC", None, None)),
("UTC+1", ("UTC+1", None, None)),
]
for tzname, expected in tests:
with self.subTest(tzname=tzname):
self.assertEqual(split_tzname_delta(tzname), expected)
class CursorWrapperTests(TransactionTestCase):
available_apps = []
def _test_procedure(self, procedure_sql, params, param_types, kparams=None):
with connection.cursor() as cursor:
cursor.execute(procedure_sql)
# Use a new cursor because in MySQL a procedure can't be used in the
# same cursor in which it was created.
with connection.cursor() as cursor:
cursor.callproc("test_procedure", params, kparams)
with connection.schema_editor() as editor:
editor.remove_procedure("test_procedure", param_types)
@skipUnlessDBFeature("create_test_procedure_without_params_sql")
def test_callproc_without_params(self):
self._test_procedure(
connection.features.create_test_procedure_without_params_sql, [], []
)
@skipUnlessDBFeature("create_test_procedure_with_int_param_sql")
def test_callproc_with_int_params(self):
self._test_procedure(
connection.features.create_test_procedure_with_int_param_sql,
[1],
["INTEGER"],
)
@skipUnlessDBFeature(
"create_test_procedure_with_int_param_sql", "supports_callproc_kwargs"
)
def test_callproc_kparams(self):
self._test_procedure(
connection.features.create_test_procedure_with_int_param_sql,
[],
["INTEGER"],
{"P_I": 1},
)
@skipIfDBFeature("supports_callproc_kwargs")
def test_unsupported_callproc_kparams_raises_error(self):
msg = (
"Keyword parameters for callproc are not supported on this database "
"backend."
)
with self.assertRaisesMessage(NotSupportedError, msg):
with connection.cursor() as cursor:
cursor.callproc("test_procedure", [], {"P_I": 1})