Skip to content

Commit

Permalink
python: Politely handle misuse of table.condition.
Browse files Browse the repository at this point in the history
Before 46d44cf, it was technically possible to assign a monitor
condition directly to Idl.tables[table_name].condition. If done
before the connection was established, it would successfully apply
the condition (where cond_change() actually would fail).

Although this wasn't meant to be supported, several OpenStack
projects made use of this. After 46d44cf, .condition is no
longer a list, but a ConditionState. Assigning a list to it breaks
the Idl.

The Neutron and ovsdbapp projects have patches in-flight to
use Idl.cond_change() if ConditionState exists, as it now works
before connection as well, but here could be other users that also
start failing when upgrading to OVS 2.17.

Instead of directly adding attributes to TableSchema, this adds
the IdlTable/IdlColumn objects which hold Idl-specific data and
adds a 'condition' property to TableSchema that maintains the old
interface.

Fixes: 46d44cf ("python: idl: Add monitor_cond_since support.")
Signed-off-by: Terry Wilson <[email protected]>
Acked-by: Dumitru Ceara <[email protected]>
Acked-By: Timothy Redaelli <[email protected]>
Signed-off-by: Ilya Maximets <[email protected]>
  • Loading branch information
otherwiseguy authored and igsilya committed Apr 26, 2022
1 parent b21e280 commit 4e3966e
Showing 1 changed file with 67 additions and 35 deletions.
102 changes: 67 additions & 35 deletions python/ovs/db/idl.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,47 @@ def reset(self):
return False


class IdlTable(object):
def __init__(self, idl, table):
assert(isinstance(table, ovs.db.schema.TableSchema))
self._table = table
self.need_table = False
self.rows = custom_index.IndexedRows(self)
self.idl = idl
self._condition_state = ConditionState()
self.columns = {k: IdlColumn(v) for k, v in table.columns.items()}

def __getattr__(self, attr):
return getattr(self._table, attr)

@property
def condition_state(self):
# read-only, no setter
return self._condition_state

@property
def condition(self):
return self.condition_state.latest

@condition.setter
def condition(self, condition):
assert(isinstance(condition, list))
self.idl.cond_change(self.name, condition)

@classmethod
def schema_tables(cls, idl, schema):
return {k: cls(idl, v) for k, v in schema.tables.items()}


class IdlColumn(object):
def __init__(self, column):
self._column = column
self.alert = True

def __getattr__(self, attr):
return getattr(self._column, attr)


class Idl(object):
"""Open vSwitch Database Interface Definition Language (OVSDB IDL).
Expand Down Expand Up @@ -241,7 +282,7 @@ def __init__(self, remote, schema_helper, probe_interval=None,
assert isinstance(schema_helper, SchemaHelper)
schema = schema_helper.get_idl_schema()

self.tables = schema.tables
self.tables = IdlTable.schema_tables(self, schema)
self.readonly = schema.readonly
self._db = schema
remotes = self._parse_remotes(remote)
Expand Down Expand Up @@ -282,15 +323,6 @@ def __init__(self, remote, schema_helper, probe_interval=None,
self.cond_changed = False
self.cond_seqno = 0

for table in schema.tables.values():
for column in table.columns.values():
if not hasattr(column, 'alert'):
column.alert = True
table.need_table = False
table.rows = custom_index.IndexedRows(table)
table.idl = self
table.condition = ConditionState()

def _parse_remotes(self, remote):
# If remote is -
# "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
Expand Down Expand Up @@ -330,7 +362,7 @@ def close(self):
def ack_conditions(self):
"""Mark all requested table conditions as acked"""
for table in self.tables.values():
table.condition.ack()
table.condition_state.ack()

def sync_conditions(self):
"""Synchronize condition state when the FSM is restarted
Expand Down Expand Up @@ -361,10 +393,10 @@ def sync_conditions(self):

for table in self.tables.values():
if ack_all:
table.condition.request()
table.condition.ack()
table.condition_state.request()
table.condition_state.ack()
else:
if table.condition.reset():
if table.condition_state.reset():
self.last_id = str(uuid.UUID(int=0))
self.cond_changed = True

