Skip to content

Commit

Permalink
Log Topic for Functions (apache#1356)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and sijie committed Mar 7, 2018
1 parent dad679a commit 6b24ab9
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ abstract class FunctionConfigCommand extends BaseCommand {
protected String inputs;
@Parameter(names = "--output", description = "Output Topic Name")
protected String output;
@Parameter(names = "--logTopic", description = "Log Topic")
protected String logTopic;
@Parameter(names = "--customSerdeInputs", description = "Map of input topic to serde classname")
protected String customSerdeInputString;
@Parameter(names = "--outputSerdeClassName", description = "Output SerDe")
Expand Down Expand Up @@ -184,6 +186,9 @@ void processArguments() throws Exception {
if (null != output) {
functionConfigBuilder.setOutput(output);
}
if (null != logTopic) {
functionConfigBuilder.setLogTopic(logTopic);
}
if (null != tenant) {
functionConfigBuilder.setTenant(tenant);
}
Expand Down
153 changes: 109 additions & 44 deletions pulsar-functions/instance/src/main/python/Function_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

#
# -*- encoding: utf-8 -*-

# Generated by the protocol buffer compiler. DO NOT EDIT!
Expand All @@ -38,10 +38,10 @@


DESCRIPTOR = _descriptor.FileDescriptor(
name='Function.proto',
name='pulsar-functions/proto/src/main/proto/Function.proto',
package='proto',
syntax='proto3',
serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xdb\x05\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x0e\n\x06inputs\x18\x0e \x03(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12H\n\x14processingGuarantees\x18\t \x01(\x0e\x32*.proto.FunctionConfig.ProcessingGuarantees\x12\x39\n\nuserConfig\x18\n \x03(\x0b\x32%.proto.FunctionConfig.UserConfigEntry\x12@\n\x10subscriptionType\x18\x0b \x01(\x0e\x32&.proto.FunctionConfig.SubscriptionType\x12.\n\x07runtime\x18\x0c \x01(\x0e\x32\x1d.proto.FunctionConfig.Runtime\x12\x0f\n\x07\x61utoAck\x18\r \x01(\x08\x1a\x38\n\x16\x43ustomSerdeInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"9\n\x14ProcessingGuarantees\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x00\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x01\"-\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\r\n\tEXCLUSIVE\x10\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\x9f\x01\n\x10\x46unctionMetaData\x12-\n\x0e\x66unctionConfig\x18\x01 \x01(\x0b\x32\x15.proto.FunctionConfig\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"_\n\x08Snapshot\x12\x35\n\x14\x66unctionMetaDataList\x18\x01 \x03(\x0b\x32\x17.proto.FunctionMetaData\x12\x1c\n\x14lastAppliedMessageId\x18\x02 \x01(\x0c\"Q\n\nAssignment\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x10\n\x08workerId\x18\x02 \x01(\tB-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
serialized_pb=_b('\n4pulsar-functions/proto/src/main/proto/Function.proto\x12\x05proto\"\x98\x06\n\x0e\x46unctionConfig\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12G\n\x11\x63ustomSerdeInputs\x18\x05 \x03(\x0b\x32,.proto.FunctionConfig.CustomSerdeInputsEntry\x12\x1c\n\x14outputSerdeClassName\x18\x06 \x01(\t\x12\x0e\n\x06output\x18\x07 \x01(\t\x12\x10\n\x08logTopic\x18\x08 \x01(\t\x12H\n\x14processingGuarantees\x18\t \x01(\x0e\x32*.proto.FunctionConfig.ProcessingGuarantees\x12\x39\n\nuserConfig\x18\n \x03(\x0b\x32%.proto.FunctionConfig.UserConfigEntry\x12@\n\x10subscriptionType\x18\x0b \x01(\x0e\x32&.proto.FunctionConfig.SubscriptionType\x12.\n\x07runtime\x18\x0c \x01(\x0e\x32\x1d.proto.FunctionConfig.Runtime\x12\x0f\n\x07\x61utoAck\x18\r \x01(\x08\x12\x0e\n\x06inputs\x18\x0e \x03(\t\x12\x13\n\x0bparallelism\x18\x0f \x01(\x05\x1a\x38\n\x16\x43ustomSerdeInputsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x31\n\x0fUserConfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"O\n\x14ProcessingGuarantees\x12\x10\n\x0c\x41TLEAST_ONCE\x10\x00\x12\x0f\n\x0b\x41TMOST_ONCE\x10\x01\x12\x14\n\x10\x45\x46\x46\x45\x43TIVELY_ONCE\x10\x02\"-\n\x10SubscriptionType\x12\n\n\x06SHARED\x10\x00\x12\r\n\tEXCLUSIVE\x10\x01\"\x1f\n\x07Runtime\x12\x08\n\x04JAVA\x10\x00\x12\n\n\x06PYTHON\x10\x01\".\n\x17PackageLocationMetaData\x12\x13\n\x0bpackagePath\x18\x01 \x01(\t\"\x9f\x01\n\x10\x46unctionMetaData\x12-\n\x0e\x66unctionConfig\x18\x01 \x01(\x0b\x32\x15.proto.FunctionConfig\x12\x37\n\x0fpackageLocation\x18\x02 \x01(\x0b\x32\x1e.proto.PackageLocationMetaData\x12\x0f\n\x07version\x18\x03 \x01(\x04\x12\x12\n\ncreateTime\x18\x04 \x01(\x04\"_\n\x08Snapshot\x12\x35\n\x14\x66unctionMetaDataList\x18\x01 \x03(\x0b\x32\x17.proto.FunctionMetaData\x12\x1c\n\x14lastAppliedMessageId\x18\x02 \x01(\x0c\"Q\n\x08Instance\x12\x31\n\x10\x66unctionMetaData\x18\x01 \x01(\x0b\x32\x17.proto.FunctionMetaData\x12\x12\n\ninstanceId\x18\x02 \x01(\x05\"A\n\nAssignment\x12!\n\x08instance\x18\x01 \x01(\x0b\x32\x0f.proto.Instance\x12\x10\n\x08workerId\x18\x02 \x01(\tB-\n!org.apache.pulsar.functions.protoB\x08\x46unctionb\x06proto3')
)


Expand All @@ -53,18 +53,22 @@
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ATMOST_ONCE', index=0, number=0,
name='ATLEAST_ONCE', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ATMOST_ONCE', index=1, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ATLEAST_ONCE', index=1, number=1,
name='EFFECTIVELY_ONCE', index=2, number=2,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=620,
serialized_end=677,
serialized_start=697,
serialized_end=776,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_PROCESSINGGUARANTEES)

Expand All @@ -85,8 +89,8 @@
],
containing_type=None,
options=None,
serialized_start=679,
serialized_end=724,
serialized_start=778,
serialized_end=823,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_SUBSCRIPTIONTYPE)

Expand All @@ -107,8 +111,8 @@
],
containing_type=None,
options=None,
serialized_start=726,
serialized_end=757,
serialized_start=825,
serialized_end=856,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONCONFIG_RUNTIME)

Expand Down Expand Up @@ -146,8 +150,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=511,
serialized_end=567,
serialized_start=588,
serialized_end=644,
)

_FUNCTIONCONFIG_USERCONFIGENTRY = _descriptor.Descriptor(
Expand Down Expand Up @@ -183,8 +187,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=569,
serialized_end=618,
serialized_start=646,
serialized_end=695,
)

_FUNCTIONCONFIG = _descriptor.Descriptor(
Expand Down Expand Up @@ -223,29 +227,29 @@
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='inputs', full_name='proto.FunctionConfig.inputs', index=4,
number=14, type=9, cpp_type=9, label=3,
name='customSerdeInputs', full_name='proto.FunctionConfig.customSerdeInputs', index=4,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='customSerdeInputs', full_name='proto.FunctionConfig.customSerdeInputs', index=5,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
name='outputSerdeClassName', full_name='proto.FunctionConfig.outputSerdeClassName', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='outputSerdeClassName', full_name='proto.FunctionConfig.outputSerdeClassName', index=6,
number=6, type=9, cpp_type=9, label=1,
name='output', full_name='proto.FunctionConfig.output', index=6,
number=7, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='output', full_name='proto.FunctionConfig.output', index=7,
number=7, type=9, cpp_type=9, label=1,
name='logTopic', full_name='proto.FunctionConfig.logTopic', index=7,
number=8, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
Expand Down Expand Up @@ -285,6 +289,20 @@
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='inputs', full_name='proto.FunctionConfig.inputs', index=13,
number=14, type=9, cpp_type=9, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parallelism', full_name='proto.FunctionConfig.parallelism', index=14,
number=15, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
Expand All @@ -300,8 +318,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=26,
serialized_end=757,
serialized_start=64,
serialized_end=856,
)


Expand Down Expand Up @@ -331,8 +349,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=759,
serialized_end=805,
serialized_start=858,
serialized_end=904,
)


Expand Down Expand Up @@ -383,8 +401,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=808,
serialized_end=967,
serialized_start=907,
serialized_end=1066,
)


Expand Down Expand Up @@ -421,8 +439,46 @@
extension_ranges=[],
oneofs=[
],
serialized_start=969,
serialized_end=1064,
serialized_start=1068,
serialized_end=1163,
)


_INSTANCE = _descriptor.Descriptor(
name='Instance',
full_name='proto.Instance',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='functionMetaData', full_name='proto.Instance.functionMetaData', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceId', full_name='proto.Instance.instanceId', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=1165,
serialized_end=1246,
)


Expand All @@ -434,7 +490,7 @@
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='functionMetaData', full_name='proto.Assignment.functionMetaData', index=0,
name='instance', full_name='proto.Assignment.instance', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
Expand All @@ -459,8 +515,8 @@
extension_ranges=[],
oneofs=[
],
serialized_start=1066,
serialized_end=1147,
serialized_start=1248,
serialized_end=1313,
)

_FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY.containing_type = _FUNCTIONCONFIG
Expand All @@ -476,31 +532,33 @@
_FUNCTIONMETADATA.fields_by_name['functionConfig'].message_type = _FUNCTIONCONFIG
_FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA
_SNAPSHOT.fields_by_name['functionMetaDataList'].message_type = _FUNCTIONMETADATA
_ASSIGNMENT.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
_INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
_ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
DESCRIPTOR.message_types_by_name['FunctionConfig'] = _FUNCTIONCONFIG
DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA
DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
DESCRIPTOR.message_types_by_name['Snapshot'] = _SNAPSHOT
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

FunctionConfig = _reflection.GeneratedProtocolMessageType('FunctionConfig', (_message.Message,), dict(

CustomSerdeInputsEntry = _reflection.GeneratedProtocolMessageType('CustomSerdeInputsEntry', (_message.Message,), dict(
DESCRIPTOR = _FUNCTIONCONFIG_CUSTOMSERDEINPUTSENTRY,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionConfig.CustomSerdeInputsEntry)
))
,

UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict(
DESCRIPTOR = _FUNCTIONCONFIG_USERCONFIGENTRY,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionConfig.UserConfigEntry)
))
,
DESCRIPTOR = _FUNCTIONCONFIG,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionConfig)
))
_sym_db.RegisterMessage(FunctionConfig)
Expand All @@ -509,28 +567,35 @@

