forked from apache/airflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugins_manager.py
114 lines (97 loc) · 3.6 KB
/
plugins_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import object
import imp
import inspect
import logging
import os
import re
import sys
from itertools import chain
merge = chain.from_iterable
from airflow import configuration
class AirflowPluginException(Exception):
pass
class AirflowPlugin(object):
name = None
operators = []
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
@classmethod
def validate(cls):
if not cls.name:
raise AirflowPluginException("Your plugin needs a name.")
plugins_folder = configuration.get('core', 'plugins_folder')
if not plugins_folder:
plugins_folder = configuration.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)
if plugins_folder not in sys.path:
sys.path.append(plugins_folder)
plugins = []
norm_pattern = re.compile(r'[/|.]')
# Crawl through the plugins folder to find AirflowPlugin derivatives
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
for f in files:
try:
filepath = os.path.join(root, f)
if not os.path.isfile(filepath):
continue
mod_name, file_ext = os.path.splitext(
os.path.split(filepath)[-1])
if file_ext != '.py':
continue
# normalize root path as namespace
namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
m = imp.load_source(namespace, filepath)
for obj in list(m.__dict__.values()):
if (
inspect.isclass(obj) and
issubclass(obj, AirflowPlugin) and
obj is not AirflowPlugin):
obj.validate()
if obj not in plugins:
plugins.append(obj)
except Exception as e:
logging.exception(e)
logging.error('Failed to import plugin ' + filepath)
def make_module(name, objects):
name = name.lower()
module = imp.new_module(name)
module._name = name.split('.')[-1]
module._objects = objects
module.__dict__.update((o.__name__, o) for o in objects)
return module
operators, hooks, executors, macros, admin_views = [], [], [], [], []
flask_blueprints, menu_links = [], []
for p in plugins:
operators.append(make_module('airflow.operators.' + p.name, p.operators))
hooks.append(make_module('airflow.hooks.' + p.name, p.hooks))
executors.append(make_module('airflow.executors.' + p.name, p.executors))
macros.append(make_module('airflow.macros.' + p.name, p.macros))
admin_views.append(
make_module('airflow.www.admin_views' + p.name, p.admin_views))
flask_blueprints.append(
make_module(
'airflow.www.flask_blueprints' + p.name, p.flask_blueprints))
menu_links.append(
make_module('airflow.www.menu_links' + p.name, p.menu_links))