Skip to content

Commit

Permalink
upgrade udf support to python3
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 14, 2020
1 parent 0a71f83 commit 0852649
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 590 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features

- [#2202](https://github.com/influxdata/kapacitor/pull/2202): Add templating for MQTT topics.
- [#2276](https://github.com/influxdata/kapacitor/pull/2276): Upgrade to support python 3 for UDFs, Thanks @N-Coder !

### Bugfixes

Expand Down
12 changes: 10 additions & 2 deletions Dockerfile_build_ubuntu32
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ RUN apt-get -qq update && apt-get -qq install -y \
autoconf \
automake \
libtool \
python \
python-setuptools \
python3 \
python3-setuptools \
zip \
curl

RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10

# Install protobuf3 protoc binary

# Install protobuf3 protoc binary
ENV PROTO_VERSION 3.4.0
ENV PROTO_VERSION 3.11.1
RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSION}/protoc-${PROTO_VERSION}-linux-x86_32.zip\
&& unzip -j protoc-${PROTO_VERSION}-linux-x86_32.zip bin/protoc -d /bin \
rm protoc-${PROTO_VERSION}-linux-x86_64.zip
Expand All @@ -28,7 +35,8 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO
RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSION}/protobuf-python-${PROTO_VERSION}.tar.gz \
&& tar -xf protobuf-python-${PROTO_VERSION}.tar.gz \
&& cd /protobuf-${PROTO_VERSION}/python \
&& python setup.py install \
&& python2 setup.py install \
&& python3 setup.py install \
&& rm -rf /protobuf-${PROTO_VERSION} protobuf-python-${PROTO_VERSION}.tar.gz

# Install go
Expand Down
16 changes: 10 additions & 6 deletions Dockerfile_build_ubuntu64
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
FROM ubuntu:16.04
FROM ubuntu:18.04

# This dockerfile is capabable of performing all
# build/test/package/deploy actions needed for Kapacitor.

MAINTAINER [email protected]

RUN apt-get -qq update && apt-get -qq install -y \
python-software-properties \
software-properties-common \
wget \
unzip \
Expand All @@ -18,18 +17,22 @@ RUN apt-get -qq update && apt-get -qq install -y \
rpm \
zip \
python \
python-boto \
python-setuptools \
python3 \
python3-setuptools \
python3-boto \
build-essential \
autoconf \
automake \
libtool \
python-setuptools \
curl

RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10

RUN gem install fpm

# Install protobuf3 protoc binary
ENV PROTO_VERSION 3.4.0
ENV PROTO_VERSION 3.11.1
RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSION}/protoc-${PROTO_VERSION}-linux-x86_64.zip \
&& unzip -j protoc-${PROTO_VERSION}-linux-x86_64.zip bin/protoc -d /bin \
rm protoc-${PROTO_VERSION}-linux-x86_64.zip
Expand All @@ -38,7 +41,8 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO
RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSION}/protobuf-python-${PROTO_VERSION}.tar.gz \
&& tar -xf protobuf-python-${PROTO_VERSION}.tar.gz \
&& cd /protobuf-${PROTO_VERSION}/python \
&& python setup.py install \
&& python2 setup.py install \
&& python3 setup.py install \
&& rm -rf /protobuf-${PROTO_VERSION} protobuf-python-${PROTO_VERSION}.tar.gz

# Install go
Expand Down
3 changes: 2 additions & 1 deletion server/server_const_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package server_test

const (
ExecutableSuffix = ""
PythonExecutable = "python2"
Python2Executable = "python2"
PythonExecutable = "python"
LogFileExpectedMode = 0604
AlertLogPath = `/var/log/alert.log`
)
4 changes: 3 additions & 1 deletion server/server_const_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package server_test

