Skip to content
/ pycond Public

Lightweight condition parsing and building of evaluation expressions

License

Notifications You must be signed in to change notification settings

axiros/pycond

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

author version
gk
20220309

pycond: Lightweight Declarative Condition Expressions

Build Status codecovPyPI    version

Table Of Contents

You have a bunch of data, possibly streaming...

id,first_name,last_name,email,gender,ip_address
1,Rufe,Morstatt,[email protected],Male,216.70.69.120
2,Kaela,Scott,[email protected],Female,73.248.145.44,2
(...)

... and you need to filter. For now lets say we have them already as list of dicts.

You can do it imperatively:

foo_users = [
    u
    for u in users
    if (u['gender'] == 'Male' or u['last_name'] == 'Scott') and '@' in u['email']
]

or you have this module assemble a condition function from a declaration like:

from pycond import make_filter
cond = 'email contains .de and gender eq Male or last_name eq Scott'
is_foo = make_filter(cond) # the built filter function is first

and then apply as often as you need, against varying state / facts / models (...):

foo_users = filter(is_foo, users)

with roughly the same performance (factor 2-3) than the handcrafted python.

In real life performance is often better then using imperative code, due to pycond's lazy evaluation feature.

When the developer can decide upon the filters to apply on data he'll certainly use Python's excellent expressive possibilities directly, e.g. as shown above through list comprehensions.
But what if the filtering conditions are based on decisions outside of the program's control? I.e. from an end user, hitting the program via the network, in a somehow serialized form, which is rarely directly evaluatable Python.

This is the main use case for this module.

But why yet another tool for such a standard job?

There is a list of great tools and frameworks where condition parsing is a (small) part of them, e.g. pyke or durable and many in the django world or from SQL statement parsers.

1. I just needed a very slim tool for only the parsing into functions - but this pretty transparent and customizable

pycond allows to customize

  • the list of condition operators
  • the list of combination operators
  • the general behavior of condition operators via global or condition local wrappers
  • their names
  • the tokenizer
  • the value lookup function

and ships as zero dependency single module.

All evaluation is done via partials and not lambdas, i.e. operations can be introspected and debugged very simply, through breakpoints or custom logging operator or lookup wrappers.

2. Simplicity of the grammar: Easy to type directly, readable by non programmers but also synthesisable from structured data, e.g. from a web framework.

3. Performance: Good enough to have "pyconditions" used within stream filters. With the current feature set we are sometimes a factor 2-3 worse but (due to lazy eval) often better, compared with handcrafted list comprehensions.

pycond parses the condition expressions according to a set of constraints given to the parser in the tokenizer function. The result of the tokenizer is given to the builder.

import pycond as pc

expr = '[a eq b and [c lt 42 or foo eq bar]]'
cond = pc.to_struct(pc.tokenize(expr, sep=' ', brkts='[]'))
print(cond)

# test:
data = [
    {'a': 'b', 'c': 1, 'foo': 42},
    {'a': 'not b', 'c': 1},
]
filtered = list(filter(pc.make_filter(expr), data))
print('matching:', filtered)
return cond, len(filtered)

Output:

[['a', 'eq', 'b', 'and', ['c', 'lt', '42', 'or', 'foo', 'eq', 'bar']]]
matching: [{'a': 'b', 'c': 1, 'foo': 42}]

After parsing the builder is assembling a nested set of operator functions, combined via combining operators. The functions are partials, i.e. not yet evaluated but information about the necessary keys is already available:

f, meta = pc.parse_cond('foo eq bar')
assert meta['keys'] == ['foo']

Other processes may deliver condition structures via serializable formats (e.g. json). If you hand such already tokenized constructs to pycond, then the tokenizer is bypassed:

cond = [['a', 'eq', 'b'], 'or', ['c', 'in', ['foo', 'bar']]]
assert pc.pycond(cond)(state={'a': 'b'}) == True
# json support is built in:
cond_as_json = json.dumps(cond)
assert pc.pycond(cond_as_json)(state={'a': 'b'}) == True

The result of the builder is a 'pycondition', which can be run many times against varying state of the system. How state is evaluated is customizable at build and run time.

The default is to get lookup keys within expressions from an initially empty State dict within the module - which is not thread safe, i.e. not to be used in async or non cooperative multitasking environments.

f = pc.pycond('foo eq bar')
assert f() == False
pc.State['foo'] = 'bar'  # not thread safe!
assert f() == True

(pycond is a shortcut for parse_cond, when meta infos are not required).

Use the state argument at evaluation:

assert pc.pycond('a gt 2')(state={'a': 42}) == True
assert pc.pycond('a gt 2')(state={'a': -2}) == False

You may supply a path seperator for diving into nested structures like so:

m = {'a': {'b': [{'c': 1}]}}
assert pc.pycond('a.b.0.c', deep='.')(state=m) == True
assert pc.pycond('a.b.1.c', deep='.')(state=m) == False
assert pc.pycond('a.b.0.c eq 1', deep='.')(state=m) == True
# convencience argument for string conditions:
assert pc.pycond('deep: a.b.0.c')(state=m) == True

