HUE-8578 [importer] Implement Flume output
romainr committed Sep 26, 2018
1 parent 77447ff commit 5d6a52b
Showing 4 changed files with 318 additions and 1 deletion.
3 changes: 3 additions & 0 deletions desktop/libs/indexer/src/indexer/api3.py
@@ -46,6 +46,7 @@
from indexer.indexers.rdbms import run_sqoop, _get_api
from indexer.indexers.sql import SQLIndexer
from indexer.solr_client import SolrClient, MAX_UPLOAD_SIZE
from indexer.indexers.flume import FlumeIndexer


LOG = logging.getLogger(__name__)
@@ -481,6 +482,8 @@ def _large_indexing(request, file_format, collection_name, query=None, start_tim
db = dbms.get(request.user)
table_metadata = db.get_table(database=file_format['databaseName'], table_name=file_format['tableName'])
input_path = table_metadata.path_location
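# Flume-based streams are configured via Cloudera Manager instead of submitting an Envelope job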
elif file_format['inputFormat'] == 'stream' and file_format['streamSelection'] == 'flume':
return FlumeIndexer(user=request.user).start(collection_name, file_format, destination)
elif file_format['inputFormat'] == 'stream':
return _envelope_job(request, file_format, destination, start_time=start_time, lib_path=lib_path)
elif file_format['inputFormat'] == 'file':
147 changes: 147 additions & 0 deletions desktop/libs/indexer/src/indexer/indexers/flume.py
@@ -0,0 +1,147 @@
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os

from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext as _

from desktop.lib.exceptions_renderable import PopupException
from libzookeeper.conf import zkensemble
from metadata.manager_client import ManagerApi

from indexer.conf import config_morphline_path


LOG = logging.getLogger(__name__)


class FlumeIndexer(object):
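  """Sets up a Flume ingestion pipeline: generates the agent configuration, pushes it through Cloudera Manager and restarts the Flume service."""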

def __init__(self, user):
self.user = user


def start(self, destination_name, file_format, destination):
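    """Push each generated config to Cloudera Manager, restart Flume and return the status plus redirect info for the importer UI."""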
responses = {'status': 0}

api = ManagerApi(self.user)

for config_name, config_value in self.generate_config(file_format, destination):
responses[config_name] = api.update_flume_config(cluster_name=None, config_name=config_name, config_value=config_value)

responses['refresh_flume'] = api.refresh_flume(cluster_name=None, restart=True)

if destination['ouputFormat'] == 'index':
responses['pubSubUrl'] = 'assist.collections.refresh'
responses['on_success_url'] = reverse('search:browse', kwargs={'name': destination_name})

return responses


def generate_config(self, source, destination):
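    """Build the Flume agent configuration for the selected source and sink. Returns a list of (config_name, config_value) tuples; the agent config comes last, preceded by a morphline config when the destination is a Solr index."""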
configs = []

if source['channelSourceType'] == 'directory':
agent_source = '''
tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /var/log/hue-httpd/access_log
tier1.sources.source1.channels = channel1
'''
else:
raise PopupException(_('Input format not recognized: %(channelSourceType)s') % source)

if destination['ouputFormat'] == 'file':
agent_sink = '''
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute'''
elif destination['ouputFormat'] == 'table':
agent_sink = '''
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg'''
elif destination['ouputFormat'] == 'kafka':
agent_sink = '''
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = hueAccessLogs
tier1.sinks.sink1.brokerList = spark2-envelope515-1.gce.cloudera.com:9092,spark2-envelope515-2.gce.cloudera.com:9092,spark2-envelope515-3.gce.cloudera.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20'''
elif destination['ouputFormat'] == 'index':
# Morphline file
configs.append(self.generate_morphline_config(destination))
# Flume config
agent_sink = '''
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.morphlineFile = morphlines.conf
tier1.sinks.sink1.morphlineId = hue_accesslogs_no_geo
tier1.sinks.sink1.channel = channel1'''
else:
raise PopupException(_('Output format not recognized: %(ouputFormat)s') % destination)

# TODO: use agent id: input + output and do not override all the configs
# TODO: use Kafka channel if possible
flume_config = '''tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
%(sources)s
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
%(sinks)s''' % {
'sources': agent_source,
'sinks': agent_sink,
}

configs.append(('agent_config_file', flume_config))

return configs


def generate_morphline_config(self, destination):
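    """Read the bundled hue_accesslogs_no_geo morphline and substitute the Solr collection name and ZooKeeper ensemble."""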
# TODO manage generic config, cf. MorphlineIndexer
morphline_config = open(os.path.join(config_morphline_path(), 'hue_accesslogs_no_geo.morphline.conf')).read()
morphline_config = morphline_config.replace(
'${SOLR_COLLECTION}', 'log_analytics_demo'
).replace(
'${ZOOKEEPER_ENSEMBLE}', '%s/solr' % zkensemble()
)
return ('agent_morphlines_conf_file', morphline_config)
162 changes: 162 additions & 0 deletions desktop/libs/indexer/src/indexer/indexers/flume_tests.py
@@ -0,0 +1,162 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from django.contrib.auth.models import User

from nose.plugins.skip import SkipTest
from nose.tools import assert_equal, assert_true

from indexer.indexers.flume import FlumeIndexer


def test_generate_from_directory_to_solr_index():
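  """Disabled: would verify the morphline and Flume agent configs generated for a directory source feeding a Solr index."""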
raise SkipTest

source = {
'channelSourceType': 'directory',
}
destination = {
'ouputFormat': 'index',
}

configs = FlumeIndexer(user=None).generate_config(source=source, destination=destination)

assert_equal(
'''SOLR_LOCATOR : {
# Name of solr collection
collection : log_analytics_demo
# ZooKeeper ensemble
zkHost : "spark2-envelope515-1.gce.cloudera.com:2181/solr"
}
morphlines : [
{
id : hue_accesslogs_no_geo
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
commands : [
{
## Read the email stream and break it up into individual messages.
## The beginning of a message is marked by regex clause below
## The reason we use this command is that one event can have multiple
## messages
readCSV {
## Hue HTTPD load balancer
## 172.18.18.3 - - [27/Aug/2018:05:47:12 -0700] "GET /static/desktop/js/jquery.rowselector.a04240f7cc48.js HTTP/1.1" 200 2321
separator: " "
columns: [client_ip,C1,C2,time,dummy1,request,code,bytes]
ignoreFirstLine : false
quoteChar : "\""
commentPrefix : ""
trim : true
charset : UTF-8
}
}
{
split {
inputField : request
outputFields : [method, url, protocol]
separator : " "
isRegex : false
#separator : """\s*,\s*"""
# #isRegex : true
addEmptyStrings : false
trim : true
}
}
{
split {
inputField : url
outputFields : ["", app, subapp]
separator : "\/"
isRegex : false
#separator : """\s*,\s*"""
# #isRegex : true
addEmptyStrings : false
trim : true
}
}
{
userAgent {
inputField : user_agent
outputFields : {
user_agent_family : "@{ua_family}"
user_agent_major : "@{ua_major}"
device_family : "@{device_family}"
os_family : "@{os_family}"
os_major : "@{os_major}"
}
}
}
#{logInfo { format : "BODY : {}", args : ["@{}"] } }
# add Unique ID, in case our message_id field from above is not present
{
generateUUID {
field:id
}
}
# convert the timestamp field to "yyyy-MM-dd'T'HH:mm:ss.SSSZ" format
{
# 21/Nov/2014:22:08:27
convertTimestamp {
field : time
inputFormats : ["[dd/MMM/yyyy:HH:mm:ss", "EEE, d MMM yyyy HH:mm:ss Z", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd"]
#inputTimezone : America/Los_Angeles
inputTimezone : UTC
outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
outputTimezone : UTC
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# This command sanitizes record fields that are unknown to Solr schema.xml
# by deleting them. Recall that Solr throws an exception on any attempt to
# load a document that contains a field that isn't specified in schema.xml
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator : ${SOLR_LOCATOR}
}
}
# load the record into a SolrServer or MapReduce SolrOutputFormat.
{
loadSolr {
solrLocator : ${SOLR_LOCATOR}
}
}
]
}
]
'''.strip()
,
configs[0][1].strip() # 'agent_morphlines_conf_file'
)

assert_equal(
('agent_config_file', 'tier1.sources = source1\n tier1.channels = channel1\n tier1.sinks = sink1\n\n\n tier1.channels.channel1.type = memory\n tier1.channels.channel1.capacity = 10000\n tier1.channels.channel1.transactionCapacity = 1000\n\n \n tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink\n tier1.sinks.sink1.morphlineFile = morphlines.conf\n tier1.sinks.sink1.morphlineId = hue_accesslogs_no_geo\n tier1.sinks.sink1.channel = channel1')
,
configs[1] # 'agent_config_file'
)
7 changes: 6 additions & 1 deletion desktop/libs/indexer/src/indexer/templates/importer.mako
@@ -1915,6 +1915,8 @@ ${ assist.assistPanel() }
wizard.guessFormat();
if (self.streamSelection() === 'kafka') {
wizard.destination.tableFormat('kudu');
} else {
wizard.destination.tableFormat('text');
}
}
});
@@ -2102,7 +2104,7 @@ ${ assist.assistPanel() }
if (format.value === 'altus' && ['table'].indexOf(wizard.source.inputFormat()) === -1) {
return false;
}
if (format.value === 'stream' && ['file'].indexOf(wizard.source.inputFormat()) === -1) {
if (format.value === 'stream' && ['file', 'stream'].indexOf(wizard.source.inputFormat()) === -1) {
return false;
}
if (format.value === 'hbase' && (wizard.source.inputFormat() !== 'rdbms' || wizard.source.rdbmsAllTablesSelected())) {
@@ -2606,6 +2608,9 @@ ${ assist.assistPanel() }
self.isIndexing(false);
} else if (resp.on_success_url) {
$.jHueNotify.info("${ _('Creation success.') }");
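// Ask the assist panel to refresh its collection list when the backend requests it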
if (resp.pubSubUrl) {
huePubSub.publish(resp.pubSubUrl);
}
huePubSub.publish('open.link', resp.on_success_url);
} else {
self.showCreate(true);