const (
ExecutableSuffix = ".exe"
ExecutableSuffix = ".exe"
// For windows we won't test python2 explicitly
Python2Executable = "python"
PythonExecutable = "python"
LogFileExpectedMode = 0666
AlertLogPath = `c:\alert.log`
Expand Down
57 changes: 57 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5320,6 +5320,21 @@ func TestServer_UDFStreamAgents(t *testing.T) {
},
},
},
// Python 2
{
buildFunc: func() error { return nil },
config: udf.FunctionConfig{
Prog: Python2Executable,
Args: []string{"-u", filepath.Join(udfDir, "agent/examples/moving_avg/moving_avg.py")},
Timeout: toml.Duration(time.Minute),
Env: map[string]string{
"PYTHONPATH": strings.Join(
[]string{filepath.Join(udfDir, "agent/py"), os.Getenv("PYTHONPATH")},
string(filepath.ListSeparator),
),
},
},
},
}
for _, agent := range agents {
err := agent.buildFunc()
Expand Down Expand Up @@ -5497,6 +5512,33 @@ func TestServer_UDFStreamAgentsSocket(t *testing.T) {
Timeout: toml.Duration(time.Minute),
},
},
// Python 2
{
startFunc: func() *exec.Cmd {
cmd := exec.Command(
Python2Executable,
"-u",
filepath.Join(udfDir, "agent/examples/mirror/mirror.py"),
filepath.Join(tdir, "mirror.py.sock"),
)
cmd.Stderr = os.Stderr
env := os.Environ()
env = append(env, fmt.Sprintf(
"%s=%s",
"PYTHONPATH",
strings.Join(
[]string{filepath.Join(udfDir, "agent/py"), os.Getenv("PYTHONPATH")},
string(filepath.ListSeparator),
),
))
cmd.Env = env
return cmd
},
config: udf.FunctionConfig{
Socket: filepath.Join(tdir, "mirror.py.sock"),
Timeout: toml.Duration(time.Minute),
},
},
}
for _, agent := range agents {
cmd := agent.startFunc()
Expand Down Expand Up @@ -5645,6 +5687,21 @@ func TestServer_UDFBatchAgents(t *testing.T) {
},
},
},
// Python 2
{
buildFunc: func() error { return nil },
config: udf.FunctionConfig{
Prog: Python2Executable,
Args: []string{"-u", filepath.Join(udfDir, "agent/examples/outliers/outliers.py")},
Timeout: toml.Duration(time.Minute),
Env: map[string]string{
"PYTHONPATH": strings.Join(
[]string{filepath.Join(udfDir, "agent/py"), os.Getenv("PYTHONPATH")},
string(filepath.ListSeparator),
),
},
},
},
}
for _, agent := range agents {
err := agent.buildFunc()
Expand Down
12 changes: 6 additions & 6 deletions udf/agent/examples/outliers/outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ def median(self, data):
l = len(data)
m = l / 2
if l%2 == 0:
left = m
right = m + 1
median = (data[left][0]+ data[right][0]) / 2.0
left = int(m)
right = int(min(m + 1,l-1))
median = (data[left][0] + data[right][0]) / 2.0
else:
left = m
right = m
median = data[m][0]
left = int(m)
right = int(m)
median = data[int(m)][0]
return left, right, median


Expand Down
38 changes: 29 additions & 9 deletions udf/agent/py/kapacitor/udf/agent.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
# Kapacitor UDF Agent implementation in Python
#
# Requires protobuf v3
# pip install protobuf==3.0.0b2
# pip install protobuf==3.11.1
from __future__ import absolute_import

import sys
import udf_pb2
from . import udf_pb2
from threading import Lock, Thread
from Queue import Queue

try:
from queue import Queue
except ImportError:
from Queue import Queue


# Setup default in/out io
defaultIn = sys.stdin
defaultOut = sys.stdout

# Check for python3
# https://stackoverflow.com/a/38939320/703144
if sys.version_info >= (3, 0):
defaultIn = sys.stdin.buffer
defaultOut = sys.stdout.buffer

import io
import traceback
import socket
import os
import struct

import logging
logger = logging.getLogger()
Expand Down Expand Up @@ -42,15 +60,17 @@ def end_batch(self, end_req):
pass



# Python implementation of a Kapacitor UDF agent.
# This agent is responsible for reading and writing
# messages over STDIN and STDOUT.
#
# The Agent requires a Handler object in order to fulfill requests.
class Agent(object):
def __init__(self, _in=sys.stdin, out=sys.stdout,handler=None):
def __init__(self, _in=defaultIn, out=defaultOut,handler=None):
self._in = _in
self._out = out

self._thread = None
self.handler = handler
self._write_lock = Lock()
Expand Down Expand Up @@ -87,7 +107,7 @@ def write_response(self, response, flush=False):
finally:
self._write_lock.release()

# Read requests off stdin
# Read requests off input stream
def _read_loop(self):
request = udf_pb2.Request()
while True:
Expand Down Expand Up @@ -152,10 +172,10 @@ def encodeUvarint(writer, value):
bits = value & varintMask
value >>= shiftSize
while value:
writer.write(chr(varintMoreMask|bits))
writer.write(struct.pack("B", varintMoreMask | bits))
bits = value & varintMask
value >>= shiftSize
return writer.write(chr(bits))
return writer.write(struct.pack("B", bits))

# Decode an unsigned varint, max of 32 bits
def decodeUvarint32(reader):
Expand All @@ -165,7 +185,7 @@ def decodeUvarint32(reader):
byte = reader.read(1)
if len(byte) == 0:
raise EOF
b = ord(byte)
b = struct.unpack("B", byte)[0]
result |= ((b & varintMask) << shift)
if not (b & varintMoreMask):
result &= mask32uint
Expand All @@ -186,7 +206,7 @@ def serve(self):
try:
while True:
conn, addr = self._listener.accept()
conn = conn.makefile()
conn = conn.makefile(mode='rwb')
thread = Thread(target=self._accepter.accept, args=(conn,addr))
thread.start()
except:
Expand Down
Loading

0 comments on commit 0852649

Please sign in to comment.