From 5d6a52b5766fdd2413aa720d92bd51609c746b46 Mon Sep 17 00:00:00 2001
From: Romain Rigaux
Date: Mon, 24 Sep 2018 20:53:41 -0700
Subject: [PATCH] HUE-8578 [importer] Implement Flume output
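
When the importer input is a stream and Flume is selected, generate the
corresponding Flume agent configuration (plus a Morphline configuration for
the Solr index output), push it through the Cloudera Manager API and refresh
the Flume configuration. On success the importer page publishes the returned
pubSubUrl so the assist collections list refreshes.

Illustrative call, taken from the new flume_tests.py:

  FlumeIndexer(user=None).generate_config(
    source={'channelSourceType': 'directory'},
    destination={'ouputFormat': 'index'}
  )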
---
 desktop/libs/indexer/src/indexer/api3.py     |   3 +
 .../indexer/src/indexer/indexers/flume.py    | 147 ++++++++++++++++
 .../src/indexer/indexers/flume_tests.py      | 162 ++++++++++++++++++
 .../src/indexer/templates/importer.mako      |   7 +-
 4 files changed, 318 insertions(+), 1 deletion(-)
 create mode 100644 desktop/libs/indexer/src/indexer/indexers/flume.py
 create mode 100644 desktop/libs/indexer/src/indexer/indexers/flume_tests.py

diff --git a/desktop/libs/indexer/src/indexer/api3.py b/desktop/libs/indexer/src/indexer/api3.py
index 9e2e56c0d18..af69022586b 100644
--- a/desktop/libs/indexer/src/indexer/api3.py
+++ b/desktop/libs/indexer/src/indexer/api3.py
@@ -46,6 +46,7 @@
 from indexer.indexers.rdbms import run_sqoop, _get_api
 from indexer.indexers.sql import SQLIndexer
 from indexer.solr_client import SolrClient, MAX_UPLOAD_SIZE
+from indexer.indexers.flume import FlumeIndexer
 
 
 LOG = logging.getLogger(__name__)
@@ -481,6 +482,8 @@ def _large_indexing(request, file_format, collection_name, query=None, start_tim
     db = dbms.get(request.user)
     table_metadata = db.get_table(database=file_format['databaseName'], table_name=file_format['tableName'])
     input_path = table_metadata.path_location
+  elif file_format['inputFormat'] == 'stream' and file_format['streamSelection'] == 'flume':
+    return FlumeIndexer(user=request.user).start(collection_name, file_format, destination)
   elif file_format['inputFormat'] == 'stream':
     return _envelope_job(request, file_format, destination, start_time=start_time, lib_path=lib_path)
   elif file_format['inputFormat'] == 'file':
diff --git a/desktop/libs/indexer/src/indexer/indexers/flume.py b/desktop/libs/indexer/src/indexer/indexers/flume.py
new file mode 100644
index 00000000000..8365536ed6e
--- /dev/null
+++ b/desktop/libs/indexer/src/indexer/indexers/flume.py
@@ -0,0 +1,147 @@
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+from django.contrib.auth.models import User
+from django.core.urlresolvers import reverse
+from django.utils.translation import ugettext as _
+
+from desktop.lib.exceptions_renderable import PopupException
+from libzookeeper.conf import zkensemble
+from metadata.manager_client import ManagerApi
+
+from indexer.conf import config_morphline_path
+
+
+LOG = logging.getLogger(__name__)
+
+
+class FlumeIndexer(object):
+
+  def __init__(self, user):
+    self.user = user
+
+
+  def start(self, destination_name, file_format, destination):
+    responses = {'status': 0}
+
+    api = ManagerApi(self.user)
+
+    for config_name, config_value in self.generate_config(file_format, destination):
+      responses[config_name] = api.update_flume_config(cluster_name=None, config_name=config_name, config_value=config_value)
+
+    responses['refresh_flume'] = api.refresh_flume(cluster_name=None, restart=True)
+
+    if file_format['ouputFormat'] == 'index':
+      responses['pubSubUrl'] = 'assist.collections.refresh'
+      responses['on_success_url'] = reverse('search:browse', kwargs={'name': destination_name})
+
+    return responses
+
+
+  def generate_config(self, source, destination):
+    configs = []
+
+    if source['channelSourceType'] == 'directory':
+      agent_source = '''
+      tier1.sources.source1.type = exec
+      tier1.sources.source1.command = tail -F /var/log/hue-httpd/access_log
+      tier1.sources.source1.channels = channel1
+    '''
+    else:
+      raise PopupException(_('Input format not recognized: %(channelSourceType)s') % source)
+
+    if destination['ouputFormat'] == 'file':
+      agent_sink = '''
+      a1.channels = c1
+      a1.sinks = k1
+      a1.sinks.k1.type = hdfs
+      a1.sinks.k1.channel = c1
+      a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+      a1.sinks.k1.hdfs.filePrefix = events-
+      a1.sinks.k1.hdfs.round = true
+      a1.sinks.k1.hdfs.roundValue = 10
+      a1.sinks.k1.hdfs.roundUnit = minute'''
+    elif destination['ouputFormat'] == 'table':
+      agent_sink = '''
+      a1.channels = c1
+      a1.channels.c1.type = memory
+      a1.sinks = k1
+      a1.sinks.k1.type = hive
+      a1.sinks.k1.channel = c1
+      a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
+      a1.sinks.k1.hive.database = logsdb
+      a1.sinks.k1.hive.table = weblogs
+      a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
+      a1.sinks.k1.useLocalTimeStamp = false
+      a1.sinks.k1.round = true
+      a1.sinks.k1.roundValue = 10
+      a1.sinks.k1.roundUnit = minute
+      a1.sinks.k1.serializer = DELIMITED
+      a1.sinks.k1.serializer.delimiter = "\t"
+      a1.sinks.k1.serializer.serdeSeparator = '\t'
+      a1.sinks.k1.serializer.fieldnames =id,,msg'''
+    elif destination['ouputFormat'] == 'kafka':
+      agent_sink = '''
+      tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
+tier1.sinks.sink1.topic = hueAccessLogs
+tier1.sinks.sink1.brokerList = spark2-envelope515-1.gce.cloudera.com:9092,spark2-envelope515-2.gce.cloudera.com:9092,spark2-envelope515-3.gce.cloudera.com:9092
+tier1.sinks.sink1.channel = channel1
+tier1.sinks.sink1.batchSize = 20'''
+    elif destination['ouputFormat'] == 'index':
+      # Morphline file
+      configs.append(self.generate_morphline_config(destination))
+      # Flume config
+      agent_sink = '''
+      tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
+      tier1.sinks.sink1.morphlineFile = morphlines.conf
+      tier1.sinks.sink1.morphlineId = hue_accesslogs_no_geo
+      tier1.sinks.sink1.channel = channel1'''
+    else:
+      raise PopupException(_('Output format not recognized: %(ouputFormat)s') % destination)
+
+    # TODO: use agent id: input + output and do not override all the configs
+    # TODO: use Kafka channel if possible
+    flume_config = '''tier1.sources = source1
+  tier1.channels = channel1
+  tier1.sinks = sink1
+
+
+  tier1.channels.channel1.type = memory
+  tier1.channels.channel1.capacity = 10000
+  tier1.channels.channel1.transactionCapacity = 1000
+
+  %(sinks)s''' % {
+      'sources': agent_source,
+      'sinks': agent_sink,
+    }
+
+    configs.append(('agent_config_file', flume_config))
+
+    return configs
+
+
+  def generate_morphline_config(self, destination):
+    # TODO manage generic config, cf. MorphlineIndexer
+    morphline_config = open(os.path.join(config_morphline_path(), 'hue_accesslogs_no_geo.morphline.conf')).read()
+    morphline_config = morphline_config.replace(
+      '${SOLR_COLLECTION}', 'log_analytics_demo'
+    ).replace(
+      '${ZOOKEEPER_ENSEMBLE}', '%s/solr' % zkensemble()
+    )
+    return ('agent_morphlines_conf_file', morphline_config)
diff --git a/desktop/libs/indexer/src/indexer/indexers/flume_tests.py b/desktop/libs/indexer/src/indexer/indexers/flume_tests.py
new file mode 100644
index 00000000000..a118943d8e5
--- /dev/null
+++ b/desktop/libs/indexer/src/indexer/indexers/flume_tests.py
@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from django.contrib.auth.models import User
+
+from nose.plugins.skip import SkipTest
+from nose.tools import assert_equal, assert_true
+
+from indexer.indexers.flume import FlumeIndexer
+
+
+def test_generate_from_directory_to_solr_index():
+  raise SkipTest
+
+  source = {
+    'channelSourceType': 'directory',
+  }
+  destination = {
+    'ouputFormat': 'index',
+  }
+
+  configs = FlumeIndexer(user=None).generate_config(source=source, destination=destination)
+
+  assert_equal(
+    '''SOLR_LOCATOR : {
+  # Name of solr collection
+  collection : log_analytics_demo
+  # ZooKeeper ensemble
+  zkHost : "spark2-envelope515-1.gce.cloudera.com:2181/solr"
+}
+
+
+morphlines : [
+{
+  id : hue_accesslogs_no_geo
+
+  importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
+  commands : [
+    {
+      ## Read the email stream and break it up into individual messages.
+      ## The beginning of a message is marked by regex clause below
+      ## The reason we use this command is that one event can have multiple
+      ## messages
+      readCSV {
+
+        ## Hue HTTPD load balancer
+        ## 172.18.18.3 - - [27/Aug/2018:05:47:12 -0700] "GET /static/desktop/js/jquery.rowselector.a04240f7cc48.js HTTP/1.1" 200 2321
+
+        separator: " "
+        columns: [client_ip,C1,C2,time,dummy1,request,code,bytes]
+        ignoreFirstLine : false
+        quoteChar : "\""
+        commentPrefix : ""
+        trim : true
+        charset : UTF-8
+      }
+    }
+    {
+      split {
+        inputField : request
+        outputFields : [method, url, protocol]
+        separator : " "
+        isRegex : false
+        #separator : """\s*,\s*"""
+        # #isRegex : true
+        addEmptyStrings : false
+        trim : true
+      }
+    }
+    {
+      split {
+        inputField : url
+        outputFields : ["", app, subapp]
+        separator : "\/"
+        isRegex : false
+        #separator : """\s*,\s*"""
+        # #isRegex : true
+        addEmptyStrings : false
+        trim : true
+      }
+    }
+    {
+      userAgent {
+        inputField : user_agent
+        outputFields : {
+          user_agent_family : "@{ua_family}"
+          user_agent_major : "@{ua_major}"
+          device_family : "@{device_family}"
+          os_family : "@{os_family}"
+          os_major : "@{os_major}"
+        }
+      }
+    }
+
+    #{logInfo { format : "BODY : {}", args : ["@{}"] } }
+    # add Unique ID, in case our message_id field from above is not present
+    {
+      generateUUID {
+        field:id
+      }
+    }
+
+    # convert the timestamp field to "yyyy-MM-dd'T'HH:mm:ss.SSSZ" format
+    {
+      # 21/Nov/2014:22:08:27
+      convertTimestamp {
+        field : time
+        inputFormats : ["[dd/MMM/yyyy:HH:mm:ss", "EEE, d MMM yyyy HH:mm:ss Z", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd"]
+        #inputTimezone : America/Los_Angeles
+        inputTimezone : UTC
+        outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
+        outputTimezone : UTC
+      }
+    }
+
+    # Consume the output record of the previous command and pipe another
+    # record downstream.
+    #
+    # This command sanitizes record fields that are unknown to Solr schema.xml
+    # by deleting them. Recall that Solr throws an exception on any attempt to
+    # load a document that contains a field that isn't specified in schema.xml
+    {
+      sanitizeUnknownSolrFields {
+        # Location from which to fetch Solr schema
+        solrLocator : ${SOLR_LOCATOR}
+      }
+    }
+
+    # load the record into a SolrServer or MapReduce SolrOutputFormat.
+    {
+      loadSolr {
+        solrLocator : ${SOLR_LOCATOR}
+      }
+    }
+  ]
+}
+]
+'''.strip()
+    ,
+    configs[0][1].strip() # 'agent_morphlines_conf_file'
+  )
+
+  assert_equal(
+    ('agent_config_file', 'tier1.sources = source1\n  tier1.channels = channel1\n  tier1.sinks = sink1\n\n\n  tier1.channels.channel1.type = memory\n  tier1.channels.channel1.capacity = 10000\n  tier1.channels.channel1.transactionCapacity = 1000\n\n  \n      tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink\n      tier1.sinks.sink1.morphlineFile = morphlines.conf\n      tier1.sinks.sink1.morphlineId = hue_accesslogs_no_geo\n      tier1.sinks.sink1.channel = channel1')
+    ,
+    configs[1] # 'agent_config_file'
+  )
diff --git a/desktop/libs/indexer/src/indexer/templates/importer.mako b/desktop/libs/indexer/src/indexer/templates/importer.mako
index 6b514863387..b49bf28db05 100644
--- a/desktop/libs/indexer/src/indexer/templates/importer.mako
+++ b/desktop/libs/indexer/src/indexer/templates/importer.mako
@@ -1915,6 +1915,8 @@ ${ assist.assistPanel() }
         wizard.guessFormat();
         if (self.streamSelection() === 'kafka') {
           wizard.destination.tableFormat('kudu');
+        } else {
+          wizard.destination.tableFormat('text');
         }
       }
     });
@@ -2102,7 +2104,7 @@ ${ assist.assistPanel() }
       if (format.value === 'altus' && ['table'].indexOf(wizard.source.inputFormat()) === -1) {
         return false;
       }
-      if (format.value === 'stream' && ['file'].indexOf(wizard.source.inputFormat()) === -1) {
+      if (format.value === 'stream' && ['file', 'stream'].indexOf(wizard.source.inputFormat()) === -1) {
         return false;
       }
       if (format.value === 'hbase' && (wizard.source.inputFormat() !== 'rdbms' || wizard.source.rdbmsAllTablesSelected())) {
@@ -2606,6 +2608,9 @@ ${ assist.assistPanel() }
           self.isIndexing(false);
         } else if (resp.on_success_url) {
           $.jHueNotify.info("${ _('Creation success.') }");
+          if (resp.pubSubUrl) {
+            huePubSub.publish(resp.pubSubUrl);
+          }
           huePubSub.publish('open.link', resp.on_success_url);
         } else {
           self.showCreate(true);