# This is how you express deep access via structured conditions:
assert pc.pycond([('a', 'b', 0, 'c'), 'eq', 1])(state=m) == True

# Since tuples are not transferrable in json, we also allow deep paths as list:
# We apply heuristics to exclude expressions or conditions:
c = [[['a', 'b', 0, 'c'], 'eq', 1], 'and', 'a']
f, nfos = pc.parse_cond(c)
# sorting order for keys: tuples at end, sorted by len, rest default py sorted:
assert f(state=m) == True and nfos['keys'] == ['a', ('a', 'b', 0, 'c')]
print(nfos)

Output:

{'keys': ['a', ('a', 'b', 0, 'c')]}

When data is passed through processing pipelines, it often is passed with headers. So it may be useful to pass a global prefix to access the payload like so:

m = {'payload': {'b': [{'c': 1}], 'id': 123}}
assert pc.pycond('b.0.c', deep='.', prefix='payload')(state=m) == True

Since version 20210221 we try attributes when objects are not dicts:

class MyObj:
    val = {'a': 'b'}

m = {'payload': {'obj': MyObj()}}
cond = [['obj.val.a', 'eq', 'b']]
assert pc.pycond(cond, deep='.', prefix='payload')(state=m) == True

Perf Tip: When you have deep nested class or object hirarchies, then a custom lookup function will be faster than pycond's default lookup, which splits the key into parts, then works its way in via getitem, getattr, from root.

You can supply your own function for value acquisition.

  • Signature: See example.
  • Returns: The value for the key from the current state plus the compare value for the operator function.
# must return a (key, value) tuple:
model = {'eve': {'last_host': 'somehost'}}

def my_lu(k, v, req, user, model=model):
    print('user check. locals:', dict(locals()))
    return (model.get(user) or {}).get(k), req[v]

f = pc.pycond('last_host eq host', lookup=my_lu)

req = {'host': 'somehost'}
assert f(req=req, user='joe') == False
assert f(req=req, user='eve') == True

Output:

user check. locals: {'k': 'last_host', 'v': 'host', 'req': {'host': 'somehost'}, 'user': 'joe', 'model': {'eve': {'last_host': 'somehost'}}}
user check. locals: {'k': 'last_host', 'v': 'host', 'req': {'host': 'somehost'}, 'user': 'eve', 'model': {'eve': {'last_host': 'somehost'}}}

as you can see in the example, the state parameter is just a convention for pyconds' default lookup function.

This is avoiding unnecessary calculations in many cases:

When an evaluation branch contains an "and" or "and_not" combinator, then at runtime we evaluate the first expression - and stop if it is already False. Same when first expression is True, followed by "or" or "or_not".

That way expensive deep branch evaluations are omitted or, when the lookup is done lazy, the values won't be even fetched:

evaluated = []

def myget(key, val, cfg, state=None, **kw):
    evaluated.append(key)
    return pc.state_get(key, val, cfg, state, **kw)

f = pc.pycond('[a eq b] or foo eq bar and baz eq bar', lookup=myget)
assert f(state={'foo': 42}) == False
# the value for "baz" is not even fetched and the whole (possibly
# deep) branch after the last and is ignored:
assert evaluated == ['a', 'foo']
print(evaluated)
evaluated.clear()

f = pc.pycond('[[a eq b] or foo eq bar] and baz eq bar', lookup=myget)
assert f(state={'a': 'b', 'baz': 'bar'}) == True
# the value for "baz" is not even fetched and the whole (possibly
# deep) branch after the last and is ignored:
assert evaluated == ['a', 'baz']
print(evaluated)

Output:

['a', 'foo']
['a', 'baz']

Remember that all keys occurring in a condition (which may be provided by the user at runtime) are returned by the condition parser. Means that building of evaluation contexts can be done, based on the data actually needed and not more.

pycond provides a key getter which prints out every lookup.

f = pc.pycond('[[a eq b] or foo eq bar] or [baz eq bar]', lookup=pc.dbg_get)
assert f(state={'foo': 'bar'}) == True

Output:

Lookup: a b -> None
Lookup: foo bar -> bar

Insert booleans like shown:

f = pc.pycond(['foo', 'and', ['bar', 'eq', 1]])
assert f(state={'foo': 1}) == False
f = pc.pycond(['foo', 'and', [True, 'or', ['bar', 'eq', 1]]])
assert f(state={'foo': 1}) == True

Condition functions are created internally from structured expressions - but those are hard to type, involving many apostropies.

The text based condition syntax is intended for situations when end users type them into text boxes directly.

Combine atomic conditions with boolean operators and nesting brackets like:

