Skip to content

Commit 41d9dd4

Browse files
author
Nathaniel Cook
committed
Merge pull request influxdata#159 from influxdata/nc-udf-fixes
udf fixes
2 parents be2435e + 2247587 commit 41d9dd4

File tree

7 files changed

+71
-64
lines changed

7 files changed

+71
-64
lines changed

cmd/kapacitor/main.go

-1
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,6 @@ func doDisable(args []string) error {
600600
if rp.Error != "" {
601601
return errors.New(rp.Error)
602602
}
603-
return nil
604603
}
605604
return nil
606605
}

udf/agent/py/agent.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from threading import Lock, Thread
99
from Queue import Queue
1010
import io
11+
import traceback
1112

1213
import logging
1314
logger = logging.getLogger()
@@ -23,19 +24,19 @@
2324
#
2425
# To write Points/Batches back to the Agent/Kapacitor use the Agent.write_response method, which is thread safe.
2526
class Handler(object):
26-
def info():
27+
def info(self):
2728
pass
28-
def init(init_req):
29+
def init(self, init_req):
2930
pass
30-
def snapshot():
31+
def snapshot(self):
3132
pass
32-
def restore(restore_req):
33+
def restore(self, restore_req):
3334
pass
34-
def begin_batch():
35+
def begin_batch(self):
3536
pass
36-
def point():
37+
def point(self):
3738
pass
38-
def end_batch(end_req):
39+
def end_batch(self, end_req):
3940
pass
4041

4142

@@ -122,11 +123,13 @@ def _read_loop(self):
122123
except EOF:
123124
break
124125
except Exception as e:
126+
traceback.print_exc()
125127
error = "error processing request of type %s: %s" % (msg, e)
126128
logger.error(error)
127129
response = udf_pb2.Response()
128130
response.error.error = error
129131
self.write_response(response)
132+
break
130133

131134
# Indicates the end of a file/stream has been reached.
132135
class EOF(Exception):

udf/agent/py/udf_pb2.py

+40-42
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

udf/udf.pb.go

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

udf/udf.proto

+4-4
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,10 @@ message Point {
167167
// Indicates the end of a batch and contains
168168
// all meta data associated with the batch.
169169
message EndBatch {
170-
string Name = 1;
171-
string Group = 2;
172-
int64 TMax = 3;
173-
map<string,string> Tags = 4;
170+
string name = 1;
171+
string group = 2;
172+
int64 tmax = 3;
173+
map<string,string> tags = 4;
174174
}
175175

176176
//-----------------------------------------------------------

udf_process.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -460,20 +460,24 @@ func (p *UDFProcess) watchKeepalive() {
460460
defer func() {
461461
if err != nil {
462462
p.setError(err)
463+
aborted := make(chan struct{})
463464
go func() {
464465
timeout := p.keepaliveTimeout * 2
465466
if timeout <= 0 {
466467
timeout = time.Second
467468
}
468469
time.Sleep(timeout)
469-
p.mu.Lock()
470-
defer p.mu.Unlock()
471-
if !p.stopped {
470+
select {
471+
case <-aborted:
472+
// We cleanly aborted process is stopped
473+
default:
474+
// We failed to abort just kill it.
472475
p.logger.Println("E! process not responding! killing")
473476
p.kill()
474477
}
475478
}()
476479
p.abort()
480+
close(aborted)
477481
}
478482
}()
479483
defer p.requestsGroup.Done()
@@ -640,7 +644,10 @@ func (p *UDFProcess) writeBatch(b models.Batch) error {
640644

641645
req.Message = &udf.Request_End{
642646
&udf.EndBatch{
643-
Name: b.Name,
647+
Name: b.Name,
648+
Group: string(b.Group),
649+
Tmax: b.TMax.UnixNano(),
650+
Tags: b.Tags,
644651
},
645652
}
646653
return p.writeRequest(req)
@@ -746,7 +753,7 @@ func (p *UDFProcess) handleResponse(response *udf.Response) error {
746753
}
747754
case *udf.Response_End:
748755
p.batch.Name = msg.End.Name
749-
p.batch.TMax = time.Unix(0, msg.End.TMax)
756+
p.batch.TMax = time.Unix(0, msg.End.Tmax)
750757
p.batch.Group = models.GroupID(msg.End.Group)
751758
p.batch.Tags = msg.End.Tags
752759
select {

update_tick_docs.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# of structs into property methods and chaining methods.
66

77
dest=$1 # output path for the .md files
8-
docspath=${2-/kapacitor/v0.2/tick}
8+
docspath=${2-/kapacitor/v0.10/tick}
99

1010
if [ -z "$dest" ]
1111
then

0 commit comments

Comments
 (0)