This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new a1045ce55e fix(frontend): drop stale attribute references when an
operator's input schema changes (#5294)
a1045ce55e is described below
commit a1045ce55e9169deb94f8ae3bf195692ef994ec5
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Sat May 30 12:31:06 2026 -0700
fix(frontend): drop stale attribute references when an operator's input
schema changes (#5294)
### What changes were proposed in this PR?
Fixes a bug where copy-pasting a schema-propagated Aggregate operator
and connecting it to a different upstream operator caused a compile
error that persisted even after clearing all properties.
**Why it is a bug:** An Aggregate stores property values that reference
input column names — `groupByKeys` and `aggregations[].attribute`.
Copy-paste clones `operatorProperties`, so the pasted operator keeps
references to the *old* source's columns. When it is wired to a
different operator, the backend `AggregateOpDesc` schema propagation
calls `inputSchema.getAttribute("<old col>")`, which throws. These old
column values were never removed.
`DynamicSchemaService.setDynamicSchema` is documented to drop properties
invalidated by a new schema, but its implementation never actually did so.
**What changed:** When schema propagation produces an input-attribute
list, `WorkflowCompilingService` now drops property values that the new
schema invalidates via a new pure helper `dropInvalidAttributeValues`:
- `autofill: "attributeName"` -> reset to `""` if not in the enum
- `autofill: "attributeNameList"` -> filter out entries not in the enum
- recurses through nested objects/arrays (so `aggregations[].attribute`
is handled) and never mutates the input
The misleading doc comment on `setDynamicSchema` now points to where the
contract is fulfilled.
### Any related issues, documentation, discussions?
Resolves #3070.
### How was this PR tested?
Manually tested:
https://github.com/user-attachments/assets/d46e6198-10de-4efe-8c34-71981374cde8
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.8)
---
.../workflow-compiling.service.spec.ts | 242 +++++++++++++++++++++
.../compile-workflow/workflow-compiling.service.ts | 83 +++++++
.../dynamic-schema/dynamic-schema.service.ts | 6 +-
3 files changed, 328 insertions(+), 3 deletions(-)
diff --git
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
new file mode 100644
index 0000000000..b456205f22
--- /dev/null
+++
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.spec.ts
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { JSONSchema7Definition } from "json-schema";
+import { TestBed } from "@angular/core/testing";
+import { WorkflowCompilingService } from "./workflow-compiling.service";
+import { WorkflowActionService } from
"../workflow-graph/model/workflow-action.service";
+import { DynamicSchemaService } from
"../dynamic-schema/dynamic-schema.service";
+import { ValidationWorkflowService } from
"../validation/validation-workflow.service";
+import { OperatorMetadataService } from
"../operator-metadata/operator-metadata.service";
+import { StubOperatorMetadataService } from
"../operator-metadata/stub-operator-metadata.service";
+import { JointUIService } from "../joint-ui/joint-ui.service";
+import { WorkflowUtilService } from
"../workflow-graph/util/workflow-util.service";
+import { UndoRedoService } from "../undo-redo/undo-redo.service";
+import { mockPoint, mockScanPredicate } from
"../workflow-graph/model/mock-workflow-data";
+import { serializePortIdentity } from
"../../../common/util/port-identity-serde";
+import { commonTestImports, commonTestProviders } from
"../../../common/testing/test-utils";
+
+describe("WorkflowCompilingService.dropInvalidAttributeValues", () => {
+ // A schema shaped like the Aggregate operator after schema propagation has
filled in the
+ // valid input attribute names ("col_y" is the only attribute available on
the new input).
+ const aggregateSchema = (): JSONSchema7Definition =>
+ ({
+ type: "object",
+ properties: {
+ groupByKeys: {
+ type: "array",
+ autofill: "attributeNameList",
+ items: { type: "string", enum: ["col_y", ""] },
+ },
+ aggregations: {
+ type: "array",
+ items: {
+ type: "object",
+ properties: {
+ attribute: { type: "string", autofill: "attributeName", enum:
["col_y"] },
+ aggFunction: { type: "string" },
+ resultAttribute: { type: "string" },
+ },
+ },
+ },
+ },
+ }) as unknown as JSONSchema7Definition;
+
+ it("drops list entries and resets single attributes that are no longer
valid", () => {
+ const properties = {
+ groupByKeys: ["col_x", "col_y"],
+ aggregations: [{ attribute: "col_x", aggFunction: "sum",
resultAttribute: "r" }],
+ };
+
+ const { value, changed } =
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(),
properties);
+
+ expect(changed).toBe(true);
+ expect(value.groupByKeys).toEqual(["col_y"]);
+ expect(value.aggregations[0].attribute).toBe("");
+ // non-attribute fields are preserved
+ expect(value.aggregations[0].aggFunction).toBe("sum");
+ expect(value.aggregations[0].resultAttribute).toBe("r");
+ // the input object is never mutated
+ expect(properties.groupByKeys).toEqual(["col_x", "col_y"]);
+ expect(properties.aggregations[0].attribute).toBe("col_x");
+ });
+
+ it("reports no change when all attribute references are valid", () => {
+ const properties = {
+ groupByKeys: ["col_y"],
+ aggregations: [{ attribute: "col_y", aggFunction: "sum",
resultAttribute: "r" }],
+ };
+
+ const { value, changed } =
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(),
properties);
+
+ expect(changed).toBe(false);
+ expect(value).toBe(properties);
+ });
+
+ it("makes no change when the input schema (enum) is unknown", () => {
+ const schemaWithoutEnum: JSONSchema7Definition = {
+ type: "object",
+ properties: {
+ groupByKeys: {
+ type: "array",
+ autofill: "attributeNameList",
+ items: { type: "string" },
+ },
+ aggregations: {
+ type: "array",
+ items: {
+ type: "object",
+ properties: {
+ attribute: { type: "string", autofill: "attributeName" },
+ },
+ },
+ },
+ },
+ } as unknown as JSONSchema7Definition;
+
+ const properties = {
+ groupByKeys: ["col_x"],
+ aggregations: [{ attribute: "col_x" }],
+ };
+
+ const { value, changed } =
WorkflowCompilingService.dropInvalidAttributeValues(schemaWithoutEnum,
properties);
+
+ expect(changed).toBe(false);
+ expect(value).toBe(properties);
+ });
+
+ it("returns the value unchanged for non-object schemas or nullish values",
() => {
+ // boolean schema (e.g. `additionalProperties: true`)
+ expect(WorkflowCompilingService.dropInvalidAttributeValues(true, { a: 1
})).toEqual({
+ value: { a: 1 },
+ changed: false,
+ });
+ // null / undefined values are not walked
+
expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(),
null)).toEqual({
+ value: null,
+ changed: false,
+ });
+
expect(WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(),
undefined)).toEqual({
+ value: undefined,
+ changed: false,
+ });
+ });
+
+ it("skips schema properties that are absent from the value object", () => {
+ // the value is missing both `groupByKeys` and `aggregations` defined in
the schema
+ const properties = { unrelated: "keep-me" };
+
+ const { value, changed } =
WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(),
properties);
+
+ expect(changed).toBe(false);
+ expect(value).toBe(properties);
+ });
+});
+
+describe("WorkflowCompilingService schema propagation property cleanup", () =>
{
+ let service: WorkflowCompilingService;
+ let workflowActionService: WorkflowActionService;
+ let dynamicSchemaService: DynamicSchemaService;
+
+ beforeEach(() => {
+ TestBed.configureTestingModule({
+ imports: [...commonTestImports],
+ providers: [
+ { provide: OperatorMetadataService, useClass:
StubOperatorMetadataService },
+ JointUIService,
+ WorkflowActionService,
+ WorkflowUtilService,
+ UndoRedoService,
+ DynamicSchemaService,
+ ValidationWorkflowService,
+ WorkflowCompilingService,
+ ...commonTestProviders,
+ ],
+ });
+ service = TestBed.inject(WorkflowCompilingService);
+ workflowActionService = TestBed.inject(WorkflowActionService);
+ dynamicSchemaService = TestBed.inject(DynamicSchemaService);
+ });
+
+ it("drops operator property values that the propagated input schema no
longer supports", () => {
+ const operatorID = mockScanPredicate.operatorID;
+ workflowActionService.addOperator(mockScanPredicate, mockPoint);
+
+ // give the operator a schema with attribute-autofill properties bound to
input port 0
+ const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID);
+ dynamicSchemaService.setDynamicSchema(operatorID, {
+ ...baseSchema,
+ jsonSchema: {
+ type: "object",
+ properties: {
+ groupByKeys: {
+ type: "array",
+ autofill: "attributeNameList",
+ autofillAttributeOnPort: 0,
+ items: { type: "string" },
+ },
+ attribute: { type: "string", autofill: "attributeName",
autofillAttributeOnPort: 0 },
+ },
+ } as any,
+ });
+
+ // stale references to "col_x", a column that does not exist on the new
input
+ workflowActionService.setOperatorProperty(operatorID, { groupByKeys:
["col_x", "col_y"], attribute: "col_x" });
+
+ // the propagated input schema only contains "col_y"
+ vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({
+ [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName:
"col_y", attributeType: "string" }],
+ } as any);
+
+ // invoke the private propagation handler directly (normally triggered by
a compile response)
+ (service as any).applySchemaPropagationResult();
+
+ const cleaned =
workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties;
+ expect(cleaned.groupByKeys).toEqual(["col_y"]);
+ expect(cleaned.attribute).toBe("");
+ });
+
+ it("leaves valid property values untouched", () => {
+ const operatorID = mockScanPredicate.operatorID;
+ workflowActionService.addOperator(mockScanPredicate, mockPoint);
+
+ const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID);
+ dynamicSchemaService.setDynamicSchema(operatorID, {
+ ...baseSchema,
+ jsonSchema: {
+ type: "object",
+ properties: {
+ attribute: { type: "string", autofill: "attributeName",
autofillAttributeOnPort: 0 },
+ },
+ } as any,
+ });
+
+ workflowActionService.setOperatorProperty(operatorID, { attribute: "col_y"
});
+
+ vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({
+ [serializePortIdentity({ id: 0, internal: false })]: [{ attributeName:
"col_y", attributeType: "string" }],
+ } as any);
+
+ const setSpy = vi.spyOn(workflowActionService, "setOperatorProperty");
+ (service as any).applySchemaPropagationResult();
+
+ expect(setSpy).not.toHaveBeenCalled();
+
expect(workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties.attribute).toBe("col_y");
+ });
+});
diff --git
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
index 9648d2b305..dae47a0088 100644
---
a/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
+++
b/frontend/src/app/workspace/service/compile-workflow/workflow-compiling.service.ts
@@ -19,6 +19,7 @@
import { HttpClient, HttpHeaders } from "@angular/common/http";
import { Injectable } from "@angular/core";
+import { JSONSchema7Definition } from "json-schema";
import { EMPTY, merge, Observable, ReplaySubject } from "rxjs";
import { CustomJSONSchema7 } from
"src/app/workspace/types/custom-json-schema.interface";
import { AppSettings } from "../../../common/app-setting";
@@ -190,6 +191,19 @@ export class WorkflowCompilingService {
let newDynamicSchema: OperatorSchema;
if (inputSchema) {
newDynamicSchema =
WorkflowCompilingService.setOperatorInputAttrs(currentDynamicSchema,
inputSchema);
+
+ // Now that the list of input attributes is known, drop any operator
property
+ // values that reference attributes which no longer exist in the input
schema (e.g. a copy-pasted
+ // operator wired to a different upstream, or an operator re-wired to
a new source). Otherwise
+ // these old references cause a compile error that survives even after
clearing properties.
+ const operator =
this.workflowActionService.getTexeraGraph().getOperator(operatorID);
+ const { value: cleanedProperties, changed } =
WorkflowCompilingService.dropInvalidAttributeValues(
+ newDynamicSchema.jsonSchema,
+ operator.operatorProperties
+ );
+ if (changed) {
+ this.workflowActionService.setOperatorProperty(operatorID,
cleanedProperties);
+ }
} else {
// otherwise, the input attributes of the operator is unknown
// if the operator is not a source operator, restore its original
schema of input attributes
@@ -392,6 +406,75 @@ export class WorkflowCompilingService {
};
}
+ /**
+ * Walks an operator's property values with its json schema and drops any
+ * value that references an input attribute which is no longer valid.
+ *
+ * Only properties marked with an `autofill` annotation are affected, and
only when the schema carries an
+ * `enum` of valid attribute names (i.e. the input schema is known). Two
cases are handled:
+ * - `attributeName`: a single column name. Reset to "" if it's not in the
enum.
+ * - `attributeNameList`: a list of column names. Filter out entries that
aren't in the enum.
+ *
+ * Returns the (possibly new) properties object and whether anything changed.
+ */
+ public static dropInvalidAttributeValues(
+ schema: JSONSchema7Definition | undefined,
+ value: any
+ ): { value: any; changed: boolean } {
+ if (typeof schema !== "object" || schema === null || value === undefined
|| value === null) {
+ return { value, changed: false };
+ }
+ const s = schema as CustomJSONSchema7;
+
+ if (s.autofill === "attributeNameList") {
+ const itemEnum = (s.items as CustomJSONSchema7 | undefined)?.enum;
+ if (Array.isArray(value) && Array.isArray(itemEnum)) {
+ const filtered = value.filter(v => itemEnum.includes(v));
+ return { value: filtered, changed: filtered.length !== value.length };
+ }
+ return { value, changed: false };
+ }
+
+ if (s.autofill === "attributeName") {
+ if (Array.isArray(s.enum) && typeof value === "string" &&
!s.enum.includes(value)) {
+ return { value: "", changed: true };
+ }
+ return { value, changed: false };
+ }
+
+ // recurse into object properties
+ if (s.properties && typeof value === "object" && !Array.isArray(value)) {
+ let changed = false;
+ const newValue = { ...value };
+ Object.entries(s.properties).forEach(([key, propSchema]) => {
+ if (key in newValue) {
+ const res =
WorkflowCompilingService.dropInvalidAttributeValues(propSchema, newValue[key]);
+ if (res.changed) {
+ newValue[key] = res.value;
+ changed = true;
+ }
+ }
+ });
+ return { value: changed ? newValue : value, changed };
+ }
+
+ // recurse into array items (only when items is a single schema, not a
tuple schema)
+ if (s.items && !Array.isArray(s.items) && Array.isArray(value)) {
+ let changed = false;
+ const newArr = value.map(item => {
+ const res =
WorkflowCompilingService.dropInvalidAttributeValues(s.items as
JSONSchema7Definition, item);
+ if (res.changed) {
+ changed = true;
+ return res.value;
+ }
+ return item;
+ });
+ return { value: changed ? newArr : value, changed };
+ }
+
+ return { value, changed: false };
+ }
+
public static restoreOperatorInputAttrs(operatorSchema: OperatorSchema):
OperatorSchema {
let newJsonSchema = operatorSchema.jsonSchema;
diff --git
a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
index e2a7592772..50c33e7e86 100644
---
a/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
+++
b/frontend/src/app/workspace/service/dynamic-schema/dynamic-schema.service.ts
@@ -120,9 +120,9 @@ export class DynamicSchemaService {
/**
* Sets the dynamic schema of an operator. If the new schema is different,
also emit dynamic schema changed event.
*
- * The new dynamic schema is validated against the current operator
properties.
- * If the changed new dynamic schema invalidates some property, then the
invalid properties fields will be dropped.
- *
+ * Note: dropping operator property values that the new schema invalidates
(e.g. attribute references that no
+ * longer exist after schema propagation) is handled by
WorkflowCompilingService.dropInvalidAttributeValues,
+ * which has access to the propagated input attributes.
*/
public setDynamicSchema(operatorID: string, dynamicSchema: OperatorSchema):
void {
const currentDynamicSchema = this.dynamicSchemaMap.get(operatorID);