Expand Down Expand Up @@ -485,7 +517,7 @@ def run(self):
sh.register_table(self._server_db_table)
schema = sh.get_idl_schema()
self._server_db = schema
self.server_tables = schema.tables
self.server_tables = IdlTable.schema_tables(self, schema)
self.__send_server_monitor_request()
except error.Error as e:
vlog.err("%s: error receiving server schema: %s"
Expand Down Expand Up @@ -591,10 +623,10 @@ def compose_cond_change(self):
for table in self.tables.values():
# Always use the most recent conditions set by the IDL client when
# requesting monitor_cond_change
if table.condition.new is not None:
if table.condition_state.new is not None:
change_requests[table.name] = [
{"where": table.condition.new}]
table.condition.request()
{"where": table.condition_state.new}]
table.condition_state.request()

if not change_requests:
return
Expand Down Expand Up @@ -630,19 +662,20 @@ def cond_change(self, table_name, cond):
cond = [False]

# Compare the new condition to the last known condition
if table.condition.latest != cond:
table.condition.init(cond)
if table.condition_state.latest != cond:
table.condition_state.init(cond)
self.cond_changed = True

# New condition will be sent out after all already requested ones
# are acked.
if table.condition.new:
any_reqs = any(t.condition.request for t in self.tables.values())
if table.condition_state.new:
any_reqs = any(t.condition_state.request
for t in self.tables.values())
return self.cond_seqno + int(any_reqs) + 1

# Already requested conditions should be up to date at
# self.cond_seqno + 1 while acked conditions are already up to date
return self.cond_seqno + int(bool(table.condition.requested))
return self.cond_seqno + int(bool(table.condition_state.requested))

def wait(self, poller):
"""Arranges for poller.block() to wake up when self.run() has something
Expand Down Expand Up @@ -814,8 +847,8 @@ def __send_monitor_request(self, max_version=Monitor.monitor_cond_since):
columns.append(column)
monitor_request = {"columns": columns}
if method in ("monitor_cond", "monitor_cond_since") and (
not ConditionState.is_true(table.condition.acked)):
monitor_request["where"] = table.condition.acked
not ConditionState.is_true(table.condition_state.acked)):
monitor_request["where"] = table.condition_state.acked
monitor_requests[table.name] = [monitor_request]

args = [self._db.name, str(self.uuid), monitor_requests]
Expand Down Expand Up @@ -1151,13 +1184,6 @@ def __txn_process_reply(self, msg):
return True


def _uuid_to_row(atom, base):
if base.ref_table:
return base.ref_table.rows.get(atom)
else:
return atom


def _row_to_uuid(value):
if isinstance(value, Row):
return value.uuid
Expand Down Expand Up @@ -1271,6 +1297,12 @@ def __str__(self):
data=", ".join("{col}={val}".format(col=c, val=getattr(self, c))
for c in sorted(self._table.columns)))

def _uuid_to_row(self, atom, base):
if base.ref_table:
return self._idl.tables[base.ref_table.name].rows.get(atom)
else:
return atom

def __getattr__(self, column_name):
assert self._changes is not None
assert self._mutations is not None
Expand Down Expand Up @@ -1312,7 +1344,7 @@ def __getattr__(self, column_name):
datum = data.Datum.from_python(column.type, dlist,
_row_to_uuid)
elif column.type.is_map():
dmap = datum.to_python(_uuid_to_row)
dmap = datum.to_python(self._uuid_to_row)
if inserts is not None:
dmap.update(inserts)
if removes is not None:
Expand All @@ -1329,7 +1361,7 @@ def __getattr__(self, column_name):
else:
datum = inserts

return datum.to_python(_uuid_to_row)
return datum.to_python(self._uuid_to_row)

def __setattr__(self, column_name, value):
assert self._changes is not None
Expand Down Expand Up @@ -1413,7 +1445,7 @@ def delkey(self, column_name, key, value=None):
if value:
try:
old_value = data.Datum.to_python(self._data[column_name],
_uuid_to_row)
self._uuid_to_row)
except error.Error:
return
if key not in old_value:
Expand Down

0 comments on commit 4e3966e

Please sign in to comment.