[  <atom1> <and|or|and not|...> <atom2> ] <and|or...> [ [ <atom3> ....
[not] <lookup_key> [ [rev] [not] <condition operator (co)> <value> ]
  • When just lookup_key is given, then co is set to the truthy function:
def truthy(key, val=None):
    return operatur.truth(k)

so such an expression is valid and True:

pc.State.update({'foo': 1, 'bar': 'a', 'baz': []})
assert pc.pycond('[ foo and bar and not baz]')() == True
  • When not lookup_key is given, then co is set to the falsy function:
m = {'x': 'y', 'falsy_val': {}}
# normal way
assert pc.pycond(['foo', 'eq', None])(state=m) == True
# using "not" as prefix:
assert pc.pycond('not foo')(state=m) == True
assert pc.pycond(['not', 'foo'])(state=m) == True
assert pc.pycond('not falsy_val')(state=m) == True
assert pc.pycond('x and not foo')(state=m) == True
assert pc.pycond('y and not falsy_val')(state=m) == False

All boolean standardlib operators are available by default:

from pytest2md import html_table as tbl  # just a table gen.
from pycond import get_ops

for k in 'nr', 'str':
    s = 'Default supported ' + k + ' operators...(click to extend)'
    print(tbl(get_ops()[k], [k + ' operator', 'alias'], summary=s))
Default supported nr operators...(click to extend)
nr operatoralias
add+
and_&
eq==
floordiv//
ge>=
gt>
iadd+=
iand&=
ifloordiv//=
ilshift<<=
imod%=
imul*=
ior|=
ipow**=
irshift>>=
is_is
is_notis
isub-=
itruediv/=
ixor^=
le<=
lshift<<
lt<
mod%
mul*
ne!=
or_|
pow**
rshift>>
sub-
truediv/
xor^
itemgetter
length_hint
Default supported str operators...(click to extend)
str operatoralias
attrgetter
concat+
contains
countOf
iconcat+=
indexOf
methodcaller

By default pycond uses text style operators.

  • ops_use_symbolic switches processwide to symbolic style only.
  • ops_use_symbolic_and_txt switches processwide to both notations allowed.
pc.ops_use_symbolic()
pc.State['foo'] = 'bar'
assert pc.pycond('foo == bar')() == True
try:
    # this raises now, text ops not known anymore:
    pc.pycond('foo eq bar')
except:
    pc.ops_use_symbolic_and_txt(allow_single_eq=True)
    assert pc.pycond('foo = bar')() == True
    assert pc.pycond('foo == bar')() == True
    assert pc.pycond('foo eq bar')() == True
    assert pc.pycond('foo != baz')() == True

Operator namespace(s) should be assigned at process start, they are global.

pc.OPS['maybe'] = lambda a, b: int(time.time()) % 2
# valid expression now:
assert pc.pycond('a maybe b')() in (True, False)

Negates the result of the condition operator:

pc.State['foo'] = 'abc'
assert pc.pycond('foo eq abc')() == True
assert pc.pycond('foo not eq abc')() == False

Reverses the arguments before calling the operator

pc.State['foo'] = 'abc'
assert pc.pycond('foo contains a')() == True
assert pc.pycond('foo rev contains abc')() == True

rev and not can be combined in any order.

You may globally wrap all evaluation time condition operations through a custom function:

l = []

def hk(f_op, a, b, l=l):
    l.append((getattr(f_op, '__name__', ''), a, b))
    return f_op(a, b)

pc.run_all_ops_thru(hk)  # globally wrap the operators

pc.State.update({'a': 1, 'b': 2, 'c': 3})
f = pc.pycond('a gt 0 and b lt 3 and not c gt 4')
assert l == []
f()
expected_log = [('gt', 1, 0.0), ('lt', 2, 3.0), ('gt', 3, 4.0)]
assert l == expected_log
pc.ops_use_symbolic_and_txt()

You may compose such wrappers via repeated application of the run_all_ops_thru API function.

This is done through the ops_thru parameter as shown:

def myhk(f_op, a, b):
    return True

pc.State['a'] = 1
f = pc.pycond('a eq 2')
assert f() == False
f = pc.pycond('a eq 2', ops_thru=myhk)
assert f() == True

Using ops_thru is a good way to debug unexpected results, since you can add breakpoints or loggers there.

You can combine single conditions with

  • and
  • and not
  • or
  • or not
  • xor by default.

The combining functions are stored in pycond.COMB_OPS dict and may be extended.

Do not use spaces for the names of combining operators. The user may use them but they are replaced at before tokenizing time, like and not -> and_not.

Combined conditions may be arbitrarily nested using brackets "[" and "]".

Via the brkts config parameter you may change those to other separators at build time.

Brackets as strings in this flat list form, e.g. ['[', 'a', 'and' 'b', ']'...]

The tokenizers job is to take apart expression strings for the builder.

Separates the different parts of an expression. Default is ' '.

pc.State['a'] = 42
assert pc.pycond('a.eq.42', sep='.')() == True

sep can be a any single character including binary.

Bracket characters do not need to be separated, the tokenizer will do:

# equal:
assert (
    pc.pycond('[[a eq 42] and b]')() == pc.pycond('[ [ a eq 42 ] and b ]')()
)

The condition functions themselves do not evaluate equal - those had been assembled two times.

By putting strings into Apostrophes you can tell the tokenizer to not further inspect them, e.g. for the seperator:

pc.State['a'] = 'Hello World'
assert pc.pycond('a eq "Hello World"')() == True

Tell the tokenizer to not interpret the next character:

pc.State['b'] = 'Hello World'
assert pc.pycond('b eq Hello\ World')() == True

Expression string values are automatically cast into bools and numbers via the public pycond.py_type function.

This can be prevented by setting the autoconv parameter to False or by using Apostrophes:

pc.State['a'] = '42'
assert pc.pycond('a eq 42')() == False
# compared as string now
assert pc.pycond('a eq "42"')() == True
# compared as string now
assert pc.pycond('a eq 42', autoconv=False)() == True

If you do not want to provide a custom lookup function (where you can do what you want) but want to have looked up keys autoconverted then use:

for id in '1', 1:
    pc.State['id'] = id
    assert pc.pycond('id lt 42', autoconv_lookups=True)

Often the conditions are in user space, applied on data streams under the developer's control only at development time.

The end user might pick only a few keys from many offered within an API.

pycond's ctx_builder allows to only calculate those keys at runtime, the user decided to base conditions upon: At condition build time hand over a namespace for all functions which are available to build the ctx.

pycon will return a context builder function for you, calling only those functions which the condition actually requires.

pc.ops_use_symbolic_and_txt(allow_single_eq=True)

# Condition the end user configured, e.g. at program run time:
cond = [
    ['group_type', 'in', ['lab', 'first1k', 'friendly', 'auto']],
    'and',
    [
        [
            [
                [['cur_q', '<', 0.5], 'and', ['delta_q', '>=', 0.15],],
                'and',
                ['dt_last_enforce', '>', 28800],
            ],
            'and',
            ['cur_hour', 'in', [3, 4, 5]],
        ],
        'or',
        [
            [
                [['cur_q', '<', 0.5], 'and', ['delta_q', '>=', 0.15],],
                'and',
                ['dt_last_enforce', '>', 28800],
            ],
            'and',
            ['clients', '=', 0],
        ],
    ],
]

# Getters for API keys offered to the user, involving potentially
# expensive to fetch context delivery functions:
# Signature must provide minimum a positional for the current
# state:
class ApiCtxFuncs:
    def expensive_but_not_needed_here(ctx):
        raise Exception("Won't run with cond. from above")

    def cur_q(ctx):
        print('Calculating cur_q')
        return 0.1

    def cur_hour(ctx):
        print('Calculating cur_hour')
        return 4

    def dt_last_enforce(ctx):
        print('Calculating dt_last_enforce')
        return 10000000

    def delta_q(ctx):
        print('Calculating (expensive) delta_q')
        time.sleep(0.1)
        return 1

    def clients(ctx):
        print('Calculating clients')
        return 0

if sys.version_info[0] < 3:
    # we don't think it is a good idea to make the getter API stateful:
    p2m.convert_to_staticmethods(ApiCtxFuncs)

f, nfos = pc.parse_cond(cond, ctx_provider=ApiCtxFuncs)

# now we create (incomplete) data..
data1 = {'group_type': 'xxx'}, False
data2 = {'group_type': 'lab'}, True

# this key stores a context builder function, calculating the complete data:
make_ctx = nfos['complete_ctx']

t0 = time.time()
for event, expected in data1, data2:
    assert f(state=make_ctx(event)) == expected

print('Calc.Time (delta_q was called twice):', round(time.time() - t0, 4)),
return cond, ApiCtxFuncs

Output:

Calculating clients
Calculating cur_hour
Calculating cur_q
Calculating (expensive) delta_q
Calculating dt_last_enforce
Calculating clients
Calculating cur_hour
Calculating cur_q
Calculating (expensive) delta_q
Calculating dt_last_enforce
Calc.Time (delta_q was called twice): 0.2009

ContextBuilders are interesting but we can do better.

We still calculated values for keys which might (dependent on the data) be not needed in dead ends of a lazily evaluated condition.

Lets avoid calculating these values, remembering the custom lookup function feature.

This is where lookup providers come in, providing namespaces for functions to be called conditionally.

Pycond treats the condition keys as function names within that namespace and calls them, when needed.

Lookup provider functions may have the following signatures:

class F:
    # simple data passing
    def f1(data):
        """simple return a value being compared, getting passed the state/data"""
        return data['a']

    # simple, with ctx
    def f2(data, **kw):
        """
        simple return a value being compared, getting passed the state/data
        All context information within kw, compare value not modifiable
        """
        return data['b']

    # full pycond compliant signature,
    def f3(key, val, cfg, data, **kw):
        """
        full pycond signature.
        val is the value as defined by the condition, and which you could return modified
        kw holds the cache, cfg holds the setup
        v has to be returned:
        """
        return data['c'], 100  # not 45!

    # applied al
    def f4(*a, **kw):
        """
        Full variant (always when varargs are involved)
        """
        return a[3]['d'], 'foo'

_ = 'and'
f = pc.pycond(
    [
        [':f1', 'eq', 42],
        _,
        [':f2', 'eq', 43, _, ':f3', 'eq', 45],
        _,
        [':f4', 'eq', 'foo'],
    ],
    lookup_provider=F,
)
assert f(state={'a': 42, 'b': 43, 'c': 100, 'd': 'foo'}) == True

Via the 'params' parameter you may supply keyword args to lookup functions:

class F:
    def hello(k, v, cfg, data, count, **kw):
        return data['foo'] == count, 0

m = pc.pycond(
    [':hello'], lookup_provider=F, params={'hello': {'count': 2}}
)(state={'foo': 2})
assert m == True
  • Lookup functions can be found in nested class hirarchies or dicts. Separator is colon (':')
  • As shown above, if they are flat within a toplevel class or dict you should still prefix with ':', to get build time exception (MissingLookupFunction) when not present
  • You can switch that behaviour off per condition build as config arg, as shown below
  • You can switch that behaviour off globally via pc.prefixed_lookup_funcs = False

Warning: This is a breaking API change with pre-20200610 versions, where the prefix was not required to find functions in, back then, only flat namespaces. Use the global switch after import to get the old behaviour.

class F:
    a = lambda data: data['foo']

    class inner:
        b = lambda data: data['bar']

m = {'c': {'d': {'func': lambda data: data['baz']}}}

# for the inner lookup the first prefix may be omitted:
_ = 'and'
cond = [
    [':a', 'eq', 'foo1'],
    _,
    ['inner:b', 'eq', 'bar1'],
    _,
    ['c:d', 'eq', 'baz1',],
]
c = pc.pycond(cond, lookup_provider=F, lookup_provider_dict=m)
assert c(state={'foo': 'foo1', 'bar': 'bar1', 'baz': 'baz1'}) == True

# Prefix checking on / off:
try:
    pc.pycond([':xx', 'and', cond])
    i = 9 / 0  # above will raise this:
except pc.MissingLookupFunction:
    pass
try:
    pc.pycond([':xx', 'and', cond], prefixed_lookup_funcs=False)
    i = 9 / 0  # above will raise this:
except pc.MissingLookupFunction:
    pass
cond[0] = 'a'  # remove prefix, will still be found
c = pc.pycond(
    ['xx', 'or', cond],
    lookup_provider=F,
    lookup_provider_dict=m,
    prefixed_lookup_funcs=False,
)
assert c(state={'foo': 'foo1', 'bar': 'bar1', 'baz': 'baz1'}) == True

You can switch that prefix needs off - and pycond will then check the state for key presence:

# we let pycond generate the lookup function (we use the simple signature type):
f = pc.pycond(
    cond, lookup_provider=ApiCtxFuncs, prefixed_lookup_funcs=False
)
# Same events as above:
data1 = {'group_type': 'xxx'}, False
data2 = {'group_type': 'lab'}, True

t0 = time.time()
for event, expected in data1, data2:
    # we will lookup only once:
    assert f(state=event) == expected

print(
    'Calc.Time (delta_q was called just once):', round(time.time() - t0, 4),
)

# The deep switch keeps working:
cond2 = [cond, 'or', ['a-0-b', 'eq', 42]]
f = pc.pycond(
    cond2,
    lookup_provider=ApiCtxFuncs,
    deep='-',
    prefixed_lookup_funcs=False,
)
data2[0]['a'] = [{'b': 42}]
print('sample:', data2[0])
assert f(state=data2[0]) == True

Output:

Calculating cur_q
Calculating (expensive) delta_q
Calculating dt_last_enforce
Calculating cur_hour
Calc.Time (delta_q was called just once): 0.1004
sample: {'group_type': 'lab', 'a': [{'b': 42}]}
Calculating cur_q
Calculating (expensive) delta_q
Calculating dt_last_enforce
Calculating cur_hour

The output demonstrates that we did not even call the value provider functions for the dead branches of the condition.

NOTE: Instead of providing a class tree you may also provide a dict of functions as lookup_provider_dict argument, see qualify examples below.

Note: Currently you cannot override these defaults. Drop an issue if you need to.

  • Builtin state lookups: Not cached
  • Custom lookup functions: Not cached (you can implement caching within those functions)
  • Lookup provider return values: Cached, i.e. called only once, per data set
  • Named condition sets (see below): Cached

We deliver a few lookup function extensions

  • for time checks
  • for os.environ checks (re-evaluated at runtime)
from datetime import datetime as dt
from os import environ as env

this_sec = dt.now().second
this_utc_hour = dt.utcnow().hour
f = pc.pycond(
    [
        ['env:foo', 'eq', 'bar'],
        'and',
        # not breaking the build when the sec just jumps:
        ['dt:second', 'in', [this_sec, this_sec + 1, 0]],
        'and',
        ['utc:hour', 'eq', this_utc_hour],
    ]
)
env['foo'] = 'bar'
assert f(state={'a': 1}) == True

Instead of just delivering booleans, pycond can be used to determine a whole set of information about data declaratively, like so:

# We accept different forms of delivery.
# The first full text is restricted to simple flat dicts only:
for c in [
    'one: a gt 10, two: a gt 10 or foo eq bar',
    {'one': 'a gt 10', 'two': 'a gt 10 or foo eq bar'},
    {
        'one': ['a', 'gt', 10],
        'two': ['a', 'gt', 10, 'or', 'foo', 'eq', 'bar'],
    },
]:
    f = pc.qualify(c)
    r = f({'foo': 'bar', 'a': 0})
    assert r == {'one': False, 'two': True}

We may refer to results of other named conditions and also can pass named condition sets as lists instead of dicts:

def run(q):
    print('Running', q)

    class F:
        def custom(data):
            return data.get('a')

    f = pc.qualify(q, lookup_provider=F)

    assert f({'a': 'b'}) == {
        'first': True,
        'listed': [False, False],
        'thrd': True,
        'zero': True,
        'last': True,
    }
    res = f({'c': 'foo', 'x': 1})
    assert res == {
        'first': False,
        'listed': [False, True],
        'thrd': False,
        'zero': True,
        'last': True,
    }

q = {
    'thrd': ['k', 'or', ':first'],
    'listed': [['foo'], ['c', 'eq', 'foo']],
    'zero': [['x', 'eq', 1], 'or', ':thrd'],
    'first': [':custom', 'eq', 'b'],
    'last': True,  # you might want to do this to always get at least one matcher, e.g. for data streaming
}
# as list of conditions:
run(q)

# as dict:
q = dict([[k, v] for k, v in q.items()])
run(q)

Output:

Running {'thrd': ['k', 'or', ':first'], 'listed': [['foo'], ['c', 'eq', 'foo']], 'zero': [['x', 'eq', 1], 'or', ':thrd'], 'first': [':custom', 'eq', 'b'], 'last': True}
Running {'thrd': ['k', 'or', ':first'], 'listed': [['foo'], ['c', 'eq', 'foo']], 'zero': [['x', 'eq', 1], 'or', ':thrd'], 'first': [':custom', 'eq', 'b'], 'last': True}

WARNING: For performance reasons there is no built in circular reference check. You'll run into python's built in recursion checker!

  • into: Put the matched named conditions into the original data
  • prefix: Work from a prefix nested in the root
  • add_cached: Return also the data from function result cache

Here a few variants to parametrize behaviour, by example:

conds = {
    0: ['foo'],
    1: ['bar'],
    2: ['func'],
    3: ['n'],
    'n': ['bar'],
}

class F:
    def func(*a, **kw):
        return True, 0

q = lambda d, **kw: pc.qualify(
    conds, lookup_provider=F, prefixed_lookup_funcs=False, **kw
)(d)

m = q({'bar': 1})
assert m == {0: False, 1: True, 2: True, 3: True, 'n': True}

# return data, with matched conds in:
m = q({'bar': 1}, into='conds')
assert m == {
    'bar': 1,
    'conds': {0: False, 1: True, 2: True, 3: True, 'n': True},
}

msg = lambda: {'bar': 1, 'pl': {'a': 1}}

# add_cached == True -> it's put into the cond results:
m = q(msg(), into='conds', add_cached=True)
assert m == {
    'bar': 1,
    'conds': {0: False, 1: True, 2: True, 3: True, 'n': True, 'func': True},
    'pl': {'a': 1},
}

m = q(msg(), into='conds', add_cached='pl')
assert m == {
    'bar': 1,
    'conds': {0: False, 1: True, 2: True, 3: True, 'n': True},
    # n had been put into the cache, was not evaled twice:
    'pl': {'a': 1, 'func': True, 'n': True},
}

m = q({'bar': 1}, add_cached='pl')
assert m == {0: False, 1: True, 2: True, 3: True, 'n': True, 'func': True}

# prefix -> Nr 1, bar,  should NOT be True, since not in pl now:
m = q(msg(), prefix='pl', into='conds', add_cached='pl',)
assert m == {
    'bar': 1,
    'conds': {0: False, 1: False, 2: True, 3: False, 'n': False},
    'pl': {'a': 1, 'func': True, 'n': False},
}

If you either supply a key called 'root' OR supply it as argument to qualify, pycond will only evaluate named conditions required to calculate the root key:

called = []

def expensive_func(k, v, cfg, data, **kw):
    called.append(data)
    return 1, v

def xx(k, v, cfg, data, **kw):
    called.append(data)
    return data.get('a'), v

funcs = {'exp': {'func': expensive_func}, 'xx': {'func': xx}}
q = {
    'root': ['foo', 'and', ':bar'],
    'bar': [['somecond'], 'or', [[':exp', 'eq', 1], 'and', ':baz'],],
    'x': [':xx'],
    'baz': [':exp', 'lt', 10],
}
qualifier = pc.qualify(q, lookup_provider_dict=funcs, add_cached=True)

d = {'foo': 1}
r = qualifier(d)

# root, bar, baz had been calculated, not x
assert r == {'root': True, 'bar': True, 'baz': True, 'exp': 1}
# expensive_func result, which was cached, is also returned.
# expensive_func only called once allthough result evaluated for bar and baz:
assert len(called) == 1

called.clear()
f = pc.qualify(q, lookup_provider_dict=funcs, root='x', add_cached=True)
assert f({'a': 1}) == {'x': True, 'xx': 1}
assert f({'b': 1}) == {'x': False, 'xx': None}
assert called == [{'a': 1}, {'b': 1}]

This means pycond can be used as a lightweight declarative function dispatching framework.

Since version 20200601 and Python 3.x versions, pycond can deliver ReactiveX compliant stream operators.

Lets first set up a test data stream, by defining a function rx_setup like so:

# simply `import rx as Rx and rx = rx.operators`:
# import pycond as pc, like always:
Rx, rx, GS = pc.import_rx('GS')

def push_through(*test_pipe, items=4):
    """
    Function which takes a set of operators and runs an 'rx.interval' stream, until count items are through
    """

    # stream sink result holder plus a stream completer:
    l, compl = [], rx.take(items)
    l.clear()  # clear any previous results

    def next_(x):
        # simply remember what went through in a list:
        l.append(x)

    def err(*a):
        # should never happen:
        print('exception', a)

    stream = Rx.interval(0.01)  # numbers, each on its own thread

    # turns the ints into dicts: {'i': 1}, then {'i': 2} and so on:
    # (we start from 1, the first 0 we filter out)
    stream = stream.pipe(
        rx.filter(lambda i: i > 0), rx.map(lambda i: {'i': i})
    )

    # defines the stream through the tested operators:
    test_pipe = test_pipe + (compl,)
    s = stream.pipe(*test_pipe)

    # runs the stream:
    d = s.subscribe(
        on_error=err,
        on_next=next_,
        on_completed=lambda: l.append('completed'),
    )

    # blocks until completed:
    while not (l and l[-1] == 'completed'):
        time.sleep(0.001)
    l.pop()  # removes completed indicator

    return l  # returns all processed messages

return Rx, rx, push_through

Lets test the setup by having some messages streamed through:

Rx, rx, push_through = rx_setup()
# test test setup:
r = push_through(items=3)
assert r == [{'i': 1}, {'i': 2}, {'i': 3}]

-> test setup works.

This is the most simple operation: A simple stream filter.

Rx, rx, push_through = rx_setup()

# ask pycond for a stream filter based on a condition:
pcfilter = partial(pc.rxop, ['i', 'mod', 2])

r = push_through(pcfilter())
odds = [{'i': 1}, {'i': 3}, {'i': 5}, {'i': 7}]
assert r == odds

# try the stream filter with message headered data:
pl = 'payload'
r = push_through(rx.map(lambda i: {pl: i}), pcfilter(prefix=pl))
print('Full messages passed:', r)
r = [m[pl] for m in r]
assert len(r) == 4
assert r == odds

Output:

Full messages passed: [{'payload': {'i': 1}}, {'payload': {'i': 3}}, {'payload': {'i': 5}}, {'payload': {'i': 7}}]

Using named condition dicts we can classify data, i.e. tag it, in order to process subsequently:

Rx, rx, push_through = rx_setup()

# generate a set of classifiers:
conds = [['i', 'mod', i] for i in range(2, 4)]

def run(offs=0):

    # and get a classifying operator from pycond, adding the results in place, at key 'mod':
    r = push_through(pc.rxop(conds, into='mod'))
    i, j = 0 + offs, 1 + offs
    assert r == [
        {'i': 1, 'mod': {i: 1, j: 1}},
        {'i': 2, 'mod': {i: 0, j: 2}},
        {'i': 3, 'mod': {i: 1, j: 0}},
        {'i': 4, 'mod': {i: 0, j: 1}},
    ]

# this will automatically number the classifiers, from 0:
run()

# we can also provide the names of the classifiers by passing a dict:
# here we pass 2 and 3 as those names:
conds = dict([(i, ['i', 'mod', i]) for i in range(2, 4)])
run(2)

Normally the data has headers, so thats a good place to keep the classification tags.

We fall back to an alternative condition evaluation (which could be a function call) only when a previous condition evaluation returns something falsy - by providing a root condition. When it evaluated, possibly requiring evaluation of other conditions, we return:

Rx, rx, push_through = rx_setup()

# using the list style:
conds = [[i, [['i', 'mod', i], 'or', ':alt']] for i in range(2, 4)]
conds.append(['alt', ['i', 'gt', 1]])

# provide the root condition. Only when it evals falsy, the named "alt" condiction will be evaluated:
r = push_through(pc.rxop(conds, into='mod', root=2, add_cached=True))

assert r == [
    # evaluation of alt was not required:
    {'i': 1, 'mod': {2: True}},
    # evaluation of alt was required:
    {'i': 2, 'mod': {2: True, 'alt': True}},
    {'i': 3, 'mod': {2: True}},
    {'i': 4, 'mod': {2: True, 'alt': True}},
]

For the special case of booleans in a condition list we do not treat them as names.

# 2 unnamed conditions -> keys will be positional
qs = pc.qualify([True, False])
res = qs({'a': 1})
assert res == {0: True, 1: False}  # and not {True: False}
# 2 named conds
qs = pc.qualify([[1, ['a', 'eq', 1]], [2, ['b', 'eq', 42]]])
res = qs({'a': 1})
assert res == {1: True, 2: False}

WARNING: Early Version. Only for the gevent platform.

Selective classification allows to call condition functions only when other criteria are met. That makes it possible to read e.g. from a database only when data is really required - and not always, "just in case".

pycond allows to define, that blocking operations should be run async within the stream, possibly giving up order.

First a simple filter, which gives up order but does not block:

Rx, rx, push_through = rx_setup()

class F:
    def check(k, v, cfg, data, t0=[], **kw):
        # will be on different thread:
        i, pointer = data['i'], ''
        if not t0:
            t0.append(now())
        if i == 1:
            # ints are fired at 0.01, i.e. the 1 will land 4 after 1:
            time.sleep(0.048)
            pointer = '   <----- not in order, blocked'
        # demonstrate that item 1 is not blocking anything - just order is disturbed:
        print('item %s: %.3fs %s' % (i, now() - t0[0], pointer))
        return i % 2, v

# have the operator built for us - with a single condition filter:
rxop = pc.rxop([':check'], into='mod', lookup_provider=F, asyn=['check'],)
r = push_through(rxop, items=5)
assert [m['i'] for m in r] == [3, 5, 1, 7, 9]

Output:

item 2: 0.011s 
item 3: 0.021s 
item 4: 0.032s 
item 5: 0.043s 
item 1: 0.048s    <----- not in order, blocked
item 6: 0.054s 
item 7: 0.065s 
item 8: 0.076s 
item 9: 0.087s

Finally asyncronous classification, i.e. evaluation of multiple conditions:

_thn = lambda msg, data: print('thread:', cur_thread().name, msg, data)

# push_through just runs a stream of {'i': <nr>} through a given operator:
Rx, rx, push_through = rx_setup()

# Defining a simple 'set' of classifiers, here as list, with one single key: 42:
conds = [
    [
        42,
        [
            ['i', 'lt', 100],
            'and',
            [[':odd', 'eq', 1], 'or', ['i', 'eq', 2]],
            'and_not',
            [':blocking', 'eq', 3],
        ],
    ]
]

class F:
    """
    Namespace for condition lookup functions.
    You may also pass a dict (lookup_provider_dict)

    We provide the functions for 'odd' and 'blocking'.
    """

    def odd(k, v, cfg, data, **kw):
        # just print the threadname.
        # will go up, interval stream has each nr on its own thread:
        _thn('odd', data)
        # fullfill condition only for odd numbers
        # -> even nrs won't even run func 'blocking':
        return data['i'] % 2, v

    def blocking(k, v, cfg, data, **kw):
        i = data['i']
        # will be on different thread:
        _thn('blocking', data)
        if i == 1:
            # two others will "overtake the i=1 item,
            # since the interval stream is firing every 0.01 secs:
            time.sleep(0.028)
        elif i == 2:
            # Exceptions, incl. timeouts, will simply be forwarded to cfg['err_handler']
            # i.e. also timeout mgmt have to be done here, in the custom functions themselves.

            # Rationale for not providing a timeout monitoring within pycond itself:
            # Async ops are done with libs, which ship with their own timeout params.
            # No need to re-invent / overlay with our own monitoring of that.

            # In the err handler, then further arrangements can be done.
            raise TimeoutError('ups')
        elif i == 5:
            1 / 0
        return data['i'], v

errors = []

def handle_err(item, cfg, ctx, exc, t=errors, **kw):
    # args are: [item, cfg]
    if 'ups' in str(exc):
        assert item['i'] == 2
        assert exc.__class__ == TimeoutError
        t.append(item)
    else:
        assert item['i'] == 5
        assert exc.__class__ == ZeroDivisionError
        t.append(item)

# have the operator built for us:
rxop = pc.rxop(
    conds,
    into='mod',
    lookup_provider=F,
    err_handler=handle_err,
    asyn=['blocking'],
)
r = push_through(rxop, items=5)
assert [m['i'] for m in r] == [3, 1, 4, 6, 7]
assert [m['mod'][42] for m in r] == [False, True, False, False, True]
# item 2 caused a timeout:
assert [t['i'] for t in errors] == [2, 5]

Output:

thread: Thread-10053 odd {'i': 1}
thread: Dummy-10055 blocking {'i': 1}
thread: Thread-10054 odd {'i': 2}
thread: Dummy-10057 blocking {'i': 2}
thread: Thread-10056 odd {'i': 3}
thread: Dummy-10059 blocking {'i': 3}
thread: Thread-10058 odd {'i': 4}
thread: Thread-10060 odd {'i': 5}
thread: Dummy-10062 blocking {'i': 5}
thread: Thread-10061 odd {'i': 6}
thread: Thread-10063 odd {'i': 7}
thread: Dummy-10065 blocking {'i': 7}

*Auto generated by pytest2md, running ./tests/test_tutorial.py

Releases

No releases published

Packages

No packages published