This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2bb224cdd2 [Bugfix] schema update bug fix (#9382) 2bb224cdd2 is described below commit 2bb224cdd2c70ded78c6b8d400ad237d9353ac5e Author: MeihanLi <42751784+meiha...@users.noreply.github.com> AuthorDate: Thu Sep 15 18:25:21 2022 -0700 [Bugfix] schema update bug fix (#9382) --- .../broker/broker/HelixBrokerStarterTest.java | 2 +- .../org/apache/pinot/common/data/SchemaTest.java | 10 ++++---- .../api/resources/PinotSchemaRestletResource.java | 21 +++++++++++----- .../api/resources/TableConfigsRestletResource.java | 2 +- .../helix/core/PinotHelixResourceManager.java | 21 +++++++++------- ...PinotIngestionRestletResourceStatelessTest.java | 2 +- ...PinotInstanceAssignmentRestletResourceTest.java | 2 +- .../api/PinotSchemaRestletResourceTest.java | 28 +++++++++++++++++----- .../controller/api/upload/ZKOperatorTest.java | 2 +- .../pinot/controller/helix/TableCacheTest.java | 2 +- .../java/org/apache/pinot/spi/data/FieldSpec.java | 15 ++++++++++++ .../java/org/apache/pinot/spi/data/Schema.java | 8 +++++-- 12 files changed, 82 insertions(+), 33 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 0fc2ddcd2e..b2abbd59a9 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -88,7 +88,7 @@ public class HelixBrokerStarterTest extends ControllerTest { Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT, "EPOCH|DAYS", "1:DAYS").build(); - _helixResourceManager.addSchema(schema, true); + _helixResourceManager.addSchema(schema, true, false); TableConfig offlineTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) .setTimeType(TimeUnit.DAYS.name()).build(); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java index a53228e841..e2e02ff193 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java @@ -401,7 +401,7 @@ public class SchemaTest { .addMetric("metric", FieldSpec.DataType.INT).addMetric("metricWithDefault", FieldSpec.DataType.INT, 5) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, "time"), null) .addDateTime("dateTime", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS").build(); - Assert.assertFalse(schema3.isBackwardCompatibleWith(oldSchema)); + Assert.assertTrue(schema3.isBackwardCompatibleWith(oldSchema)); // change datetime column Schema schema4 = new Schema.SchemaBuilder().addSingleValueDimension("svDimension", FieldSpec.DataType.INT) @@ -411,7 +411,7 @@ public class SchemaTest { .addMetric("metric", FieldSpec.DataType.INT).addMetric("metricWithDefault", FieldSpec.DataType.INT, 5) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, "time"), null) .addDateTime("dateTime", FieldSpec.DataType.LONG, "2:HOURS:EPOCH", "1:HOURS").build(); // timeUnit 1 -> 2 - Assert.assertFalse(schema4.isBackwardCompatibleWith(oldSchema)); + Assert.assertTrue(schema4.isBackwardCompatibleWith(oldSchema)); // change default value Schema schema5 = new Schema.SchemaBuilder().addSingleValueDimension("svDimension", FieldSpec.DataType.INT) @@ -421,7 +421,7 @@ public class SchemaTest { .addMetric("metric", FieldSpec.DataType.INT).addMetric("metricWithDefault", FieldSpec.DataType.INT, 5) .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, "time"), null) .addDateTime("dateTime", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", 
"1:HOURS").build(); - Assert.assertFalse(schema5.isBackwardCompatibleWith(oldSchema)); + Assert.assertTrue(schema5.isBackwardCompatibleWith(oldSchema)); // add a new column Schema schema6 = new Schema.SchemaBuilder().addSingleValueDimension("svDimension", FieldSpec.DataType.INT) @@ -464,12 +464,12 @@ public class SchemaTest { Assert.assertTrue(newSchema.isBackwardCompatibleWith(oldSchema)); Assert.assertEquals(newSchema, oldSchema); - // STRING with default to BOOLEAN without default - incompatible + // STRING with default to BOOLEAN without default - backward compatible change newSchema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", FieldSpec.DataType.INT) .addSingleValueDimension("svString", FieldSpec.DataType.STRING) .addSingleValueDimension("svStringWithDefault", FieldSpec.DataType.BOOLEAN).build(); newSchema.updateBooleanFieldsIfNeeded(oldSchema); - Assert.assertFalse(newSchema.isBackwardCompatibleWith(oldSchema)); + Assert.assertTrue(newSchema.isBackwardCompatibleWith(oldSchema)); // New added BOOLEAN - compatible newSchema = new Schema.SchemaBuilder().addSingleValueDimension("svInt", FieldSpec.DataType.INT) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java index 4b2a465baa..3b0a1959f4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java @@ -213,7 +213,11 @@ public class PinotSchemaRestletResource { @ManualAuthorization // performed after parsing schema public ConfigSuccessResponse addSchema( @ApiParam(value = "Whether to override the schema if the schema exists") @DefaultValue("true") - @QueryParam("override") boolean override, FormDataMultiPart multiPart, @Context HttpHeaders httpHeaders, + 
@QueryParam("override") boolean override, + @ApiParam(value = "Whether to force overriding the schema if the schema exists") @DefaultValue("false") + @QueryParam("force") boolean force, + FormDataMultiPart multiPart, + @Context HttpHeaders httpHeaders, @Context Request request) { Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps = getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart); @@ -222,7 +226,7 @@ public class PinotSchemaRestletResource { validateSchemaName(schema.getSchemaName()); AccessControlUtils.validatePermission(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl, _accessControlFactory.create()); - SuccessResponse successResponse = addSchema(schema, override); + SuccessResponse successResponse = addSchema(schema, override, force); return new ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProps.getRight()); } @@ -240,7 +244,11 @@ public class PinotSchemaRestletResource { @ManualAuthorization // performed after parsing schema public ConfigSuccessResponse addSchema( @ApiParam(value = "Whether to override the schema if the schema exists") @DefaultValue("true") - @QueryParam("override") boolean override, String schemaJsonString, @Context HttpHeaders httpHeaders, + @QueryParam("override") boolean override, + @ApiParam(value = "Whether to force overriding the schema if the schema exists") @DefaultValue("false") + @QueryParam("force") boolean force, + String schemaJsonString, + @Context HttpHeaders httpHeaders, @Context Request request) { Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProperties = null; try { @@ -255,7 +263,7 @@ public class PinotSchemaRestletResource { validateSchemaName(schema.getSchemaName()); AccessControlUtils.validatePermission(schema.getSchemaName(), AccessType.CREATE, httpHeaders, endpointUrl, _accessControlFactory.create()); - SuccessResponse successResponse = addSchema(schema, override); + SuccessResponse successResponse = addSchema(schema, override, force); return new 
ConfigSuccessResponse(successResponse.getStatus(), schemaAndUnrecognizedProperties.getRight()); } @@ -344,13 +352,14 @@ public class PinotSchemaRestletResource { * Internal method to add schema * @param schema schema * @param override set to true to override the existing schema with the same name + * @param force set to true to skip all rules and force to override the existing schema with the same name */ - private SuccessResponse addSchema(Schema schema, boolean override) { + private SuccessResponse addSchema(Schema schema, boolean override, boolean force) { String schemaName = schema.getSchemaName(); validateSchemaInternal(schema); try { - _pinotHelixResourceManager.addSchema(schema, override); + _pinotHelixResourceManager.addSchema(schema, override, force); // Best effort notification. If controller fails at this point, no notification is given. LOGGER.info("Notifying metadata event for adding new schema {}", schemaName); _metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema, SchemaEventType.CREATE); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java index 5fe6e325b2..1a322ff06b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java @@ -212,7 +212,7 @@ public class TableConfigsRestletResource { } try { - _pinotHelixResourceManager.addSchema(schema, false); + _pinotHelixResourceManager.addSchema(schema, false, false); LOGGER.info("Added schema: {}", schema.getSchemaName()); if (offlineTableConfig != null) { _pinotHelixResourceManager.addTable(offlineTableConfig); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index e3b2b31ed1..9d40e6a385 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1258,7 +1258,7 @@ public class PinotHelixResourceManager { * Schema APIs */ - public void addSchema(Schema schema, boolean override) + public void addSchema(Schema schema, boolean override, boolean force) throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException { String schemaName = schema.getSchemaName(); LOGGER.info("Adding schema: {} with override: {}", schemaName, override); @@ -1267,7 +1267,7 @@ public class PinotHelixResourceManager { if (oldSchema != null) { // Update existing schema if (override) { - updateSchema(schema, oldSchema); + updateSchema(schema, oldSchema, force); } else { throw new SchemaAlreadyExistsException(String.format("Schema: %s already exists", schemaName)); } @@ -1288,7 +1288,7 @@ public class PinotHelixResourceManager { throw new SchemaNotFoundException(String.format("Schema: %s does not exist", schemaName)); } - updateSchema(schema, oldSchema); + updateSchema(schema, oldSchema, false); if (reload) { LOGGER.info("Reloading tables with name: {}", schemaName); @@ -1303,7 +1303,7 @@ public class PinotHelixResourceManager { * Helper method to update the schema, or throw SchemaBackwardIncompatibleException when the new schema is not * backward-compatible with the existing schema. 
*/ - private void updateSchema(Schema schema, Schema oldSchema) + private void updateSchema(Schema schema, Schema oldSchema, boolean force) throws SchemaBackwardIncompatibleException { String schemaName = schema.getSchemaName(); schema.updateBooleanFieldsIfNeeded(oldSchema); @@ -1311,10 +1311,15 @@ public class PinotHelixResourceManager { LOGGER.info("New schema: {} is the same as the existing schema, not updating it", schemaName); return; } - if (!schema.isBackwardCompatibleWith(oldSchema)) { - // TODO: Add the reason of the incompatibility - throw new SchemaBackwardIncompatibleException( - String.format("New schema: %s is not backward-compatible with the existing schema", schemaName)); + boolean isBackwardCompatible = schema.isBackwardCompatibleWith(oldSchema); + if (!isBackwardCompatible) { + if (force) { + LOGGER.warn("Force updated schema: {} which is backward incompatible with the existing schema", oldSchema); + } else { + // TODO: Add the reason of the incompatibility + throw new SchemaBackwardIncompatibleException( + String.format("New schema: %s is not backward-compatible with the existing schema", schemaName)); + } } ZKMetadataProvider.setSchema(_propertyStore, schema); LOGGER.info("Updated schema: {}", schemaName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java index 43ac573d75..f6306c9c92 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java @@ -69,7 +69,7 @@ public class PinotIngestionRestletResourceStatelessTest extends ControllerTest { Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("breed", FieldSpec.DataType.STRING) 
.addSingleValueDimension("name", FieldSpec.DataType.STRING).build(); - _helixResourceManager.addSchema(schema, true); + _helixResourceManager.addSchema(schema, true, false); _helixResourceManager.addTable(tableConfig); // Create a file with few records diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java index 99fcb9deef..6387f1ad76 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java @@ -77,7 +77,7 @@ public class PinotInstanceAssignmentRestletResourceTest { Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) .addDateTime(TIME_COLUMN_NAME, DataType.INT, "1:DAYS:EPOCH", "1:DAYS") .build(); - TEST_INSTANCE.getHelixResourceManager().addSchema(schema, true); + TEST_INSTANCE.getHelixResourceManager().addSchema(schema, true, false); TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setBrokerTenant(BROKER_TENANT_NAME) .setServerTenant(SERVER_TENANT_NAME) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSchemaRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSchemaRestletResourceTest.java index 1b5039fca0..b73f49b7e4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSchemaRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSchemaRestletResourceTest.java @@ -119,21 +119,37 @@ public class PinotSchemaRestletResourceTest { // Change the column data type from STRING to BOOLEAN newColumnFieldSpec.setDataType(DataType.BOOLEAN); + // Update the schema with addSchema api and override on, 
force on + resp = ControllerTest.sendMultipartPostRequest(addSchemaUrl + "?force=true", schema.toSingleLineJsonString()); + Assert.assertEquals(resp.getStatusCode(), 200); + + // Change another column max length from default 512 to 2000 + newColumnFieldSpec2.setMaxLength(2000); + // Change another column default null value from default "null" to "0" + newColumnFieldSpec2.setDefaultNullValue("0"); + // Update the schema with addSchema api and override on resp = ControllerTest.sendMultipartPostRequest(addSchemaUrl, schema.toSingleLineJsonString()); Assert.assertEquals(resp.getStatusCode(), 200); - // Change another column data type from STRING to BOOLEAN - newColumnFieldSpec2.setDataType(DataType.BOOLEAN); + // Get the schema and verify the default null value and max length have been changed + remoteSchema = Schema.fromString(ControllerTest.sendGetRequest(getSchemaUrl)); + Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec2.getName()).getMaxLength(), 2000); + Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec2.getName()).getDefaultNullValue(), "0"); - // Update the schema with updateSchema api + // Change another column max length from 2000 to 1000 + newColumnFieldSpec2.setMaxLength(1000); + // Change another column default null value from "0" to "1" + newColumnFieldSpec2.setDefaultNullValue("1"); + + // Update the schema with the updateSchema api resp = ControllerTest.sendMultipartPutRequest(updateSchemaUrl, schema.toSingleLineJsonString()); Assert.assertEquals(resp.getStatusCode(), 200); - // Get the schema and verify the data types are not changed + // Get the schema and verify the default null value and max length have been changed remoteSchema = Schema.fromString(ControllerTest.sendGetRequest(getSchemaUrl)); - Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec.getName()).getDataType(), DataType.STRING); - 
Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec2.getName()).getDataType(), DataType.STRING); + Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec2.getName()).getMaxLength(), 1000); + Assert.assertEquals(remoteSchema.getFieldSpecFor(newColumnFieldSpec2.getName()).getDefaultNullValue(), "1"); // Add a new BOOLEAN column DimensionFieldSpec newColumnFieldSpec3 = new DimensionFieldSpec("newColumn3", DataType.BOOLEAN, true); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index ef9910677d..d5566e24a9 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -90,7 +90,7 @@ public class ZKOperatorTest { new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN) .setStreamConfigs(getStreamConfigs()).setLLC(true).setNumReplicas(1).build(); - _resourceManager.addSchema(schema, false); + _resourceManager.addSchema(schema, false, false); _resourceManager.addTable(offlineTableConfig); _resourceManager.addTable(realtimeTableConfig); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java index 333cd91b80..9f2024dd99 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java @@ -73,7 +73,7 @@ public class TableCacheTest { Schema schema = new Schema.SchemaBuilder().setSchemaName(SCHEMA_NAME).addSingleValueDimension("testColumn", DataType.INT) .build(); - TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false); + 
TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false); // Wait for at most 10 seconds for the callback to add the schema to the cache TestUtils.waitForCondition(aVoid -> tableCache.getSchema(SCHEMA_NAME) != null, 10_000L, "Failed to add the schema to the cache"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java index b953bd5ecf..d73816dbc6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java @@ -536,4 +536,19 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable { // Sort fieldspecs based on their name return _name.compareTo(otherSpec._name); } + + /*** + * Return true if it is backward compatible with the old FieldSpec. + * Backward compatibility requires + * all other fields except DefaultNullValue and Max Length should be retained. + * + * @param oldFieldSpec + * @return + */ + public boolean isBackwardCompatibleWith(FieldSpec oldFieldSpec) { + + return EqualityUtils.isEqual(_name, oldFieldSpec._name) + && EqualityUtils.isEqual(_dataType, oldFieldSpec._dataType) + && EqualityUtils.isEqual(_isSingleValueField, oldFieldSpec._isSingleValueField); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java index 9c5a3cc42b..74832cf81f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java @@ -690,7 +690,10 @@ public final class Schema implements Serializable { /** * Check whether the current schema is backward compatible with oldSchema. - * Backward compatibility requires all columns and fieldSpec in oldSchema should be retained. + * + * Backward compatibility requires + * (1) all columns in oldSchema should be retained. 
+ * (2) all column fieldSpecs should be backward compatible with the old ones. * * @param oldSchema old schema */ @@ -703,7 +706,8 @@ public final class Schema implements Serializable { } FieldSpec oldSchemaFieldSpec = entry.getValue(); FieldSpec fieldSpec = getFieldSpecFor(oldSchemaColumnName); - if (!fieldSpec.equals(oldSchemaFieldSpec)) { + + if (!fieldSpec.isBackwardCompatibleWith(oldSchemaFieldSpec)) { return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org