forked from cztomczak/cefpython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtask.pyx
90 lines (67 loc) · 2.38 KB
/
task.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Copyright (c) 2014 CEF Python, see the Authors file.
# All rights reserved. Licensed under BSD 3-clause license.
# Project website: https://github.com/cztomczak/cefpython
include "cefpython.pyx"
g_taskMaxId = 0
g_tasks = {}
def PostTask(int thread, object func, *args):
global g_tasks, g_taskMaxId
# Validate threadId.
if thread not in g_browserProcessThreads:
raise Exception("PostTask failed: requires a browser process thread")
# Validate func.
if not IsFunctionOrMethod(type(func)):
raise Exception("PostTask failed: not a function nor method")
# Params.
cdef list params = list(args)
# Keep func and params until PyTaskRunnable is called.
g_taskMaxId += 1
g_tasks[str(g_taskMaxId)] = {
"func": func,
"params": params
}
# Call C++ wrapper.
cdef int cTaskId = int(g_taskMaxId)
with nogil:
PostTaskWrapper(thread, cTaskId)
def PostDelayedTask(int thread, int delay_ms, object func, *args):
global g_tasks, g_taskMaxId
# Validate threadId.
if thread not in g_browserProcessThreads:
raise Exception("PostTask failed: requires a browser process thread")
# Validate func.
if not IsFunctionOrMethod(type(func)):
raise Exception("PostTask failed: not a function nor method")
# Params.
cdef list params = list(args)
# Keep func and params until PyTaskRunnable is called.
g_taskMaxId += 1
g_tasks[str(g_taskMaxId)] = {
"func": func,
"params": params
}
# Call C++ wrapper.
cdef int cTaskId = int(g_taskMaxId)
with nogil:
PostDelayedTaskWrapper(thread, delay_ms, cTaskId)
cdef public void PyTaskRunnable(int taskId) except * with gil:
cdef object func
cdef list params
cdef object task
try:
global g_tasks
# Validate if task exist.
if str(taskId) not in g_tasks:
raise Exception("PyTaskRunnable failed: invalid taskId=%s" \
% taskId)
# Fetch task: func and params.
task = g_tasks[str(taskId)]
func = task["func"]
params = task["params"]
del g_tasks[str(taskId)]
# Execute user func.
Debug("PyTaskRunnable: taskId=%s, func=%s" % (taskId, func.__name__))
func(*params)
except:
(exc_type, exc_value, exc_trace) = sys.exc_info()
sys.excepthook(exc_type, exc_value, exc_trace)