PackageLocationMetaData = _reflection.GeneratedProtocolMessageType('PackageLocationMetaData', (_message.Message,), dict(
DESCRIPTOR = _PACKAGELOCATIONMETADATA,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.PackageLocationMetaData)
))
_sym_db.RegisterMessage(PackageLocationMetaData)

FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData', (_message.Message,), dict(
DESCRIPTOR = _FUNCTIONMETADATA,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionMetaData)
))
_sym_db.RegisterMessage(FunctionMetaData)

Snapshot = _reflection.GeneratedProtocolMessageType('Snapshot', (_message.Message,), dict(
DESCRIPTOR = _SNAPSHOT,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.Snapshot)
))
_sym_db.RegisterMessage(Snapshot)

Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE,
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.Instance)
))
_sym_db.RegisterMessage(Instance)

Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Message,), dict(
DESCRIPTOR = _ASSIGNMENT,
__module__ = 'Function_pb2'
__module__ = 'pulsar_functions.proto.src.main.proto.Function_pb2'
# @@protoc_insertion_point(class_scope:proto.Assignment)
))
_sym_db.RegisterMessage(Assignment)
Expand Down
4 changes: 3 additions & 1 deletion pulsar-functions/instance/src/main/python/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

class LogTopicHandler(logging.Handler):
def __init__(self, topic_name, pulsar_client):
logging.Handler.__init__(self)
Log.info("Setting up producer for log topic %s" % topic_name)
self.producer = pulsar_client.create_producer(
str(topic_name),
Expand All @@ -46,7 +47,8 @@ def __init__(self, topic_name, pulsar_client):
compression_type=pulsar._pulsar.CompressionType.LZ4)

def emit(self, record):
self.producer.send_async(record)
msg = self.format(record)
self.producer.send_async(str(msg), None)

def configure(level=logging.INFO):
""" Configure logger which dumps log on terminal
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ message FunctionConfig {
map<string, string> customSerdeInputs = 5;
string outputSerdeClassName = 6;
string output = 7;
string logTopic = 8;
ProcessingGuarantees processingGuarantees = 9;
map<string,string> userConfig = 10;
SubscriptionType subscriptionType = 11;
Expand Down
Loading

0 comments on commit 6b24ab9

Please sign in to comment.