mirror of https://github.com/apache/flink.git
parent 9ba5afc260
commit 5749a07b07
@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;

import java.util.List;

/**
 * {@link StreamExecNode} for {@link ProcessTableFunction}.
 *
 * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
 * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
 * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
 * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
 */
@ExecNodeMetadata(
        name = "stream-exec-process-table-function",
        version = 1,
        producedTransformations = StreamExecProcessTableFunction.PROCESS_TRANSFORMATION,
        minPlanVersion = FlinkVersion.v2_0,
        minStateVersion = FlinkVersion.v2_0)
public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

    public static final String PROCESS_TRANSFORMATION = "process";

    public static final String FIELD_NAME_UID = "uid";
    public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
    public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes";

    @JsonProperty(FIELD_NAME_UID)
    private final String uid;

    @JsonProperty(FIELD_NAME_FUNCTION_CALL)
    private final RexCall invocation;

    @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
    private final List<ChangelogMode> inputChangelogModes;

    public StreamExecProcessTableFunction(
            ReadableConfig tableConfig,
            List<InputProperty> inputProperties,
            RowType outputType,
            String description,
            String uid,
            RexCall invocation,
            List<ChangelogMode> inputChangelogModes) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecProcessTableFunction.class),
                ExecNodeContext.newPersistedConfig(
                        StreamExecProcessTableFunction.class, tableConfig),
                inputProperties,
                outputType,
                description,
                uid,
                invocation,
                inputChangelogModes);
    }

    @JsonCreator
    public StreamExecProcessTableFunction(
            @JsonProperty(FIELD_NAME_ID) int id,
            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
            @JsonProperty(FIELD_NAME_DESCRIPTION) String description,
            @JsonProperty(FIELD_NAME_UID) String uid,
            @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
            @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES)
                    List<ChangelogMode> inputChangelogModes) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.uid = uid;
        this.invocation = (RexCall) invocation;
        this.inputChangelogModes = inputChangelogModes;
    }

    public String getUid() {
        return uid;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        throw new TableException("Process table function is not fully supported yet.");
    }
}
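For orientation, the sketch below shows what the user-facing side of this exec node might look like: a PTF class that emits one greeting per row of a partitioned input table. This is not part of the commit; the @ArgumentHint/ArgumentTrait annotations and the collect() helper are assumptions based on the conventions of Flink's existing table function stack.

// Hypothetical user-defined PTF (not part of this commit). The annotations and
// collect() helper are assumed to follow Flink's existing UDF conventions.
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class GreetingFunction extends ProcessTableFunction<String> {
    // Called once per row of the partitioned (set-semantics) input table.
    public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_SET) Row input) {
        collect("Hello " + input.getField("name"));
    }
}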
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.logical;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.utils.ShortcutUtils;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rel.metadata.RelColumnMapping;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Subclass of {@link TableFunctionScan} that is a relational expression which calls a {@link
 * FunctionKind#TABLE} or {@link FunctionKind#PROCESS_TABLE} in Flink.
 */
@Internal
public class FlinkLogicalTableFunctionScan extends TableFunctionScan implements FlinkLogicalRel {

    public static final Converter CONVERTER =
            new Converter(
                    ConverterRule.Config.INSTANCE.withConversion(
                            LogicalTableFunctionScan.class,
                            Convention.NONE,
                            FlinkConventions.LOGICAL(),
                            "FlinkLogicalTableFunctionScanConverter"));

    public FlinkLogicalTableFunctionScan(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            List<RelNode> inputs,
            RexNode rexCall,
            @Nullable Type elementType,
            RelDataType rowType,
            @Nullable Set<RelColumnMapping> columnMappings) {
        super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings);
    }

    @Override
    public TableFunctionScan copy(
            RelTraitSet traitSet,
            List<RelNode> inputs,
            RexNode rexCall,
            @Nullable Type elementType,
            RelDataType rowType,
            @Nullable Set<RelColumnMapping> columnMappings) {
        return new FlinkLogicalTableFunctionScan(
                getCluster(), traitSet, inputs, rexCall, elementType, rowType, columnMappings);
    }

    @Internal
    public static class Converter extends ConverterRule {

        protected Converter(Config config) {
            super(config);
        }

        @Override
        public boolean matches(RelOptRuleCall call) {
            final LogicalTableFunctionScan functionScan = call.rel(0);
            final FunctionDefinition functionDefinition =
                    ShortcutUtils.unwrapFunctionDefinition(functionScan.getCall());
            if (functionDefinition == null) {
                // For Calcite stack functions
                return true;
            }
            final boolean isTableFunction =
                    functionDefinition.getKind() == FunctionKind.TABLE
                            || functionDefinition.getKind() == FunctionKind.PROCESS_TABLE;
            return isTableFunction && !(functionDefinition instanceof TemporalTableFunction);
        }

        @Override
        public @Nullable RelNode convert(RelNode rel) {
            final LogicalTableFunctionScan functionScan = (LogicalTableFunctionScan) rel;
            final RelTraitSet traitSet =
                    rel.getTraitSet().replace(FlinkConventions.LOGICAL()).simplify();
            final List<RelNode> newInputs =
                    functionScan.getInputs().stream()
                            .map(input -> RelOptRule.convert(input, FlinkConventions.LOGICAL()))
                            .collect(Collectors.toList());
            final RexCall rexCall = (RexCall) functionScan.getCall();
            return new FlinkLogicalTableFunctionScan(
                    functionScan.getCluster(),
                    traitSet,
                    newInputs,
                    rexCall,
                    functionScan.getElementType(),
                    functionScan.getRowType(),
                    functionScan.getColumnMappings());
        }
    }
}
@@ -0,0 +1,219 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.physical.stream;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.SystemTypeInference;

import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;

/**
 * {@link StreamPhysicalRel} node for {@link ProcessTableFunction}.
 *
 * <p>A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple
 * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in
 * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying
 * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table.
 */
public class StreamPhysicalProcessTableFunction extends AbstractRelNode
        implements StreamPhysicalRel {

    private final FlinkLogicalTableFunctionScan scan;
    private final String uid;

    private List<RelNode> inputs;

    public StreamPhysicalProcessTableFunction(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            List<RelNode> inputs,
            FlinkLogicalTableFunctionScan scan,
            RelDataType rowType) {
        super(cluster, traitSet);
        this.inputs = inputs;
        this.rowType = rowType;
        this.scan = scan;
        this.uid = deriveUniqueIdentifier(scan);
    }

    public StreamPhysicalProcessTableFunction(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            RelNode input,
            FlinkLogicalTableFunctionScan scan,
            RelDataType rowType) {
        this(cluster, traitSet, List.of(input), scan, rowType);
    }

    @Override
    public boolean requireWatermark() {
        // Even if there is no time attribute in the inputs, PTFs can work with event-time by taking
        // the watermark value as timestamp.
        return true;
    }

    @Override
    public List<RelNode> getInputs() {
        return inputs;
    }

    @Override
    public void replaceInput(int ordinalInParent, RelNode p) {
        final List<RelNode> newInputs = new ArrayList<>(inputs);
        newInputs.set(ordinalInParent, p);
        inputs = List.copyOf(newInputs);
        recomputeDigest();
    }

    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        return new StreamPhysicalProcessTableFunction(
                getCluster(), traitSet, inputs, scan, getRowType());
    }

    @Override
    public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
        final double elementRate = 100.0d * getInputs().size();
        return planner.getCostFactory().makeCost(elementRate, elementRate, 0);
    }

    @Override
    public ExecNode<?> translateToExecNode() {
        final List<ChangelogMode> inputChangelogModes =
                getInputs().stream()
                        .map(StreamPhysicalRel.class::cast)
                        .map(ChangelogPlanUtils::getChangelogMode)
                        .map(JavaScalaConversionUtil::toJava)
                        .map(optional -> optional.orElseThrow(IllegalStateException::new))
                        .collect(Collectors.toList());
        return new StreamExecProcessTableFunction(
                unwrapTableConfig(this),
                getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()),
                FlinkTypeFactory.toLogicalRowType(rowType),
                getRelDetailedDescription(),
                uid,
                (RexCall) scan.getCall(),
                inputChangelogModes);
    }

    @Override
    public RelWriter explainTerms(RelWriter pw) {
        super.explainTerms(pw);
        for (Ord<RelNode> ord : Ord.zip(inputs)) {
            pw.input("input#" + ord.i, ord.e);
        }
        return pw.item("invocation", scan.getCall())
                .item("uid", uid)
                .item("select", String.join(",", getRowType().getFieldNames()))
                .item("rowType", getRowType());
    }

    @Override
    protected RelDataType deriveRowType() {
        return rowType;
    }

    public List<StaticArgument> getProvidedInputArgs() {
        final RexCall call = (RexCall) scan.getCall();
        final List<RexNode> operands = call.getOperands();
        final BridgingSqlFunction.WithTableFunction function =
                (BridgingSqlFunction.WithTableFunction) call.getOperator();
        final List<StaticArgument> declaredArgs =
                function.getTypeInference()
                        .getStaticArguments()
                        .orElseThrow(IllegalStateException::new);
        // This logic filters out optional tables for which an input is missing. It returns tables
        // in the same order as provided inputs of this RelNode.
        return Ord.zip(declaredArgs).stream()
                .filter(arg -> arg.e.is(StaticArgumentTrait.TABLE))
                .filter(arg -> operands.get(arg.i) instanceof RexTableArgCall)
                .map(arg -> arg.e)
                .collect(Collectors.toList());
    }

    /**
     * An important part of {@link ProcessTableFunction} is the mandatory unique identifier. Even if
     * the PTF has no state entries, state or timers might be added later. So a PTF should serve as
     * an identifiable black box for the optimizer. UIDs ensure that.
     *
     * @see SystemTypeInference
     */
    private static String deriveUniqueIdentifier(FlinkLogicalTableFunctionScan scan) {
        final RexCall rexCall = (RexCall) scan.getCall();
        final BridgingSqlFunction.WithTableFunction function =
                (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
        final ContextResolvedFunction resolvedFunction = function.getResolvedFunction();
        final List<RexNode> operands = rexCall.getOperands();
        // Type inference ensures that uid is always added at the end
        final RexNode uidRexNode = operands.get(operands.size() - 1);
        if (uidRexNode.getKind() == SqlKind.DEFAULT) {
            final String uid =
                    resolvedFunction
                            .getIdentifier()
                            .map(FunctionIdentifier::getFunctionName)
                            .orElse("");
            if (!SystemTypeInference.isValidUidForProcessTableFunction(uid)) {
                throw new ValidationException(
                        String.format(
                                "Could not derive a unique identifier for process table function '%s'. "
                                        + "The function's name does not qualify for a UID. Please provide "
                                        + "a custom identifier using the implicit `uid` argument. "
                                        + "For example: myFunction(..., uid => 'my-id')",
                                resolvedFunction.asSummaryString()));
            }
            return uid;
        }
        // Otherwise UID should be correct as it has been checked by SystemTypeInference.
        return RexLiteral.stringValue(uidRexNode);
    }
}
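The deriveUniqueIdentifier logic above falls back to the function name and otherwise requires the implicit `uid` argument shown in its error message. As a hedged illustration of a call site that supplies the identifier explicitly (the function name `myFunction`, table `t`, and column `k` are made-up placeholders; only the `uid => 'my-id'` form is taken from the error message in this commit):

// Hypothetical call site, not part of this commit.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PtfUidExample {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Partitioning by `k` leads to hash distribution of the input; the explicit
        // `uid` keeps the PTF addressable for state even if the plan is optimized.
        env.executeSql(
                        "SELECT * FROM myFunction(input => TABLE t PARTITION BY k, uid => 'my-id')")
                .print();
    }
}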
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.physical.stream;

import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.utils.ShortcutUtils;

import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.TableCharacteristic;
import org.apache.calcite.sql.TableCharacteristic.Semantics;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Rule to convert a {@link FlinkLogicalTableFunctionScan} with table arguments into a {@link
 * StreamPhysicalProcessTableFunction}.
 */
public class StreamPhysicalProcessTableFunctionRule extends ConverterRule {

    public static final StreamPhysicalProcessTableFunctionRule INSTANCE =
            new StreamPhysicalProcessTableFunctionRule(
                    Config.INSTANCE.withConversion(
                            FlinkLogicalTableFunctionScan.class,
                            FlinkConventions.LOGICAL(),
                            FlinkConventions.STREAM_PHYSICAL(),
                            "StreamPhysicalProcessTableFunctionRule"));

    private StreamPhysicalProcessTableFunctionRule(Config config) {
        super(config);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        final FlinkLogicalTableFunctionScan scan = call.rel(0);
        if (scan.getInputs().isEmpty()) {
            // Let StreamPhysicalConstantTableFunctionScanRule take over
            return false;
        }
        final RexCall rexCall = (RexCall) scan.getCall();
        final FunctionDefinition definition = ShortcutUtils.unwrapFunctionDefinition(rexCall);
        return definition != null && definition.getKind() == FunctionKind.PROCESS_TABLE;
    }

    @Override
    public @Nullable RelNode convert(RelNode rel) {
        final FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) rel;
        final RexCall rexCall = (RexCall) scan.getCall();
        final BridgingSqlFunction.WithTableFunction function =
                (BridgingSqlFunction.WithTableFunction) rexCall.getOperator();
        final List<RexNode> operands = rexCall.getOperands();
        final List<RelNode> newInputs =
                applyDistributionOnInputs(function, operands, rel.getInputs());
        final RelTraitSet providedTraitSet =
                rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
        return new StreamPhysicalProcessTableFunction(
                scan.getCluster(), providedTraitSet, newInputs, scan, scan.getRowType());
    }

    private static List<RelNode> applyDistributionOnInputs(
            BridgingSqlFunction.WithTableFunction function,
            List<RexNode> operands,
            List<RelNode> inputs) {
        return Ord.zip(operands).stream()
                .filter(operand -> operand.e instanceof RexTableArgCall)
                .map(
                        tableOperand -> {
                            final int pos = tableOperand.i;
                            final RexTableArgCall tableArgCall = (RexTableArgCall) tableOperand.e;
                            final TableCharacteristic tableCharacteristic =
                                    function.tableCharacteristic(pos);
                            assert tableCharacteristic != null;
                            return applyDistributionOnInput(
                                    tableArgCall,
                                    tableCharacteristic,
                                    inputs.get(tableArgCall.getInputIndex()));
                        })
                .collect(Collectors.toList());
    }

    private static RelNode applyDistributionOnInput(
            RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic, RelNode input) {
        final FlinkRelDistribution requiredDistribution =
                deriveDistribution(tableOperand, tableCharacteristic);
        final RelTraitSet requiredTraitSet =
                input.getCluster()
                        .getPlanner()
                        .emptyTraitSet()
                        .replace(requiredDistribution)
                        .replace(FlinkConventions.STREAM_PHYSICAL());
        return RelOptRule.convert(input, requiredTraitSet);
    }

    private static FlinkRelDistribution deriveDistribution(
            RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) {
        if (tableCharacteristic.semantics == Semantics.SET) {
            final int[] partitionKeys = tableOperand.getPartitionKeys();
            if (partitionKeys.length == 0) {
                return FlinkRelDistribution.SINGLETON();
            }
            return FlinkRelDistribution.hash(partitionKeys, true);
        }
        return FlinkRelDistribution.DEFAULT();
    }
}
@@ -1,135 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.planner.functions.utils.TableSqlFunction
import org.apache.flink.table.planner.plan.nodes.FlinkConventions

import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.TableFunctionScan
import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelColumnMapping
import org.apache.calcite.rex.{RexCall, RexNode}

import java.lang.reflect.Type
import java.util

import scala.collection.JavaConversions._

/**
 * Sub-class of [[TableFunctionScan]] that is a relational expression which calls a table-valued
 * function in Flink.
 */
class FlinkLogicalTableFunctionScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputs: util.List[RelNode],
    rexCall: RexNode,
    elementType: Type,
    rowType: RelDataType,
    columnMappings: util.Set[RelColumnMapping])
  extends TableFunctionScan(
    cluster,
    traitSet,
    inputs,
    rexCall,
    elementType,
    rowType,
    columnMappings)
  with FlinkLogicalRel {

  override def copy(
      traitSet: RelTraitSet,
      inputs: util.List[RelNode],
      rexCall: RexNode,
      elementType: Type,
      rowType: RelDataType,
      columnMappings: util.Set[RelColumnMapping]): TableFunctionScan = {

    new FlinkLogicalTableFunctionScan(
      cluster,
      traitSet,
      inputs,
      rexCall,
      elementType,
      rowType,
      columnMappings)
  }

}

class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) {

  override def matches(call: RelOptRuleCall): Boolean = {
    val logicalTableFunction: LogicalTableFunctionScan = call.rel(0)

    !isTemporalTableFunctionCall(logicalTableFunction)
  }

  private def isTemporalTableFunctionCall(
      logicalTableFunction: LogicalTableFunctionScan): Boolean = {

    if (!logicalTableFunction.getCall.isInstanceOf[RexCall]) {
      return false
    }
    val rexCall = logicalTableFunction.getCall.asInstanceOf[RexCall]
    val functionDefinition = rexCall.getOperator match {
      case tsf: TableSqlFunction => tsf.udtf
      case bsf: BridgingSqlFunction => bsf.getDefinition
      case _ => return false
    }
    functionDefinition.isInstanceOf[TemporalTableFunction]
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[LogicalTableFunctionScan]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val newInputs = scan.getInputs.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL))
    val rexCall = scan.getCall.asInstanceOf[RexCall]
    val builder = rel.getCluster.getRexBuilder
    // When rexCall uses NamedArguments, RexCall is not inferred with the correct type.
    // We just use the type of scan as the type of RexCall.
    val newCall = rexCall.clone(rel.getRowType, rexCall.getOperands)

    new FlinkLogicalTableFunctionScan(
      scan.getCluster,
      traitSet,
      newInputs,
      newCall,
      scan.getElementType,
      scan.getRowType,
      scan.getColumnMappings
    )
  }

}

object FlinkLogicalTableFunctionScan {
  val CONVERTER = new FlinkLogicalTableFunctionScanConverter(
    Config.INSTANCE.withConversion(
      classOf[LogicalTableFunctionScan],
      Convention.NONE,
      FlinkConventions.LOGICAL,
      "FlinkLogicalTableFunctionScanConverter"))
}