Skip to content

Commit

Permalink
[FLINK-26709][table] Replace TableConfig.getConfiguration.get/getOpti…
Browse files Browse the repository at this point in the history
…onal()

Replace `TableConfig.getConfiguration().get(<option>)/getOptional(<option>)`
 with `TableConfig.get(<option>)/getOptional(<option>)` since `TableConfig`
is now a `ReadableConfig` and the `get/getOptional` give a full view,
including the `rootConfiguration`, which makes all the options coming
from the environment (flink-conf.yaml, CLI params) available.
  • Loading branch information
matriv authored and twalthr committed Mar 22, 2022
1 parent 79110e9 commit 0b4b535
Show file tree
Hide file tree
Showing 45 changed files with 74 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ private static boolean isStreamingMode(Table table) {
TableEnvironment tableEnv = ((TableImpl) table).getTableEnvironment();
if (tableEnv instanceof TableEnvironmentImpl) {
final RuntimeExecutionMode mode =
tableEnv.getConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
tableEnv.getConfig().get(ExecutionOptions.RUNTIME_MODE);
if (mode == RuntimeExecutionMode.AUTOMATIC) {
throw new RuntimeException(
String.format("Runtime execution mode '%s' is not supported yet.", mode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ public void testSetAndResetOption() {
sessionContext.set(NAME.key(), "test");
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");
assertThat(getConfiguration().getString(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(getConfiguration().getInteger(MAX_PARALLELISM)).isEqualTo(128);
assertThat(getConfiguration().getString(NAME)).isEqualTo("test");
assertThat(getConfiguration().getBoolean(OBJECT_REUSE)).isFalse();
assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
assertThat(getConfiguration().get(NAME)).isEqualTo("test");
assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();

sessionContext.reset();
assertThat(getConfiguration().getString(TABLE_SQL_DIALECT)).isEqualTo("default");
assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");
assertThat(getConfiguration().get(NAME)).isNull();
// The value of MAX_PARALLELISM in DEFAULTS_ENVIRONMENT_FILE is 16
assertThat(getConfiguration().getInteger(MAX_PARALLELISM)).isEqualTo(16);
assertThat(getConfiguration().getString(NAME, null)).isNull();
assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);
assertThat(getConfiguration().getOptional(NAME)).isEmpty();
// The value of OBJECT_REUSE in origin configuration is true
assertThat(getConfiguration().getBoolean(OBJECT_REUSE)).isTrue();
assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
}

@Test
Expand All @@ -106,22 +106,22 @@ public void testSetAndResetKeyInConfigOptions() {
// runtime config from flink-conf
sessionContext.set(OBJECT_REUSE.key(), "false");

assertThat(getConfiguration().getString(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(getConfiguration().getInteger(MAX_PARALLELISM)).isEqualTo(128);
assertThat(getConfiguration().getString(NAME)).isEqualTo("test");
assertThat(getConfiguration().getBoolean(OBJECT_REUSE)).isFalse();
assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("hive");
assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(128);
assertThat(getConfiguration().get(NAME)).isEqualTo("test");
assertThat(getConfiguration().get(OBJECT_REUSE)).isFalse();

sessionContext.reset(TABLE_SQL_DIALECT.key());
assertThat(getConfiguration().getString(TABLE_SQL_DIALECT)).isEqualTo("default");
assertThat(getConfiguration().get(TABLE_SQL_DIALECT)).isEqualTo("default");

sessionContext.reset(MAX_PARALLELISM.key());
assertThat(getConfiguration().getInteger(MAX_PARALLELISM)).isEqualTo(16);
assertThat(getConfiguration().get(MAX_PARALLELISM)).isEqualTo(16);

sessionContext.reset(NAME.key());
assertThat(getConfiguration().get(NAME)).isNull();

sessionContext.reset(OBJECT_REUSE.key());
assertThat(getConfiguration().getBoolean(OBJECT_REUSE)).isTrue();
assertThat(getConfiguration().get(OBJECT_REUSE)).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ public void writeToFile(File file, boolean ignoreIfExists) {
internalPlan.writeToFile(
file,
ignoreIfExists,
!tableEnvironment
.getConfig()
.getConfiguration()
.get(TableConfigOptions.PLAN_FORCE_RECOMPILE));
!tableEnvironment.getConfig().get(TableConfigOptions.PLAN_FORCE_RECOMPILE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ private CompiledPlan compilePlanAndWrite(
return loadPlan(PlanReference.fromFile(filePath));
}

if (!tableConfig.getConfiguration().get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
if (!tableConfig.get(TableConfigOptions.PLAN_FORCE_RECOMPILE)) {
throw new TableException(
String.format(
"Cannot overwrite the plan file '%s'. "
Expand Down Expand Up @@ -782,7 +782,7 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
List<Transformation<?>> transformations = translate(operations);
List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
TableResultInternal result = executeInternal(transformations, sinkIdentifierNames);
if (tableConfig.getConfiguration().get(TABLE_DML_SYNC)) {
if (tableConfig.get(TABLE_DML_SYNC)) {
try {
result.await();
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ public static boolean isSourceChangeEventsDuplicate(
boolean isCDCSource =
!mode.containsOnly(RowKind.INSERT) && !isUpsertSource(resolvedSchema, tableSource);
boolean changeEventsDuplicate =
tableConfig
.getConfiguration()
.getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
boolean hasPrimaryKey = resolvedSchema.getPrimaryKey().isPresent();
return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ public PushFilterIntoSourceScanRuleBase(RelOptRuleOperand operand, String descri
@Override
public boolean matches(RelOptRuleCall call) {
TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
return tableConfig
.getConfiguration()
.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED);
return tableConfig.get(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED);
}

protected List<RexNode> convertExpressionToRexNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ protected FlinkLogicalTableSourceScan getNewScan(
abilitySpec = sourceWatermarkSpec;
} else {
final Duration idleTimeout =
tableConfig
.getConfiguration()
.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
final long idleTimeoutMillis;
if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
idleTimeoutMillis = idleTimeout.toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,8 @@ protected boolean canPushDown(
BatchPhysicalGroupAggregateBase aggregate,
BatchPhysicalTableSourceScan tableSourceScan) {
TableConfig tableConfig = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig();
if (!tableConfig
.getConfiguration()
.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
if (!tableConfig.get(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ private ContextResolvedTable computeContextResolvedTable(
if (hintedOptions.isEmpty()) {
return contextResolvedTable;
}
final ReadableConfig config = context.getTableConfig().getConfiguration();
if (!config.get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)) {
if (!context.getTableConfig().get(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED)) {
throw new ValidationException(
String.format(
"The '%s' hint is allowed only when the config option '%s' is set to true.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class TableConfigUtils {
* @return true if the given operator is disabled.
*/
public static boolean isOperatorDisabled(TableConfig tableConfig, OperatorType operatorType) {
String value = tableConfig.getConfiguration().getString(TABLE_EXEC_DISABLED_OPERATORS);
String value = tableConfig.get(TABLE_EXEC_DISABLED_OPERATORS);
if (value == null) {
return false;
}
Expand Down Expand Up @@ -72,8 +72,7 @@ public static boolean isOperatorDisabled(TableConfig tableConfig, OperatorType o
* @return the aggregate phase strategy
*/
public static AggregatePhaseStrategy getAggPhaseStrategy(TableConfig tableConfig) {
String aggPhaseConf =
tableConfig.getConfiguration().getString(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY).trim();
String aggPhaseConf = tableConfig.get(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY).trim();
if (aggPhaseConf.isEmpty()) {
return AggregatePhaseStrategy.AUTO;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class BatchPlanner(
// deadlock breakup
processors.add(new DeadlockBreakupProcessor())
// multiple input creation
if (getTableConfig.getConfiguration.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED)) {
if (getTableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED)) {
processors.add(new MultipleInputNodeCreationProcessor(false))
}
processors.add(new ForwardHashExchangeProcessor)
Expand Down Expand Up @@ -154,7 +153,7 @@ class BatchPlanner(

override def beforeTranslation(): Unit = {
super.beforeTranslation()
val runtimeMode = getConfiguration.get(ExecutionOptions.RUNTIME_MODE)
val runtimeMode = getTableConfig.get(ExecutionOptions.RUNTIME_MODE)
if (runtimeMode != RuntimeExecutionMode.BATCH) {
throw new IllegalArgumentException(
"Mismatch between configured runtime mode and actual runtime mode. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ abstract class PlannerBase(
Thread.currentThread().getContextClassLoader)

// Use config parallelism to override env parallelism.
val defaultParallelism = getTableConfig.getConfiguration.getInteger(
val defaultParallelism = getTableConfig.get(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM)
if (defaultParallelism > 0) {
getExecEnv.getConfig.setParallelism(defaultParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount]
// If predicate has $index is not null, null count of index is must be 0 after predicate.
val rexBuilder = rel.getCluster.getRexBuilder
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
val maxCnfNodeCount = tableConfig.getConfiguration.getInteger(
FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, predicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
val notNullPredicatesAtIndexField = conjunctions.exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,7 @@ class FlinkRelMdDistinctRowCount private extends MetadataHandler[BuiltInMetadata
} else {
val rexBuilder = rel.getCluster.getRexBuilder
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
val maxCnfNodeCount = tableConfig.getConfiguration.getInteger(
FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, predicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
val conjunctionsWithoutExpandId = conjunctions.filterNot { c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class FlinkRelMdDistribution private extends MetadataHandler[FlinkDistribution]

def flinkDistribution(sort: Sort, mq: RelMetadataQuery): FlinkRelDistribution = {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(sort)
val enableRangeSort = tableConfig.getConfiguration.getBoolean(
BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED)
val enableRangeSort = tableConfig.get(BatchPhysicalSortRule.TABLE_EXEC_RANGE_SORT_ENABLED)
if ((sort.getCollation.getFieldCollations.nonEmpty &&
sort.fetch == null && sort.offset == null) && enableRangeSort) {
//If Sort is global sort, and the table config allows the range partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
val inputRowCnt = mq.getRowCount(input)
val config = rel.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val parallelism = (inputRowCnt /
config.getConfiguration.getLong(
FlinkRelMdRowCount.TABLE_OPTIMIZER_ROWS_PER_LOCALAGG) + 1).toInt
config.get(FlinkRelMdRowCount.TABLE_OPTIMIZER_ROWS_PER_LOCALAGG) + 1).toInt
if (parallelism == 1) {
ndvOfGroupKeysOnGlobalAgg
} else if (grouping.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class SelectivityEstimator(rel: RelNode, mq: FlinkRelMetadataQuery)

private val rexBuilder = rel.getCluster.getRexBuilder
private val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(rel)
private val maxCnfNodeCount = tableConfig.getConfiguration.getInteger(
FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)
private val maxCnfNodeCount = tableConfig.get(FlinkRexUtil.TABLE_OPTIMIZER_CNF_NODES_LIMIT)

// these default values is referred to RelMdUtil#guessSelectivity
private[flink] val defaultComparisonSelectivity = Some(0.5d)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class FlinkLogicalSortBatchConverter extends ConverterRule(
val sort = rel.asInstanceOf[LogicalSort]
val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
val config = sort.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
val enableRangeSort = config.getConfiguration.getBoolean(TABLE_EXEC_RANGE_SORT_ENABLED)
val limitValue = config.getConfiguration.getInteger(TABLE_EXEC_SORT_DEFAULT_LIMIT)
val enableRangeSort = config.get(TABLE_EXEC_RANGE_SORT_ENABLED)
val limitValue = config.get(TABLE_EXEC_SORT_DEFAULT_LIMIT)
val (offset, fetch) = if (sort.fetch == null && sort.offset == null
&& !enableRangeSort && limitValue > 0) {
//force the sort add limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class BatchPhysicalHashAggregate(
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConfiguration.getBoolean(
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ abstract class BatchPhysicalOverAggregateBase(
val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ < inputFieldCnt)
if (isAllFieldsFromInput) {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
if (tableConfig.getConfiguration.getBoolean(
if (tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)) {
ImmutableIntList.of(partitionKeyIndices: _*).containsAll(requiredDistribution.getKeys)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class BatchPhysicalPythonGroupAggregate(
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConfiguration.getBoolean(
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class BatchPhysicalRank(
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConfiguration.getBoolean(
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && partitionKeyList.containsAll(shuffleKeys)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class BatchPhysicalSortAggregate(
} else {
// If partialKey is enabled, try to use partial key to satisfy the required distribution
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
val partialKeyEnabled = tableConfig.getConfiguration.getBoolean(
val partialKeyEnabled = tableConfig.get(
BatchPhysicalJoinRuleBase.TABLE_OPTIMIZER_SHUFFLE_BY_PARTIAL_KEY_ENABLED)
partialKeyEnabled && groupKeysList.containsAll(shuffleKeys)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(
sink.getInput.asInstanceOf[StreamPhysicalRel]).get
val primaryKeys = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
val upsertMaterialize = tableConfig.getConfiguration.get(
val upsertMaterialize = tableConfig.get(
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
case UpsertMaterialize.NONE => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ object SubplanReuser {
* Finds duplicated sub-plans and return the reused plan.
*/
def reuseDuplicatedSubplan(rels: Seq[RelNode], tableConfig: TableConfig): Seq[RelNode] = {
if (!tableConfig.getConfiguration.getBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED)) {
if (!tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED)) {
return rels
}
val tableSourceReuseEnabled = tableConfig.getConfiguration.getBoolean(
val tableSourceReuseEnabled = tableConfig.get(
OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED)
val context = new SubplanReuseContext(tableSourceReuseEnabled, rels: _*)
val reuseShuttle = new SubplanReuseShuttle(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class JoinDeriveNullFilterRule
val rexBuilder = join.getCluster.getRexBuilder
val mq = FlinkRelMetadataQuery.reuseOrCreate(join.getCluster.getMetadataQuery)
val conf = FlinkRelOptUtil.getTableConfigFromContext(join)
val minNullCount = conf.getConfiguration.getLong(
val minNullCount = conf.get(
JoinDeriveNullFilterRule.TABLE_OPTIMIZER_JOIN_NULL_FILTER_THRESHOLD)

def createIsNotNullFilter(input: RelNode, keys: ImmutableIntList): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PushFilterIntoLegacyTableSourceScanRule extends RelOptRule(

override def matches(call: RelOptRuleCall): Boolean = {
val config = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
if (!config.getConfiguration.getBoolean(
if (!config.get(
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) {
return false
}
Expand Down
Loading

0 comments on commit 0b4b535

Please sign in to comment.