Skip to content

Commit

Permalink
[FLINK-28887][python] Fix the bug of custom metrics in Thread Mode
Browse files Browse the repository at this point in the history
This closes apache#20540.
  • Loading branch information
HuangXingBo committed Aug 11, 2022
1 parent 4739a5c commit 4ebb787
Show file tree
Hide file tree
Showing 35 changed files with 635 additions and 152 deletions.
3 changes: 0 additions & 3 deletions flink-python/dev/integration_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,5 @@ test_module "datastream"
# test fn_execution module
test_module "fn_execution"

# test metrics module
test_module "metrics"

# test table module
test_module "table"
50 changes: 50 additions & 0 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
AggregatingStateDescriptor, StateTtlConfig)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.metrics import Counter, Meter, Distribution
from pyflink.testing.test_case_utils import PyFlinkBatchTestCase, PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration

Expand Down Expand Up @@ -1718,6 +1719,55 @@ def setUp(self):
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("python.execution-mode", "thread")

def test_metrics(self):
ds = self.env.from_collection(
[('ab', 'a', decimal.Decimal(1)),
('bdc', 'a', decimal.Decimal(2)),
('cfgs', 'a', decimal.Decimal(3)),
('deeefg', 'a', decimal.Decimal(4))],
type_info=Types.TUPLE(
[Types.STRING(), Types.STRING(), Types.BIG_DEC()]))

class MyMapFunction(MapFunction):
def __init__(self):
self.counter = None # type: Counter
self.counter_value = 0
self.meter = None # type: Meter
self.meter_value = 0
self.value_to_expose = 0
self.distribution = None # type: Distribution

def open(self, runtime_context: RuntimeContext):
self.counter = runtime_context.get_metrics_group().counter("my_counter")
self.meter = runtime_context.get_metrics_group().meter('my_meter', 1)
runtime_context.get_metrics_group().gauge("my_gauge", lambda: self.value_to_expose)
self.distribution = runtime_context.get_metrics_group().distribution(
"my_distribution")

def map(self, value):
self.counter.inc()
self.counter_value += 1
assert self.counter.get_count() == self.counter_value

self.meter.mark_event(1)
self.meter_value += 1
assert self.meter.get_count() == self.meter_value

self.value_to_expose += 1

self.distribution.update(int(value[2]))

return Row(value[0], len(value[0]), value[2])

(ds.key_by(lambda value: value[1])
.map(MyMapFunction(),
output_type=Types.ROW([Types.STRING(), Types.INT(), Types.BIG_DEC()]))
.add_sink(self.test_sink))
self.env.execute('test_basic_operations')
results = self.test_sink.get_results()
expected = ['+I[ab, 2, 1]', '+I[bdc, 3, 2]', '+I[cfgs, 4, 3]', '+I[deeefg, 6, 4]']
self.assert_equals_sorted(expected, results)


@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
class EmbeddedDataStreamBatchTests(DataStreamBatchTests, PyFlinkBatchTestCase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
MapState, ListStateDescriptor, ListState,
ValueStateDescriptor, ValueState)
from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend
from pyflink.fn_execution.metrics.embedded.metric_impl import MetricGroupImpl
from pyflink.metrics import MetricGroup


