Skip to content

Commit

Permalink
Go SDK usercounters
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Burke committed Mar 27, 2018
1 parent 068c76a commit c8f9bfc
Show file tree
Hide file tree
Showing 11 changed files with 1,104 additions and 33 deletions.
13 changes: 11 additions & 2 deletions sdks/go/examples/wordcount/wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"fmt"
"log"
"regexp"
"strings"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
Expand Down Expand Up @@ -101,10 +102,18 @@ var (
// returns a PCollection of type string. Also, using named function transforms allows
// for easy reuse, modular testing, and an improved monitoring experience.

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
var (
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
empty = beam.NewCounter("extract", "emptyLines")
lineLen = beam.NewDistribution("extract", "lineLenDistro")
)

// extractFn is a DoFn that emits the words in a given line.
func extractFn(line string, emit func(string)) {
func extractFn(ctx context.Context, line string, emit func(string)) {
lineLen.Update(ctx, int64(len(line)))
if len(strings.TrimSpace(line)) == 0 {
empty.Inc(ctx, 1)
}
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
Expand Down
Loading

0 comments on commit c8f9bfc

Please sign in to comment.