From 5749a07b072803a6f106e287f40db069abc3feaf Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Tue, 21 Jan 2025 15:36:31 +0100 Subject: [PATCH] [FLINK-37076][table-planner] Support PTFs until ExecNode level This closes #26019. --- .../flink/table/annotation/ArgumentTrait.java | 31 +- .../table/types/inference/StaticArgument.java | 46 ++- .../types/inference/StaticArgumentTrait.java | 5 +- .../types/inference/SystemTypeInference.java | 12 +- .../planner/calcite/RexTableArgCall.java | 26 ++ .../nodes/exec/ExecNodeGraphGenerator.java | 25 +- .../exec/serde/RexNodeJsonDeserializer.java | 28 ++ .../exec/serde/RexNodeJsonSerializer.java | 27 +- .../StreamExecProcessTableFunction.java | 125 +++++++ .../FlinkLogicalTableFunctionScan.java | 128 +++++++ .../StreamPhysicalProcessTableFunction.java | 219 +++++++++++ ...treamPhysicalProcessTableFunctionRule.java | 135 +++++++ ...PhysicalConstantTableFunctionScanRule.java | 91 ++--- ...PhysicalConstantTableFunctionScanRule.java | 77 ++-- .../plan/utils/ExecNodeMetadataUtil.java | 4 +- .../table/planner/utils/ShortcutUtils.java | 5 + .../FlinkLogicalTableFunctionScan.scala | 135 ------- .../FlinkChangelogModeInferenceProgram.scala | 48 +++ .../plan/rules/FlinkStreamRuleSets.scala | 3 + .../exec/serde/RexNodeJsonSerdeTest.java | 13 +- .../stream/sql/ProcessTableFunctionTest.java | 149 ++++++-- .../stream/sql/ProcessTableFunctionTest.xml | 340 +++++++++++++++++- 22 files changed, 1398 insertions(+), 274 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java index 8704a3c440b..4e63b261e7d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/annotation/ArgumentTrait.java @@ -21,6 +21,7 @@ package org.apache.flink.table.annotation; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.functions.ProcessTableFunction; import org.apache.flink.table.types.inference.StaticArgumentTrait; +import org.apache.flink.types.RowKind; /** * Declares traits for {@link ArgumentHint}. They enable basic validation by the framework. @@ -78,6 +79,8 @@ public enum ArgumentTrait { /** * Defines that a PARTITION BY clause is optional for {@link #TABLE_AS_SET}. By default, it is * mandatory for improving the parallel execution by distributing the table by key. + * + *

Note: This trait is only valid for {@link #TABLE_AS_SET} arguments. */ OPTIONAL_PARTITION_BY(false, StaticArgumentTrait.OPTIONAL_PARTITION_BY), @@ -97,8 +100,34 @@ public enum ArgumentTrait { * *

In case of multiple table arguments, pass-through columns are added according to the * declaration order in the PTF signature. + * + *

Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments. + */ + PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH), + + /** + * Defines that updates are allowed as input to the given table argument. By default, a table + * argument is insert-only and updates will be rejected. + * + *

Input tables become updating when upstream operations such as aggregations or outer joins force an + * incremental computation. For example, the following query only works if the function is able + * to digest retraction messages: + * + * <pre>
+     *     // Changes +I[1] followed by -U[1], +U[2], -U[2], +U[3] will enter the function
+     *     WITH UpdatingTable AS (
+     *       SELECT COUNT(*) FROM (VALUES 1, 2, 3)
+     *     )
+     *     SELECT * FROM f(tableArg => TABLE UpdatingTable)
+     * </pre>
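As a sketch (not part of this patch), a PTF able to digest such a changelog might declare the trait as follows; the class name and body are illustrative, and only the annotations shown in this file are assumed:

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

// Hypothetical PTF: TABLE_AS_SET marks the argument as a partitioned table,
// SUPPORT_UPDATES additionally admits retractions instead of rejecting them.
class ChangeTrackingFunction extends ProcessTableFunction<String> {
    public void eval(
            @ArgumentHint({ArgumentTrait.TABLE_AS_SET, ArgumentTrait.SUPPORT_UPDATES})
                    Row tableArg) {
        // Row exposes the RowKind change flag (+I, -U, +U, -D) that the trait requires
        collect(tableArg.getKind() + ": " + tableArg);
    }
}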
+ * + *

If updates should be supported, ensure that the data type of the table argument is chosen + * in a way that it can encode changes. In other words: choose a row type that exposes the + * {@link RowKind} change flag. + * + *

Note: This trait is valid for {@link #TABLE_AS_ROW} and {@link #TABLE_AS_SET} arguments. */ - PASS_COLUMNS_THROUGH(false, StaticArgumentTrait.PASS_COLUMNS_THROUGH); + SUPPORT_UPDATES(false, StaticArgumentTrait.SUPPORT_UPDATES); private final boolean isRoot; private final StaticArgumentTrait staticTrait; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java index fc7fd86abb2..73f8afbf60f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgument.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.StructuredType; @@ -273,29 +274,46 @@ public class StaticArgument { } } - void checkTableType() { + private void checkTableType() { if (!traits.contains(StaticArgumentTrait.TABLE)) { return; } - if (dataType == null - && conversionClass != null - && !DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) { + checkPolymorphicTableType(); + checkTypedTableType(); + } + + private void checkPolymorphicTableType() { + if (dataType != null || conversionClass == null) { + return; + } + if (!DUMMY_ROW_TYPE.supportsInputConversion(conversionClass)) { throw new ValidationException( String.format( "Invalid conversion class '%s' for argument '%s'. " + "Polymorphic, untyped table arguments must use a row class.", conversionClass.getName(), name)); } - if (dataType != null) { - final LogicalType type = dataType.getLogicalType(); - if (traits.contains(StaticArgumentTrait.TABLE) - && !LogicalTypeChecks.isCompositeType(type)) { - throw new ValidationException( - String.format( - "Invalid data type '%s' for table argument '%s'. " - + "Typed table arguments must use a composite type (i.e. row or structured type).", - type, name)); - } + } + + private void checkTypedTableType() { + if (dataType == null) { + return; + } + final LogicalType type = dataType.getLogicalType(); + if (traits.contains(StaticArgumentTrait.TABLE) + && !LogicalTypeChecks.isCompositeType(type)) { + throw new ValidationException( + String.format( + "Invalid data type '%s' for table argument '%s'. " + + "Typed table arguments must use a composite type (i.e. row or structured type).", + type, name)); + } + if (is(StaticArgumentTrait.SUPPORT_UPDATES) && !type.is(LogicalTypeRoot.ROW)) { + throw new ValidationException( + String.format( + "Invalid data type '%s' for table argument '%s'. 
" + + "Table arguments that support updates must use a row type.", + type, name)); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java index b9f4d4c71fb..fbcc7be7879 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/StaticArgumentTrait.java @@ -37,8 +37,9 @@ public enum StaticArgumentTrait { MODEL(), TABLE_AS_ROW(TABLE), TABLE_AS_SET(TABLE), - OPTIONAL_PARTITION_BY(TABLE_AS_SET), - PASS_COLUMNS_THROUGH(TABLE); + PASS_COLUMNS_THROUGH(TABLE), + SUPPORT_UPDATES(TABLE), + OPTIONAL_PARTITION_BY(TABLE_AS_SET); private final Set requirements; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java index 6c64b32ec25..d3b18b3ab12 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java @@ -81,6 +81,10 @@ public class SystemTypeInference { return builder.build(); } + public static boolean isValidUidForProcessTableFunction(String uid) { + return UID_FORMAT.test(uid); + } + // -------------------------------------------------------------------------------------------- private static void checkScalarArgsOnly(List defaultArgs) { @@ -283,7 +287,7 @@ public class SystemTypeInference { + "that is not overloaded and doesn't contain varargs."); } - checkUidColumn(callContext); + checkUidArg(callContext); checkMultipleTableArgs(callContext); checkTableArgTraits(staticArgs, callContext); @@ -297,16 +301,16 @@ public class SystemTypeInference { return origin.getExpectedSignatures(definition); } - private static void checkUidColumn(CallContext callContext) { + private static void checkUidArg(CallContext callContext) { final List args = callContext.getArgumentDataTypes(); // Verify the uid format if provided int uidPos = args.size() - 1; if (!callContext.isArgumentNull(uidPos)) { final String uid = callContext.getArgumentValue(uidPos, String.class).orElse(""); - if (!UID_FORMAT.test(uid)) { + if (!isValidUidForProcessTableFunction(uid)) { throw new ValidationException( - "Invalid unique identifier for process table function. The 'uid' argument " + "Invalid unique identifier for process table function. The `uid` argument " + "must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. 
" + "But found: " + uid); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java index 369f392042b..adbcd0633a3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexTableArgCall.java @@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexNode; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -96,4 +97,29 @@ public class RexTableArgCall extends RexCall { public RexTableArgCall copy(RelDataType type, int[] partitionKeys, int[] orderKeys) { return new RexTableArgCall(type, inputIndex, partitionKeys, orderKeys); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final RexTableArgCall that = (RexTableArgCall) o; + return inputIndex == that.inputIndex + && Arrays.equals(partitionKeys, that.partitionKeys) + && Arrays.equals(orderKeys, that.orderKeys); + } + + @Override + public int hashCode() { + int result = Objects.hash(super.hashCode(), inputIndex); + result = 31 * result + Arrays.hashCode(partitionKeys); + result = 31 * result + Arrays.hashCode(orderKeys); + return result; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java index beadfc4f125..3d9e9dbc26c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java @@ -19,15 +19,19 @@ package org.apache.flink.table.planner.plan.nodes.exec; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.planner.plan.nodes.common.CommonIntermediateTableScan; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel; import org.apache.calcite.rel.RelNode; import java.util.ArrayList; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; /** * A generator that generates a {@link ExecNode} graph from a graph of {@link FlinkPhysicalRel}s. 
@@ -43,9 +47,11 @@ import java.util.Map; public class ExecNodeGraphGenerator { private final Map> visitedRels; + private final Set visitedProcessTableFunctionUids; public ExecNodeGraphGenerator() { this.visitedRels = new IdentityHashMap<>(); + this.visitedProcessTableFunctionUids = new HashSet<>(); } public ExecNodeGraph generate(List relNodes, boolean isCompiled) { @@ -78,8 +84,25 @@ public class ExecNodeGraphGenerator { inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build()); } execNode.setInputEdges(inputEdges); - + checkUidForProcessTableFunction(execNode); visitedRels.put(rel, execNode); return execNode; } + + private void checkUidForProcessTableFunction(ExecNode execNode) { + if (!(execNode instanceof StreamExecProcessTableFunction)) { + return; + } + final String uid = ((StreamExecProcessTableFunction) execNode).getUid(); + if (visitedProcessTableFunctionUids.contains(uid)) { + throw new ValidationException( + String.format( + "Duplicate unique identifier '%s' detected among process table functions. " + + "Make sure that all PTF calls have an identifier defined that is globally unique. " + + "Please provide a custom identifier using the implicit `uid` argument. " + + "For example: myFunction(..., uid => 'my-id')", + uid)); + } + visitedProcessTableFunctionUids.add(uid); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java index c5111219283..89a30ff9a40 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; +import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator; @@ -93,6 +94,8 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_ORDER_KEYS; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_PARTITION_KEYS; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND; @@ -107,6 +110,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSe import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_INPUT_REF; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_LITERAL; import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_PATTERN_INPUT_REF; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_TABLE_ARG_CALL; import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableToCalcite; /** @@ -144,6 +148,8 @@ final class RexNodeJsonDeserializer extends StdDeserializer { return deserializeCorrelVariable(jsonNode, serdeContext); case KIND_PATTERN_INPUT_REF: return deserializePatternFieldRef(jsonNode, serdeContext); + case KIND_TABLE_ARG_CALL: + return deserializeTableArgCall(jsonNode, serdeContext); case KIND_CALL: return deserializeCall(jsonNode, serdeContext); default: @@ -313,6 +319,28 @@ final class RexNodeJsonDeserializer extends StdDeserializer { return serdeContext.getRexBuilder().makePatternFieldRef(alpha, fieldType, inputIndex); } + private static RexNode deserializeTableArgCall(JsonNode jsonNode, SerdeContext serdeContext) { + final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); + final RelDataType callType = + RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); + + final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue(); + + final JsonNode partitionKeysNode = jsonNode.required(FIELD_NAME_PARTITION_KEYS); + final int[] partitionKeys = new int[partitionKeysNode.size()]; + for (int i = 0; i < partitionKeysNode.size(); ++i) { + partitionKeys[i] = partitionKeysNode.get(i).asInt(); + } + + final JsonNode orderKeysNode = jsonNode.required(FIELD_NAME_ORDER_KEYS); + final int[] orderKeys = new int[orderKeysNode.size()]; + for (int i = 0; i < orderKeysNode.size(); ++i) { + orderKeys[i] = orderKeysNode.get(i).asInt(); + } + + return new RexTableArgCall(callType, inputIndex, partitionKeys, orderKeys); + } + private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext serdeContext) throws IOException { final SqlOperator operator = deserializeSqlOperator(jsonNode, serdeContext); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java index f559f6445e5..facb4a11158 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java @@ -33,6 +33,7 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.functions.TableAggregateFunctionDefinition; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator; @@ -81,10 +82,10 @@ final class RexNodeJsonSerializer extends StdSerializer { static final String FIELD_NAME_VALUE = "value"; static final String FIELD_NAME_TYPE = "type"; static final String FIELD_NAME_NAME = 
"name"; + static final String FIELD_NAME_INPUT_INDEX = "inputIndex"; // INPUT_REF static final String KIND_INPUT_REF = "INPUT_REF"; - static final String FIELD_NAME_INPUT_INDEX = "inputIndex"; // LITERAL static final String KIND_LITERAL = "LITERAL"; @@ -122,6 +123,11 @@ final class RexNodeJsonSerializer extends StdSerializer { static final String FIELD_NAME_SQL_KIND = "sqlKind"; static final String FIELD_NAME_CLASS = "class"; + // TABLE_ARG_CALL + static final String KIND_TABLE_ARG_CALL = "TABLE_ARG_CALL"; + static final String FIELD_NAME_PARTITION_KEYS = "partitionKeys"; + static final String FIELD_NAME_ORDER_KEYS = "orderKeys"; + RexNodeJsonSerializer() { super(RexNode.class); } @@ -154,7 +160,10 @@ final class RexNodeJsonSerializer extends StdSerializer { (RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider); break; default: - if (rexNode instanceof RexCall) { + if (rexNode instanceof RexTableArgCall) { + serializeTableArgCall( + (RexTableArgCall) rexNode, jsonGenerator, serializerProvider); + } else if (rexNode instanceof RexCall) { serializeCall( (RexCall) rexNode, jsonGenerator, @@ -323,6 +332,20 @@ final class RexNodeJsonSerializer extends StdSerializer { gen.writeEndObject(); } + private static void serializeTableArgCall( + RexTableArgCall tableArgCall, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { + gen.writeStartObject(); + gen.writeStringField(FIELD_NAME_KIND, KIND_TABLE_ARG_CALL); + gen.writeNumberField(FIELD_NAME_INPUT_INDEX, tableArgCall.getInputIndex()); + gen.writeFieldName(FIELD_NAME_PARTITION_KEYS); + gen.writeArray(tableArgCall.getPartitionKeys(), 0, tableArgCall.getPartitionKeys().length); + gen.writeFieldName(FIELD_NAME_ORDER_KEYS); + gen.writeArray(tableArgCall.getOrderKeys(), 0, tableArgCall.getOrderKeys().length); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, tableArgCall.getType(), gen); + gen.writeEndObject(); + } + private static void serializeCall( RexCall call, JsonGenerator gen, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java new file mode 100644 index 00000000000..50a8a0f6108 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; + +import java.util.List; + +/** + * {@link StreamExecNode} for {@link ProcessTableFunction}. + * + *

A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple + * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in + * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying + * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table. + */ +@ExecNodeMetadata( + name = "stream-exec-process-table-function", + version = 1, + producedTransformations = StreamExecProcessTableFunction.PROCESS_TRANSFORMATION, + minPlanVersion = FlinkVersion.v2_0, + minStateVersion = FlinkVersion.v2_0) +public class StreamExecProcessTableFunction extends ExecNodeBase + implements StreamExecNode, SingleTransformationTranslator { + + public static final String PROCESS_TRANSFORMATION = "process"; + + public static final String FIELD_NAME_UID = "uid"; + public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; + public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes"; + + @JsonProperty(FIELD_NAME_UID) + private final String uid; + + @JsonProperty(FIELD_NAME_FUNCTION_CALL) + private final RexCall invocation; + + @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) + private final List inputChangelogModes; + + public StreamExecProcessTableFunction( + ReadableConfig tableConfig, + List inputProperties, + RowType outputType, + String description, + String uid, + RexCall invocation, + List inputChangelogModes) { + this( + ExecNodeContext.newNodeId(), + ExecNodeContext.newContext(StreamExecProcessTableFunction.class), + ExecNodeContext.newPersistedConfig( + StreamExecProcessTableFunction.class, tableConfig), + inputProperties, + outputType, + description, + uid, + invocation, + inputChangelogModes); + } + + @JsonCreator + public StreamExecProcessTableFunction( + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context, + @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description, + @JsonProperty(FIELD_NAME_UID) String uid, + @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, + @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) + List inputChangelogModes) { + super(id, context, persistedConfig, inputProperties, outputType, description); + this.uid = uid; + this.invocation = (RexCall) invocation; + this.inputChangelogModes = inputChangelogModes; + } + + public String getUid() { + return uid; + } + + @Override + protected Transformation translateToPlanInternal( + PlannerBase planner, ExecNodeConfig config) { + throw new TableException("Process table function is not fully supported yet."); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java new file mode 100644 index 00000000000..19287e08287 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.functions.TemporalTableFunction; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableFunctionScan; +import org.apache.calcite.rel.logical.LogicalTableFunctionScan; +import org.apache.calcite.rel.metadata.RelColumnMapping; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Type; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Subclass of {@link TableFunctionScan} that is a relational expression which calls a {@link + * FunctionKind#TABLE} or {@link FunctionKind#PROCESS_TABLE} in Flink. 
+ */ +@Internal +public class FlinkLogicalTableFunctionScan extends TableFunctionScan implements FlinkLogicalRel { + + public static final Converter CONVERTER = + new Converter( + ConverterRule.Config.INSTANCE.withConversion( + LogicalTableFunctionScan.class, + Convention.NONE, + FlinkConventions.LOGICAL(), + "FlinkLogicalTableFunctionScanConverter")); + + public FlinkLogicalTableFunctionScan( + RelOptCluster cluster, + RelTraitSet traitSet, + List inputs, + RexNode rexCall, + @Nullable Type elementType, + RelDataType rowType, + @Nullable Set columnMappings) { + super(cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings); + } + + @Override + public TableFunctionScan copy( + RelTraitSet traitSet, + List inputs, + RexNode rexCall, + @Nullable Type elementType, + RelDataType rowType, + @Nullable Set columnMappings) { + return new FlinkLogicalTableFunctionScan( + getCluster(), traitSet, inputs, rexCall, elementType, rowType, columnMappings); + } + + @Internal + public static class Converter extends ConverterRule { + + protected Converter(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final LogicalTableFunctionScan functionScan = call.rel(0); + final FunctionDefinition functionDefinition = + ShortcutUtils.unwrapFunctionDefinition(functionScan.getCall()); + if (functionDefinition == null) { + // For Calcite stack functions + return true; + } + final boolean isTableFunction = + functionDefinition.getKind() == FunctionKind.TABLE + || functionDefinition.getKind() == FunctionKind.PROCESS_TABLE; + return isTableFunction && !(functionDefinition instanceof TemporalTableFunction); + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + final LogicalTableFunctionScan functionScan = (LogicalTableFunctionScan) rel; + final RelTraitSet traitSet = + rel.getTraitSet().replace(FlinkConventions.LOGICAL()).simplify(); + final List newInputs = + functionScan.getInputs().stream() + .map(input -> RelOptRule.convert(input, FlinkConventions.LOGICAL())) + .collect(Collectors.toList()); + final RexCall rexCall = (RexCall) functionScan.getCall(); + return new FlinkLogicalTableFunctionScan( + functionScan.getCluster(), + traitSet, + newInputs, + rexCall, + functionScan.getElementType(), + functionScan.getRowType(), + functionScan.getColumnMappings()); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java new file mode 100644 index 00000000000..49932263d3c --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.physical.stream; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.calcite.RexTableArgCall; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.types.inference.StaticArgument; +import org.apache.flink.table.types.inference.StaticArgumentTrait; +import org.apache.flink.table.types.inference.SystemTypeInference; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig; + +/** + * {@link StreamPhysicalRel} node for {@link ProcessTableFunction}. + * + *

A process table function (PTF) maps zero, one, or multiple tables to zero, one, or multiple + * rows. PTFs enable implementing user-defined operators that can be as feature-rich as built-in + * operations. PTFs have access to Flink's managed state, event-time and timer services, underlying + * table changelogs, and can take multiple ordered and/or partitioned tables to produce a new table. + */ +public class StreamPhysicalProcessTableFunction extends AbstractRelNode + implements StreamPhysicalRel { + + private final FlinkLogicalTableFunctionScan scan; + private final String uid; + + private List inputs; + + public StreamPhysicalProcessTableFunction( + RelOptCluster cluster, + RelTraitSet traitSet, + List inputs, + FlinkLogicalTableFunctionScan scan, + RelDataType rowType) { + super(cluster, traitSet); + this.inputs = inputs; + this.rowType = rowType; + this.scan = scan; + this.uid = deriveUniqueIdentifier(scan); + } + + public StreamPhysicalProcessTableFunction( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + FlinkLogicalTableFunctionScan scan, + RelDataType rowType) { + this(cluster, traitSet, List.of(input), scan, rowType); + } + + @Override + public boolean requireWatermark() { + // Even if there is no time attribute in the inputs, PTFs can work with event-time by taking + // the watermark value as timestamp. + return true; + } + + @Override + public List getInputs() { + return inputs; + } + + @Override + public void replaceInput(int ordinalInParent, RelNode p) { + final List newInputs = new ArrayList<>(inputs); + newInputs.set(ordinalInParent, p); + inputs = List.copyOf(newInputs); + recomputeDigest(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new StreamPhysicalProcessTableFunction( + getCluster(), traitSet, inputs, scan, getRowType()); + } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + final double elementRate = 100.0d * getInputs().size(); + return planner.getCostFactory().makeCost(elementRate, elementRate, 0); + } + + @Override + public ExecNode translateToExecNode() { + final List inputChangelogModes = + getInputs().stream() + .map(StreamPhysicalRel.class::cast) + .map(ChangelogPlanUtils::getChangelogMode) + .map(JavaScalaConversionUtil::toJava) + .map(optional -> optional.orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + return new StreamExecProcessTableFunction( + unwrapTableConfig(this), + getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()), + FlinkTypeFactory.toLogicalRowType(rowType), + getRelDetailedDescription(), + uid, + (RexCall) scan.getCall(), + inputChangelogModes); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + for (Ord ord : Ord.zip(inputs)) { + pw.input("input#" + ord.i, ord.e); + } + return pw.item("invocation", scan.getCall()) + .item("uid", uid) + .item("select", String.join(",", getRowType().getFieldNames())) + .item("rowType", getRowType()); + } + + @Override + protected RelDataType deriveRowType() { + return rowType; + } + + public List getProvidedInputArgs() { + final RexCall call = (RexCall) scan.getCall(); + final List operands = call.getOperands(); + final BridgingSqlFunction.WithTableFunction function = + (BridgingSqlFunction.WithTableFunction) call.getOperator(); + final List declaredArgs = + function.getTypeInference() + .getStaticArguments() + .orElseThrow(IllegalStateException::new); + // This logic filters out optional tables 
for which an input is missing. It returns tables + // in the same order as provided inputs of this RelNode. + return Ord.zip(declaredArgs).stream() + .filter(arg -> arg.e.is(StaticArgumentTrait.TABLE)) + .filter(arg -> operands.get(arg.i) instanceof RexTableArgCall) + .map(arg -> arg.e) + .collect(Collectors.toList()); + } + + /** + * An important part of {@link ProcessTableFunction} is the mandatory unique identifier. Even if + * the PTF has no state entries, state or timers might be added later. So a PTF should serve as + * an identifiable black box for the optimizer. UIDs ensure that. + * + * @see SystemTypeInference + */ + private static String deriveUniqueIdentifier(FlinkLogicalTableFunctionScan scan) { + final RexCall rexCall = (RexCall) scan.getCall(); + final BridgingSqlFunction.WithTableFunction function = + (BridgingSqlFunction.WithTableFunction) rexCall.getOperator(); + final ContextResolvedFunction resolvedFunction = function.getResolvedFunction(); + final List operands = rexCall.getOperands(); + // Type inference ensures that uid is always added at the end + final RexNode uidRexNode = operands.get(operands.size() - 1); + if (uidRexNode.getKind() == SqlKind.DEFAULT) { + final String uid = + resolvedFunction + .getIdentifier() + .map(FunctionIdentifier::getFunctionName) + .orElse(""); + if (!SystemTypeInference.isValidUidForProcessTableFunction(uid)) { + throw new ValidationException( + String.format( + "Could not derive a unique identifier for process table function '%s'. " + + "The function's name does not qualify for a UID. Please provide " + + "a custom identifier using the implicit `uid` argument. " + + "For example: myFunction(..., uid => 'my-id')", + resolvedFunction.asSummaryString())); + } + return uid; + } + // Otherwise UID should be correct as it has been checked by SystemTypeInference. + return RexLiteral.stringValue(uidRexNode); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java new file mode 100644 index 00000000000..fd33940ee9a --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunctionRule.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.physical.stream; + +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; +import org.apache.flink.table.planner.calcite.RexTableArgCall; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.TableCharacteristic; +import org.apache.calcite.sql.TableCharacteristic.Semantics; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Rule to convert a {@link FlinkLogicalTableFunctionScan} with table arguments into a {@link + * StreamPhysicalProcessTableFunction}. + */ +public class StreamPhysicalProcessTableFunctionRule extends ConverterRule { + + public static final StreamPhysicalProcessTableFunctionRule INSTANCE = + new StreamPhysicalProcessTableFunctionRule( + Config.INSTANCE.withConversion( + FlinkLogicalTableFunctionScan.class, + FlinkConventions.LOGICAL(), + FlinkConventions.STREAM_PHYSICAL(), + "StreamPhysicalProcessTableFunctionRule")); + + private StreamPhysicalProcessTableFunctionRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final FlinkLogicalTableFunctionScan scan = call.rel(0); + if (scan.getInputs().isEmpty()) { + // Let StreamPhysicalConstantTableFunctionScanRule take over + return false; + } + final RexCall rexCall = (RexCall) scan.getCall(); + final FunctionDefinition definition = ShortcutUtils.unwrapFunctionDefinition(rexCall); + return definition != null && definition.getKind() == FunctionKind.PROCESS_TABLE; + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + final FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) rel; + final RexCall rexCall = (RexCall) scan.getCall(); + final BridgingSqlFunction.WithTableFunction function = + (BridgingSqlFunction.WithTableFunction) rexCall.getOperator(); + final List operands = rexCall.getOperands(); + final List newInputs = + applyDistributionOnInputs(function, operands, rel.getInputs()); + final RelTraitSet providedTraitSet = + rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL()); + return new StreamPhysicalProcessTableFunction( + scan.getCluster(), providedTraitSet, newInputs, scan, scan.getRowType()); + } + + private static List applyDistributionOnInputs( + BridgingSqlFunction.WithTableFunction function, + List operands, + List inputs) { + return Ord.zip(operands).stream() + .filter(operand -> operand.e instanceof RexTableArgCall) + .map( + tableOperand -> { + final int pos = tableOperand.i; + final RexTableArgCall tableArgCall = (RexTableArgCall) tableOperand.e; + final TableCharacteristic tableCharacteristic = + function.tableCharacteristic(pos); + assert tableCharacteristic != null; + return applyDistributionOnInput( + tableArgCall, + 
tableCharacteristic, + inputs.get(tableArgCall.getInputIndex())); + }) + .collect(Collectors.toList()); + } + + private static RelNode applyDistributionOnInput( + RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic, RelNode input) { + final FlinkRelDistribution requiredDistribution = + deriveDistribution(tableOperand, tableCharacteristic); + final RelTraitSet requiredTraitSet = + input.getCluster() + .getPlanner() + .emptyTraitSet() + .replace(requiredDistribution) + .replace(FlinkConventions.STREAM_PHYSICAL()); + return RelOptRule.convert(input, requiredTraitSet); + } + + private static FlinkRelDistribution deriveDistribution( + RexTableArgCall tableOperand, TableCharacteristic tableCharacteristic) { + if (tableCharacteristic.semantics == Semantics.SET) { + final int[] partitionKeys = tableOperand.getPartitionKeys(); + if (partitionKeys.length == 0) { + return FlinkRelDistribution.SINGLETON(); + } + return FlinkRelDistribution.hash(partitionKeys, true); + } + return FlinkRelDistribution.DEFAULT(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java index d6113743f3e..c88d132b4c3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.java @@ -18,16 +18,21 @@ package org.apache.flink.table.planner.plan.rules.physical.batch; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate; import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues; +import org.apache.flink.table.planner.utils.ShortcutUtils; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexUtil; import org.immutables.value.Value; @@ -35,20 +40,21 @@ import org.immutables.value.Value; import scala.Option; /** - * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall to + * Converts {@link FlinkLogicalTableFunctionScan} with constant parameters. Add the rule to support + * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}. + * + *

For {@link FunctionKind#TABLE}: * * <pre>
- *                            {@link BatchPhysicalCorrelate}
- *                                   /               \
- * empty {@link BatchPhysicalValuesRule}}     {@link FlinkLogicalTableFunctionScan}.
+ *   empty {@link BatchPhysicalValues} -> {@link BatchPhysicalCorrelate}
  * </pre>
* - *

Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT * - * FROM LATERAL TABLE(func()) as T(c)} + *

{@link BatchPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL + * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}. + * + *

For {@link FunctionKind#PROCESS_TABLE}: * - *

Note: {@link BatchPhysicalCorrelateRule} is responsible for converting a reasonable physical - * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T, - * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)} + *

{@link FunctionKind#PROCESS_TABLE} is currently unsupported. */ @Value.Enclosing public class BatchPhysicalConstantTableFunctionScanRule @@ -65,18 +71,17 @@ public class BatchPhysicalConstantTableFunctionScanRule } public boolean matches(RelOptRuleCall call) { - FlinkLogicalTableFunctionScan scan = call.rel(0); - return RexUtil.isConstant(scan.getCall()) && scan.getInputs().isEmpty(); + final FlinkLogicalTableFunctionScan scan = call.rel(0); + return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty(); } public void onMatch(RelOptRuleCall call) { - FlinkLogicalTableFunctionScan scan = call.rel(0); - - // create correlate left - RelOptCluster cluster = scan.getCluster(); - RelTraitSet traitSet = + final FlinkLogicalTableFunctionScan scan = call.rel(0); + final RelOptCluster cluster = scan.getCluster(); + final RelTraitSet traitSet = call.getPlanner().emptyTraitSet().replace(FlinkConventions.BATCH_PHYSICAL()); - BatchPhysicalValues values = + + final BatchPhysicalValues values = new BatchPhysicalValues( cluster, traitSet, @@ -84,35 +89,37 @@ public class BatchPhysicalConstantTableFunctionScanRule cluster.getTypeFactory() .createStructType(ImmutableList.of(), ImmutableList.of())); - BatchPhysicalCorrelate correlate = - new BatchPhysicalCorrelate( - cluster, - traitSet, - values, - scan, - Option.empty(), - scan.getRowType(), - JoinRelType.INNER); - call.transformTo(correlate); + final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall()); + assert function != null; + final RelNode replacement; + if (function.getKind() == FunctionKind.TABLE) { + replacement = + new BatchPhysicalCorrelate( + cluster, + traitSet, + values, + scan, + Option.empty(), + scan.getRowType(), + JoinRelType.INNER); + } else { + throw new TableException("Unsupported function for scan:" + function.getKind()); + } + + call.transformTo(replacement); } /** Configuration for {@link BatchPhysicalConstantTableFunctionScanRule}. 
*/ - @Value.Immutable(singleton = false) + @Value.Immutable public interface BatchPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config { - BatchPhysicalConstantTableFunctionScanRule.BatchPhysicalConstantTableFunctionScanRuleConfig - DEFAULT = - ImmutableBatchPhysicalConstantTableFunctionScanRule - .BatchPhysicalConstantTableFunctionScanRuleConfig.builder() - .build() - .withOperandSupplier( - b0 -> - b0.operand(FlinkLogicalTableFunctionScan.class) - .anyInputs()) - .withDescription("BatchPhysicalConstantTableFunctionScanRule") - .as( - BatchPhysicalConstantTableFunctionScanRule - .BatchPhysicalConstantTableFunctionScanRuleConfig - .class); + BatchPhysicalConstantTableFunctionScanRuleConfig DEFAULT = + ImmutableBatchPhysicalConstantTableFunctionScanRule + .BatchPhysicalConstantTableFunctionScanRuleConfig.builder() + .build() + .withOperandSupplier( + b0 -> b0.operand(FlinkLogicalTableFunctionScan.class).anyInputs()) + .withDescription("BatchPhysicalConstantTableFunctionScanRule") + .as(BatchPhysicalConstantTableFunctionScanRuleConfig.class); @Override default BatchPhysicalConstantTableFunctionScanRule toRule() { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java index 385b34e0457..df177f7bdbc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.java @@ -18,16 +18,22 @@ package org.apache.flink.table.planner.plan.rules.physical.stream; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.FunctionKind; import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelate; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunction; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalValues; +import org.apache.flink.table.planner.utils.ShortcutUtils; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelRule; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexUtil; import org.immutables.value.Value; @@ -35,20 +41,26 @@ import org.immutables.value.Value; import scala.Option; /** - * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall. To + * Converts {@link FlinkLogicalTableFunctionScan} with constant parameters. Add the rule to support + * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}. + * + *

For {@link org.apache.flink.table.functions.FunctionKind#TABLE}: * * <pre>
- *                           {@link StreamPhysicalCorrelate}
- *                              /                     \
- *       empty {@link StreamPhysicalValues}  {@link FlinkLogicalTableFunctionScan}
+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalCorrelate}
  * </pre>
* - *

Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT * - * FROM LATERAL TABLE(func()) as T(c)} + *

{@link StreamPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL + * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}. + * + *

For {@link org.apache.flink.table.functions.FunctionKind#PROCESS_TABLE}: + * + *

+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalProcessTableFunction}
+ * 
* - *

Note: @{link StreamPhysicalCorrelateRule} is responsible for converting a reasonable physical - * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T, - * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)} + *

 /**
- * Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall. To
+ * Converts {@link FlinkLogicalTableFunctionScan} with constant parameters. This rule enables
+ * selecting from a UDF directly, e.g. {@code SELECT * FROM func() as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#TABLE}:
  *
  * <pre>
- *                           {@link StreamPhysicalCorrelate}
- *                              /                     \
- *       empty {@link StreamPhysicalValues}  {@link FlinkLogicalTableFunctionScan}
+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalCorrelate}
  * </pre>
  *
- * <p>Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
- * FROM LATERAL TABLE(func()) as T(c)}
+ * <p>{@link StreamPhysicalCorrelateRule} powers queries such as {@code SELECT * FROM T, LATERAL
+ * TABLE(func()) as T(c)} or {@code SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}.
+ *
+ * <p>For {@link org.apache.flink.table.functions.FunctionKind#PROCESS_TABLE}:
+ *
+ * <pre>
+ *   empty {@link StreamPhysicalValues} -> {@link StreamPhysicalProcessTableFunction}
+ * </pre>
  *
- * <p>Note: @{link StreamPhysicalCorrelateRule} is responsible for converting a reasonable physical
- * plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
- * LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
+ * <p>{@link StreamPhysicalProcessTableFunction} powers queries such as {@code SELECT * FROM func(t
+ * => TABLE T)} or {@code SELECT * FROM func(t => TABLE T PARTITION BY k)}.
  */
 @Value.Enclosing
 public class StreamPhysicalConstantTableFunctionScanRule
@@ -67,18 +79,17 @@ public class StreamPhysicalConstantTableFunctionScanRule
     }
 
     public boolean matches(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
         return !RexUtil.containsInputRef(scan.getCall()) && scan.getInputs().isEmpty();
     }
 
     public void onMatch(RelOptRuleCall call) {
-        FlinkLogicalTableFunctionScan scan = call.rel(0);
-
-        // create correlate left
-        RelOptCluster cluster = scan.getCluster();
-        RelTraitSet traitSet =
+        final FlinkLogicalTableFunctionScan scan = call.rel(0);
+        final RelOptCluster cluster = scan.getCluster();
+        final RelTraitSet traitSet =
                 call.getPlanner().emptyTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
-        StreamPhysicalValues values =
+
+        final StreamPhysicalValues values =
                 new StreamPhysicalValues(
                         cluster,
                         traitSet,
@@ -86,20 +97,32 @@ public class StreamPhysicalConstantTableFunctionScanRule
                         cluster.getTypeFactory()
                                 .createStructType(ImmutableList.of(), ImmutableList.of()));
 
-        StreamPhysicalCorrelate correlate =
-                new StreamPhysicalCorrelate(
-                        cluster,
-                        traitSet,
-                        values,
-                        scan,
-                        Option.empty(),
-                        scan.getRowType(),
-                        JoinRelType.INNER);
-        call.transformTo(correlate);
+        final FunctionDefinition function = ShortcutUtils.unwrapFunctionDefinition(scan.getCall());
+        assert function != null;
+        final RelNode replacement;
+        if (function.getKind() == FunctionKind.TABLE) {
+            replacement =
+                    new StreamPhysicalCorrelate(
+                            cluster,
+                            traitSet,
+                            values,
+                            scan,
+                            Option.empty(),
+                            scan.getRowType(),
+                            JoinRelType.INNER);
+        } else if (function.getKind() == FunctionKind.PROCESS_TABLE) {
+            replacement =
+                    new StreamPhysicalProcessTableFunction(
+                            cluster, traitSet, values, scan, scan.getRowType());
+        } else {
+            throw new TableException("Unsupported function for scan: " + function.getKind());
+        }
+
+        call.transformTo(replacement);
     }
 
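A concrete constant call makes the FunctionKind dispatch above easier to follow. The sketch below is illustrative only and not part of this patch: the function is hypothetical, and actually executing the query assumes the runtime wiring of follow-up commits, since this patch stops at the ExecNode level. A PTF invoked with only scalar arguments yields a table function scan without input refs and without inputs, which matches() accepts and onMatch() plans as empty values into a process table function node:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ProcessTableFunction;

// Hypothetical sketch, not part of this patch.
public class ConstantPtfCallSketch {

    /** Scalar-only PTF: the resulting scan has no input refs and no inputs. */
    public static class GreetingFunction extends ProcessTableFunction<String> {
        public void eval(Integer n) {
            for (int i = 0; i < n; i++) {
                collect("hello #" + i);
            }
        }
    }

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.createTemporarySystemFunction("greet", GreetingFunction.class);
        // PTF calls carry an implicit `uid` argument (see SystemTypeInference);
        // this constant call is what StreamPhysicalConstantTableFunctionScanRule matches.
        env.executeSql("SELECT * FROM greet(n => 3, uid => 'greet-1')").print();
    }
}
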
     /** Configuration for {@link StreamPhysicalConstantTableFunctionScanRule}. */
-    @Value.Immutable(singleton = false)
+    @Value.Immutable
     public interface StreamPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
         StreamPhysicalConstantTableFunctionScanRule
                         .StreamPhysicalConstantTableFunctionScanRuleConfig
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index a19fc0dbaf9..484d0883be1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -76,6 +76,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatch
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultipleInput;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
@@ -124,7 +125,7 @@ public final class ExecNodeMetadataUtil {
     }
 
     private static final Set<Class<? extends ExecNode<?>>> EXEC_NODES =
-            new HashSet<Class<? extends ExecNode<?>>>() {
+            new HashSet<>() {
                 {
                     add(StreamExecCalc.class);
                     add(StreamExecChangelogNormalize.class);
@@ -164,6 +165,7 @@ public final class ExecNodeMetadataUtil {
                     add(StreamExecWindowTableFunction.class);
                     add(StreamExecPythonCalc.class);
                     add(StreamExecAsyncCalc.class);
+                    add(StreamExecProcessTableFunction.class);
                     add(StreamExecPythonCorrelate.class);
                     add(StreamExecPythonGroupAggregate.class);
                     add(StreamExecPythonGroupWindowAggregate.class);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index 522b3731919..a025eacf3be 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
 
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
@@ -140,6 +141,10 @@ public final class ShortcutUtils {
         }
         final RexCall call = (RexCall) rexNode;
         if (!(call.getOperator() instanceof BridgingSqlFunction)) {
+            // legacy
+            if (call.getOperator() instanceof TableSqlFunction) {
+                return ((TableSqlFunction) call.getOperator()).udtf();
+            }
             return null;
         }
         return ((BridgingSqlFunction) call.getOperator()).getDefinition();
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala deleted file mode 100644 index ca664d7b63e..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.nodes.logical - -import org.apache.flink.table.functions.TemporalTableFunction -import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction -import org.apache.flink.table.planner.functions.utils.TableSqlFunction -import org.apache.flink.table.planner.plan.nodes.FlinkConventions - -import org.apache.calcite.plan._ -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.convert.ConverterRule.Config -import org.apache.calcite.rel.core.TableFunctionScan -import org.apache.calcite.rel.logical.LogicalTableFunctionScan -import org.apache.calcite.rel.metadata.RelColumnMapping -import org.apache.calcite.rex.{RexCall, RexNode} - -import java.lang.reflect.Type -import java.util - -import scala.collection.JavaConversions._ - -/** - * Sub-class of [[TableFunctionScan]] that is a relational expression which calls a table-valued - * function in Flink. 
- */ -class FlinkLogicalTableFunctionScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputs: util.List[RelNode], - rexCall: RexNode, - elementType: Type, - rowType: RelDataType, - columnMappings: util.Set[RelColumnMapping]) - extends TableFunctionScan( - cluster, - traitSet, - inputs, - rexCall, - elementType, - rowType, - columnMappings) - with FlinkLogicalRel { - - override def copy( - traitSet: RelTraitSet, - inputs: util.List[RelNode], - rexCall: RexNode, - elementType: Type, - rowType: RelDataType, - columnMappings: util.Set[RelColumnMapping]): TableFunctionScan = { - - new FlinkLogicalTableFunctionScan( - cluster, - traitSet, - inputs, - rexCall, - elementType, - rowType, - columnMappings) - } - -} - -class FlinkLogicalTableFunctionScanConverter(config: Config) extends ConverterRule(config) { - - override def matches(call: RelOptRuleCall): Boolean = { - val logicalTableFunction: LogicalTableFunctionScan = call.rel(0) - - !isTemporalTableFunctionCall(logicalTableFunction) - } - - private def isTemporalTableFunctionCall( - logicalTableFunction: LogicalTableFunctionScan): Boolean = { - - if (!logicalTableFunction.getCall.isInstanceOf[RexCall]) { - return false - } - val rexCall = logicalTableFunction.getCall.asInstanceOf[RexCall] - val functionDefinition = rexCall.getOperator match { - case tsf: TableSqlFunction => tsf.udtf - case bsf: BridgingSqlFunction => bsf.getDefinition - case _ => return false - } - functionDefinition.isInstanceOf[TemporalTableFunction] - } - - def convert(rel: RelNode): RelNode = { - val scan = rel.asInstanceOf[LogicalTableFunctionScan] - val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL).simplify() - val newInputs = scan.getInputs.map(input => RelOptRule.convert(input, FlinkConventions.LOGICAL)) - val rexCall = scan.getCall.asInstanceOf[RexCall]; - val builder = rel.getCluster.getRexBuilder - // When rexCall uses NamedArguments, RexCall is not inferred with the correct type. - // We just use the type of scan as the type of RexCall. 
- val newCall = rexCall.clone(rel.getRowType, rexCall.getOperands) - - new FlinkLogicalTableFunctionScan( - scan.getCluster, - traitSet, - newInputs, - newCall, - scan.getElementType, - scan.getRowType, - scan.getColumnMappings - ) - } - -} - -object FlinkLogicalTableFunctionScan { - val CONVERTER = new FlinkLogicalTableFunctionScanConverter( - Config.INSTANCE.withConversion( - classOf[LogicalTableFunctionScan], - Convention.NONE, - FlinkConventions.LOGICAL, - "FlinkLogicalTableFunctionScanConverter")) -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 681dee6e766..e6d2892eab6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -31,6 +31,7 @@ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFast import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.flink.table.runtime.operators.join.FlinkJoinType +import org.apache.flink.table.types.inference.{StaticArgument, StaticArgumentTrait} import org.apache.flink.types.RowKind import org.apache.calcite.rel.RelNode @@ -329,6 +330,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet) createNewNode(scan, List(), providedTrait, requiredTrait, requester) + case process: StreamPhysicalProcessTableFunction => + // Accepted changes depend on input argument declaration + val requiredChildrenTraits = process.getProvidedInputArgs + .map( + arg => + if (arg.is(StaticArgumentTrait.SUPPORT_UPDATES)) { + ModifyKindSetTrait.ALL_CHANGES + } else { + ModifyKindSetTrait.INSERT_ONLY + }) + .toList + + val children = if (requiredChildrenTraits.isEmpty) { + // Constant function has a single StreamPhysicalValues input + visitChildren(process, ModifyKindSetTrait.INSERT_ONLY) + } else { + visitChildren(process, requiredChildrenTraits) + } + + // Currently, PTFs will only output insert-only + val providedTrait = ModifyKindSetTrait.INSERT_ONLY + createNewNode(process, children, providedTrait, requiredTrait, requester) + case _ => throw new UnsupportedOperationException( s"Unsupported visit for ${rel.getClass.getSimpleName}") @@ -350,6 +374,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti newChildren.toList } + private def visitChildren( + parent: StreamPhysicalRel, + requiredChildrenTraits: List[ModifyKindSetTrait]): List[StreamPhysicalRel] = { + val requester = getNodeName(parent) + val newChildren = for (i <- 0 until parent.getInputs.size()) yield { + visitChild(parent, i, requiredChildrenTraits(i), requester) + } + newChildren.toList + } + private def visitChild( parent: StreamPhysicalRel, childOrdinal: Int, @@ -676,6 +710,20 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode(rel, Some(List()), providedTrait) } + case process: StreamPhysicalProcessTableFunction => + // ProcessTableFunction currently only consumes retract or insert-only + val children = 
process.getInputs.map { + case child: StreamPhysicalRel => + val childModifyKindSet = getModifyKindSet(child) + val requiredChildTrait = if (childModifyKindSet.isInsertOnly) { + UpdateKindTrait.NONE + } else { + UpdateKindTrait.BEFORE_AND_AFTER + } + this.visit(child, requiredChildTrait) + }.toList + createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE) + case _ => throw new UnsupportedOperationException( s"Unsupported visit for ${rel.getClass.getSimpleName}") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index fb1fec0a1b6..b5d30afa20f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.rules import org.apache.flink.table.planner.plan.nodes.logical._ +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalProcessTableFunctionRule import org.apache.flink.table.planner.plan.rules.logical._ import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule import org.apache.flink.table.planner.plan.rules.physical.stream._ @@ -465,6 +466,8 @@ object FlinkStreamRuleSets { ExpandWindowTableFunctionTransposeRule.INSTANCE, StreamPhysicalWindowRankRule.INSTANCE, StreamPhysicalWindowDeduplicateRule.INSTANCE, + // process table function + StreamPhysicalProcessTableFunctionRule.INSTANCE, // join StreamPhysicalJoinRule.INSTANCE, StreamPhysicalIntervalJoinRule.INSTANCE, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java index 489f10a6123..e4baf0e0375 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java @@ -35,6 +35,7 @@ import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.module.Module; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.FlinkTypeSystem; +import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils; @@ -684,7 +685,17 @@ public class RexNodeJsonSerdeTest { FlinkSqlOperatorTable.HASH_CODE, rexBuilder.makeInputRef(FACTORY.createSqlType(SqlTypeName.INTEGER), 1)), rexBuilder.makePatternFieldRef( - "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0)); + "test", FACTORY.createSqlType(SqlTypeName.INTEGER), 0), + new RexTableArgCall( + FACTORY.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, + Arrays.asList( + FACTORY.createSqlType(SqlTypeName.VARCHAR), + FACTORY.createSqlType(SqlTypeName.INTEGER)), + Arrays.asList("f1", "f2")), + 0, + new int[] {1}, + new int[] {0})); } // 
-------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index 8aeee6ca5c0..96b4e514775 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -46,6 +46,7 @@ import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY; import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH; +import static org.apache.flink.table.annotation.ArgumentTrait.SUPPORT_UPDATES; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_ROW; import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -58,9 +59,18 @@ public class ProcessTableFunctionTest extends TableTestBase { @BeforeEach void setup() { util = streamTestUtil(TableConfig.getDefault()); - util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12 AS score"); - util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12 AS different"); - util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name, TRUE AS isValid"); + util.tableEnv() + .executeSql( + "CREATE VIEW t AS SELECT * FROM (VALUES ('Bob', 12), ('Alice', 42)) AS T(name, score)"); + util.tableEnv() + .executeSql("CREATE VIEW t_name_diff AS SELECT 'Bob' AS name, 12 AS different"); + util.tableEnv() + .executeSql("CREATE VIEW t_type_diff AS SELECT 'Bob' AS name, TRUE AS isValid"); + util.tableEnv() + .executeSql("CREATE VIEW t_updating AS SELECT name, COUNT(*) FROM t GROUP BY name"); + util.tableEnv() + .executeSql( + "CREATE TABLE t_sink (name STRING, data STRING) WITH ('connector' = 'blackhole')"); } @Test @@ -86,13 +96,13 @@ public class ProcessTableFunctionTest extends TableTestBase { @Test void testTableAsRow() { util.addTemporarySystemFunction("f", TableAsRowFunction.class); - assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)"); } @Test void testTypedTableAsRow() { util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class); - assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(u => TABLE t, i => 1)"); } @Test @@ -100,25 +110,25 @@ public class ProcessTableFunctionTest extends TableTestBase { util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class); // function expects // but table is - assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(u => TABLE t_name_diff, i => 1)"); } @Test void testTableAsSet() { util.addTemporarySystemFunction("f", TableAsSetFunction.class); - assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)"); } @Test void testTableAsSetOptionalPartitionBy() { util.addTemporarySystemFunction("f", TableAsSetOptionalPartitionFunction.class); - assertReachesOptimizer("SELECT * FROM f(r => TABLE 
t1, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)"); } @Test void testTypedTableAsSet() { util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class); - assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY name, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(u => TABLE t PARTITION BY name, i => 1)"); } @Test @@ -131,27 +141,83 @@ public class ProcessTableFunctionTest extends TableTestBase { void testPojoArgs() { util.addTemporarySystemFunction("f", PojoArgsFunction.class); util.addTemporarySystemFunction("pojoCreator", PojoCreatingFunction.class); - assertReachesOptimizer( - "SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')"); + util.verifyRelPlan( + "SELECT * FROM f(input => TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')"); } @Test void testTableAsSetPassThroughColumns() { util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class); - assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)"); } @Test void testTableAsRowPassThroughColumns() { util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class); - assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)"); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t, i => 1)"); + } + + @Test + void testUpdatingInput() { + util.addTemporarySystemFunction("f", UpdatingArgFunction.class); + util.verifyRelPlan("SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)"); + } + + @Test + void testMissingUid() { + // Function name contains special characters and can thus not be used as UID + util.addTemporarySystemFunction("f*", ScalarArgsFunction.class); + assertThatThrownBy(() -> util.verifyRelPlan("SELECT * FROM `f*`(42, true)")) + .satisfies( + anyCauseMatches( + "Could not derive a unique identifier for process table function 'f*'. " + + "The function's name does not qualify for a UID. " + + "Please provide a custom identifier using the implicit `uid` argument. 
" + + "For example: myFunction(..., uid => 'my-id')")); + } + + @Test + void testUidPipelineSplitIntoTwoFunctions() { + util.addTemporarySystemFunction("f", TableAsSetFunction.class); + util.verifyExecPlan( + util.tableEnv() + .createStatementSet() + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'a')") + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'b')")); + } + + @Test + void testUidPipelineMergeIntoOneFunction() { + util.addTemporarySystemFunction("f", TableAsSetFunction.class); + util.verifyExecPlan( + util.tableEnv() + .createStatementSet() + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')") + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same')")); + } + + @Test + void testUidPipelineMergeWithFanOut() { + util.addTemporarySystemFunction("f", TableAsSetFunction.class); + + util.verifyExecPlan( + util.tableEnv() + .createStatementSet() + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Bob'") + .addInsertSql( + "INSERT INTO t_sink SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1, uid => 'same') WHERE name = 'Alice'")); } @ParameterizedTest @MethodSource("errorSpecs") void testErrorBehavior(ErrorSpec spec) { util.addTemporarySystemFunction("f", spec.functionClass); - assertThatThrownBy(() -> util.verifyRelPlan(spec.sql)) + assertThatThrownBy(() -> util.verifyExecPlan(spec.sql)) .satisfies(anyCauseMatches(spec.errorMessage)); } @@ -162,31 +228,31 @@ public class ProcessTableFunctionTest extends TableTestBase { ScalarArgsFunction.class, "SELECT * FROM f(uid => '%', i => 1, b => true)", "Invalid unique identifier for process table function. " - + "The 'uid' argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. " + + "The `uid` argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. " + "But found: %"), ErrorSpec.of( "typed table as row with invalid input", TypedTableAsRowFunction.class, // function expects - "SELECT * FROM f(u => TABLE t3, i => 1)", + "SELECT * FROM f(u => TABLE t_type_diff, i => 1)", "No match found for function signature " + "f(, , )"), ErrorSpec.of( "table as set with missing partition by", TableAsSetFunction.class, - "SELECT * FROM f(r => TABLE t1, i => 1)", + "SELECT * FROM f(r => TABLE t, i => 1)", "Table argument 'r' requires a PARTITION BY clause for parallel processing."), ErrorSpec.of( "typed table as set with invalid input", TypedTableAsSetFunction.class, // function expects - "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 1)", + "SELECT * FROM f(u => TABLE t_type_diff PARTITION BY name, i => 1)", "No match found for function signature " + "f(, , )"), ErrorSpec.of( "table function instead of process table function", NoProcessTableFunction.class, - "SELECT * FROM f(r => TABLE t1)", + "SELECT * FROM f(r => TABLE t)", "Only scalar arguments are supported at this location. 
" + "But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"), ErrorSpec.of( @@ -197,7 +263,7 @@ public class ProcessTableFunctionTest extends TableTestBase { ErrorSpec.of( "multiple table args", MultiTableFunction.class, - "SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)", + "SELECT * FROM f(r1 => TABLE t, r2 => TABLE t)", "Currently, only signatures with at most one table argument are supported."), ErrorSpec.of( "row instead of table", @@ -207,25 +273,36 @@ public class ProcessTableFunctionTest extends TableTestBase { ErrorSpec.of( "table as row partition by", TableAsRowFunction.class, - "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)", + "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1)", "Only tables with set semantics may be partitioned. " + "Invalid PARTITION BY clause in the 0-th operand of table function 'f'"), ErrorSpec.of( "invalid partition by clause", TableAsSetFunction.class, - "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i => 1)", + "SELECT * FROM f(r => TABLE t PARTITION BY invalid, i => 1)", "Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"), ErrorSpec.of( "unsupported order by", TableAsSetFunction.class, - "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER BY score, i => 1)", - "ORDER BY clause is currently not supported.")); - } - - private void assertReachesOptimizer(String sql) { - assertThatThrownBy(() -> util.verifyRelPlan(sql)) - .hasMessageContaining( - "This exception indicates that the query uses an unsupported SQL feature."); + "SELECT * FROM f(r => TABLE t PARTITION BY name ORDER BY score, i => 1)", + "ORDER BY clause is currently not supported."), + ErrorSpec.of( + "updates into insert-only table arg", + TableAsSetFunction.class, + "SELECT * FROM f(r => TABLE t_updating PARTITION BY name, i => 1)", + "StreamPhysicalProcessTableFunction doesn't support consuming update changes"), + ErrorSpec.of( + "updates into POJO table arg", + InvalidTypedUpdatingArgFunction.class, + "SELECT * FROM f(r => TABLE t_updating, i => 1)", + "Table arguments that support updates must use a row type."), + ErrorSpec.of( + "uid conflict", + TableAsSetFunction.class, + "SELECT * FROM f(r => TABLE t PARTITION BY name, i => 42, uid => 'same') " + + "UNION ALL SELECT * FROM f(r => TABLE t PARTITION BY name, i => 999, uid => 'same')", + "Duplicate unique identifier 'same' detected among process table functions. " + + "Make sure that all PTF calls have an identifier defined that is globally unique.")); } /** Testing function. */ @@ -252,6 +329,18 @@ public class ProcessTableFunctionTest extends TableTestBase { public void eval(@ArgumentHint(TABLE_AS_SET) Row r, Integer i) {} } + /** Testing function. */ + public static class UpdatingArgFunction extends ProcessTableFunction { + @SuppressWarnings("unused") + public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row r, Integer i) {} + } + + /** Testing function. */ + public static class InvalidTypedUpdatingArgFunction extends ProcessTableFunction { + @SuppressWarnings("unused") + public void eval(@ArgumentHint({TABLE_AS_ROW, SUPPORT_UPDATES}) User u, Integer i) {} + } + /** Testing function. 
*/ public static class MultiTableFunction extends ProcessTableFunction { @SuppressWarnings("unused") diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml index 5f4f90d18fd..3ab9cb43979 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml @@ -16,6 +16,93 @@ See the License for the specific language governing permissions and limitations under the License. --> + + + 'my-ptf')]]> + + + + + + + + + + + TABLE t, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')]]> + + + + + + + + + + + 1, b => true)]]> + + + + + + + + + + + + + + + + 'my-uid', i => 1, b => true)]]> @@ -28,48 +115,273 @@ LogicalProject(EXPR$0=[$0]) - + - 'my-ptf')]]> + TABLE t, i => 1)]]> - + - 1, b => true, invalid => 'invalid')]]> + TABLE t, i => 1)]]> + + + + + + + + + + + TABLE t PARTITION BY name, i => 1)]]> + + + + + + + + + + + TABLE t PARTITION BY name, i => 1)]]> + + + + + + + + + + + TABLE t, i => 1)]]> - + - 1, b => true)]]> + TABLE t_name_diff, i => 1)]]> + + + + + + + + + + + TABLE t PARTITION BY name, i => 1)]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + TABLE t_updating PARTITION BY name, i => 1)]]> + + + + + + + + + + + TABLE t, i => 1)]]> + + + + + + + + + + + 1, b => true, invalid => 'invalid')]]>
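The changelog inference changes earlier in this patch become clearer with a function that opts into updating input. Below is a hedged sketch mirroring UpdatingArgFunction from the tests above (names are hypothetical, not part of this patch): declaring SUPPORT_UPDATES on a table argument makes FlinkChangelogModeInferenceProgram request ALL_CHANGES for that input instead of INSERT_ONLY, while the PTF itself still provides insert-only output.

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.annotation.ArgumentTrait.SUPPORT_UPDATES;
import static org.apache.flink.table.annotation.ArgumentTrait.TABLE_AS_SET;

// Hypothetical sketch, not part of this patch; mirrors UpdatingArgFunction above.
// Usage: SELECT * FROM listen(r => TABLE t_updating PARTITION BY name, i => 1, uid => 'l1')
public class UpdatingInputSketch extends ProcessTableFunction<String> {
    public void eval(@ArgumentHint({TABLE_AS_SET, SUPPORT_UPDATES}) Row r, Integer i) {
        // Because SUPPORT_UPDATES is declared, rows may arrive with kinds
        // +I, -U, +U, -D; without the trait, an updating input is rejected with
        // "StreamPhysicalProcessTableFunction doesn't support consuming update changes".
        collect("observed change for key: " + r.getField("name"));
    }
}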