class StreamingRuntimeContext(RuntimeContext):
Expand Down Expand Up @@ -75,8 +77,8 @@ def get_job_parameter(self, key: str, default_value: str):
"""
return self._job_parameters[key] if key in self._job_parameters else default_value

def get_metrics_group(self):
return self._runtime_context.getMetricGroup()
def get_metrics_group(self) -> MetricGroup:
return MetricGroupImpl(self._runtime_context.getMetricGroup())

def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState:
return self._keyed_state_backend.get_value_state(state_descriptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
NonKeyedTimerServiceImpl,
)
from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
from pyflink.metrics.metricbase import GenericMetricGroup
from pyflink.fn_execution.metrics.process.metric_impl import GenericMetricGroup


class Operation(operations.OneInputOperation, abc.ABC):
Expand Down
17 changes: 17 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/embedded/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
41 changes: 41 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/embedded/counter_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.metrics import Counter


class CounterImpl(Counter):
def __init__(self, inner_counter):
self._inner_counter = inner_counter

def inc(self, n: int = 1):
"""
Increment the current count by the given value.
"""
self._inner_counter.inc(n)

def dec(self, n: int = 1):
"""
Decrement the current count by 1.
"""
self.inc(-n)

def get_count(self) -> int:
"""
Returns the current count.
"""
return self._inner_counter.getCount()
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.metrics import Distribution


class DistributionImpl(Distribution):
def __init__(self, inner_distribution):
self._inner_distribution = inner_distribution

def update(self, value):
"""
Updates the distribution value.
"""
self._inner_distribution.update(value)
36 changes: 36 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/embedded/meter_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.metrics import Meter


class MeterImpl(Meter):

def __init__(self, inner_counter):
self._inner_counter = inner_counter

def mark_event(self, value: int = 1):
"""
Mark occurrence of the specified number of events.
"""
self._inner_counter.markEvent(value)

def get_count(self) -> int:
"""
Get number of events marked on the meter.
"""
return self._inner_counter.getCount()
61 changes: 61 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/embedded/metric_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Callable

from pemja import findClass

from pyflink.fn_execution.metrics.embedded.counter_impl import CounterImpl
from pyflink.fn_execution.metrics.embedded.distribution_impl import DistributionImpl
from pyflink.fn_execution.metrics.embedded.meter_impl import MeterImpl
from pyflink.metrics import MetricGroup, Counter, Distribution, Meter

JMeterView = findClass('org.apache.flink.metrics.MeterView')
JMetricGauge = findClass('org.apache.flink.python.metric.embedded.MetricGauge')
JMetricDistribution = findClass('org.apache.flink.python.metric.embedded.MetricDistribution')


class MetricGroupImpl(MetricGroup):

def __init__(self, metrics):
self._metrics = metrics

def add_group(self, name: str, extra: str = None) -> 'MetricGroup':
if extra is None:
return MetricGroupImpl(self._metrics.addGroup(name))
else:
return MetricGroupImpl(self._metrics.addGroup(name, extra))

def counter(self, name: str) -> 'Counter':
return CounterImpl(self._metrics.counter(name))

def gauge(self, name: str, obj: Callable[[], int]) -> None:
self._metrics.gauge(name, JMetricGauge(PythonGaugeCallable(obj)))

def meter(self, name: str, time_span_in_seconds: int = 60) -> 'Meter':
return MeterImpl(self._metrics.meter(name, JMeterView(time_span_in_seconds)))

def distribution(self, name: str) -> 'Distribution':
return DistributionImpl(self._metrics.gauge(name, JMetricDistribution()))


class PythonGaugeCallable(object):
def __init__(self, func: Callable):
self.func = func

def get_value(self):
return self.func()
17 changes: 17 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/process/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
49 changes: 49 additions & 0 deletions flink-python/pyflink/fn_execution/metrics/process/counter_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.metrics import Counter


class CounterImpl(Counter):
def __init__(self, inner_counter):
self._inner_counter = inner_counter

def inc(self, n: int = 1):
"""
Increment the current count by the given value.
.. versionadded:: 1.11.0
"""
self._inner_counter.inc(n)

def dec(self, n: int = 1):
"""
Decrement the current count by 1.
.. versionadded:: 1.11.0
"""
self.inc(-n)

def get_count(self) -> int:
"""
Returns the current count.
.. versionadded:: 1.11.0
"""
from apache_beam.metrics.execution import MetricsEnvironment
container = MetricsEnvironment.current_container()
return container.get_counter(self._inner_counter.metric_name).get_cumulative()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.metrics import Distribution


class DistributionImpl(Distribution):
def __init__(self, inner_distribution):
self._inner_distribution = inner_distribution

def update(self, value):
"""
Updates the distribution value.
.. versionadded:: 1.11.0
"""
self._inner_distribution.update(value)
Loading

0 comments on commit 4ebb787

Please sign in to comment.