This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 14b00cfc6c2 [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results 14b00cfc6c2 is described below commit 14b00cfc6c2e477067fc9e7937e34b2aa53df1eb Author: zeruibao <zerui....@databricks.com> AuthorDate: Fri Jun 2 15:10:06 2023 -0700 [SPARK-43380][SQL] Fix Avro data type conversion issues to avoid producing incorrect results ### What changes were proposed in this pull request? We introduce the SQLConf `spark.sql.legacy.avro.allowReadingWithIncompatibleSchema` to prevent reading interval types as date or timestamp types to avoid getting corrupt dates as well as reading decimal types with incorrect precision. ### Why are the changes needed? We found the following issues with open source Avro: - Interval types can be read as date or timestamp types that would lead to wildly different results For example, `Duration.ofDays(1).plusSeconds(1)` will be read as `1972-09-27`, which is weird. - Decimal types can be read with lower precision, that leads to data being read as `null` instead of suggesting that a wider decimal format should be provided ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test Closes #41052 from zeruibao/SPARK-43380-fix-avro-data-type-conversion. Lead-authored-by: zeruibao <zerui....@databricks.com> Co-authored-by: zeruibao <125398515+zerui...@users.noreply.github.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/sql/avro/AvroDeserializer.scala | 423 ++++++++++++--------- .../org/apache/spark/sql/avro/AvroSuite.scala | 132 +++++++ core/src/main/resources/error/error-classes.json | 10 + docs/sql-migration-guide.md | 1 + .../spark/sql/errors/QueryCompilationErrors.scala | 30 ++ .../org/apache/spark/sql/internal/SQLConf.scala | 12 + 6 files changed, 435 insertions(+), 173 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index aac979cddb2..78b1f01e2ef 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -35,7 +35,9 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -117,178 +119,260 @@ private[sql] class AvroDeserializer( val incompatibleMsg = errorPrefix + s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})" - (avroType.getType, catalystType) match { - case (NULL, NullType) => (updater, ordinal, _) => - updater.setNullAt(ordinal) + val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA + val preventReadingIncorrectType = !SQLConf.get.getConf(confKey) + val logicalDataType = SchemaConverters.toSqlType(avroType).dataType + avroType.getType match { + case NULL => + (logicalDataType, catalystType) match { + case (_, 
NullType) => (updater, ordinal, _) => + updater.setNullAt(ordinal) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } // TODO: we can avoid boxing if future version of avro provide primitive accessors. - case (BOOLEAN, BooleanType) => (updater, ordinal, value) => - updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) - - case (INT, IntegerType) => (updater, ordinal, value) => - updater.setInt(ordinal, value.asInstanceOf[Int]) - - case (INT, DateType) => (updater, ordinal, value) => - updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) - - case (LONG, LongType) => (updater, ordinal, value) => - updater.setLong(ordinal, value.asInstanceOf[Long]) - - case (LONG, TimestampType) => avroType.getLogicalType match { - // For backward compatibility, if the Avro type is Long and it is not logical type - // (the `null` case), the value is processed as timestamp type with millisecond precision. - case null | _: TimestampMillis => (updater, ordinal, value) => - val millis = value.asInstanceOf[Long] - val micros = DateTimeUtils.millisToMicros(millis) - updater.setLong(ordinal, timestampRebaseFunc(micros)) - case _: TimestampMicros => (updater, ordinal, value) => - val micros = value.asInstanceOf[Long] - updater.setLong(ordinal, timestampRebaseFunc(micros)) - case other => throw new IncompatibleSchemaException(errorPrefix + - s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.") - } - - case (LONG, TimestampNTZType) => avroType.getLogicalType match { - // To keep consistent with TimestampType, if the Avro type is Long and it is not - // logical type (the `null` case), the value is processed as TimestampNTZ - // with millisecond precision. - case null | _: LocalTimestampMillis => (updater, ordinal, value) => - val millis = value.asInstanceOf[Long] - val micros = DateTimeUtils.millisToMicros(millis) - updater.setLong(ordinal, micros) - case _: LocalTimestampMicros => (updater, ordinal, value) => - val micros = value.asInstanceOf[Long] - updater.setLong(ordinal, micros) - case other => throw new IncompatibleSchemaException(errorPrefix + - s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.") - } - - // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date. - // For backward compatibility, we still keep this conversion. 
- case (LONG, DateType) => (updater, ordinal, value) => - updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt) - - case (FLOAT, FloatType) => (updater, ordinal, value) => - updater.setFloat(ordinal, value.asInstanceOf[Float]) - - case (DOUBLE, DoubleType) => (updater, ordinal, value) => - updater.setDouble(ordinal, value.asInstanceOf[Double]) - - case (STRING, StringType) => (updater, ordinal, value) => - val str = value match { - case s: String => UTF8String.fromString(s) - case s: Utf8 => - val bytes = new Array[Byte](s.getByteLength) - System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength) - UTF8String.fromBytes(bytes) + case BOOLEAN => + (logicalDataType, catalystType) match { + case (_, BooleanType) => (updater, ordinal, value) => + updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) } - updater.set(ordinal, str) - - case (ENUM, StringType) => (updater, ordinal, value) => - updater.set(ordinal, UTF8String.fromString(value.toString)) - - case (FIXED, BinaryType) => (updater, ordinal, value) => - updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone()) - - case (BYTES, BinaryType) => (updater, ordinal, value) => - val bytes = value match { - case b: ByteBuffer => - val bytes = new Array[Byte](b.remaining) - b.get(bytes) - // Do not forget to reset the position - b.rewind() - bytes - case b: Array[Byte] => b - case other => - throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.") + case INT => + (logicalDataType, catalystType) match { + case (IntegerType, IntegerType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + case (IntegerType, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) + case (DateType, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) + case (_: YearMonthIntervalType, _: YearMonthIntervalType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + case (_: YearMonthIntervalType, _) if preventReadingIncorrectType => + throw QueryCompilationErrors.avroIncorrectTypeError( + toFieldStr(avroPath), toFieldStr(catalystPath), + logicalDataType.catalogString, catalystType.catalogString, confKey.key) + case _ if !preventReadingIncorrectType => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) } - updater.set(ordinal, bytes) - - case (FIXED, _: DecimalType) => (updater, ordinal, value) => - val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal] - val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d) - val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale) - updater.setDecimal(ordinal, decimal) - - case (BYTES, _: DecimalType) => (updater, ordinal, value) => - val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal] - val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d) - val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale) - updater.setDecimal(ordinal, decimal) - - case (RECORD, st: StructType) => - // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328. - // We can always return `false` from `applyFilters` for nested records. 
- val writeRecord = - getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false) - (updater, ordinal, value) => - val row = new SpecificInternalRow(st) - writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord]) - updater.set(ordinal, row) - - case (ARRAY, ArrayType(elementType, containsNull)) => - val avroElementPath = avroPath :+ "element" - val elementWriter = newWriter(avroType.getElementType, elementType, - avroElementPath, catalystPath :+ "element") - (updater, ordinal, value) => - val collection = value.asInstanceOf[java.util.Collection[Any]] - val result = createArrayData(elementType, collection.size()) - val elementUpdater = new ArrayDataUpdater(result) - - var i = 0 - val iter = collection.iterator() - while (iter.hasNext) { - val element = iter.next() - if (element == null) { - if (!containsNull) { - throw new RuntimeException( - s"Array value at path ${toFieldStr(avroElementPath)} is not allowed to be null") - } else { - elementUpdater.setNullAt(i) - } - } else { - elementWriter(elementUpdater, i, element) - } - i += 1 + case LONG => + (logicalDataType, catalystType) match { + case (LongType, LongType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + case (LongType, TimestampType) + | (TimestampType, TimestampType) + |(TimestampNTZType, TimestampType) => avroType.getLogicalType match { + // For backward compatibility, if the Avro type is Long and it is not logical type + // (the `null` case), the value is processed as timestamp type with + // millisecond precision. + case null | _: TimestampMillis => (updater, ordinal, value) => + val millis = value.asInstanceOf[Long] + val micros = DateTimeUtils.millisToMicros(millis) + updater.setLong(ordinal, timestampRebaseFunc(micros)) + case _: TimestampMicros => (updater, ordinal, value) => + val micros = value.asInstanceOf[Long] + updater.setLong(ordinal, timestampRebaseFunc(micros)) + case other => throw new IncompatibleSchemaException(errorPrefix + + s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.") } - - updater.set(ordinal, result) - - case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType => - val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType, - avroPath :+ "key", catalystPath :+ "key") - val valueWriter = newWriter(avroType.getValueType, valueType, - avroPath :+ "value", catalystPath :+ "value") - (updater, ordinal, value) => - val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]] - val keyArray = createArrayData(keyType, map.size()) - val keyUpdater = new ArrayDataUpdater(keyArray) - val valueArray = createArrayData(valueType, map.size()) - val valueUpdater = new ArrayDataUpdater(valueArray) - val iter = map.entrySet().iterator() - var i = 0 - while (iter.hasNext) { - val entry = iter.next() - assert(entry.getKey != null) - keyWriter(keyUpdater, i, entry.getKey) - if (entry.getValue == null) { - if (!valueContainsNull) { - throw new RuntimeException( - s"Map value at path ${toFieldStr(avroPath :+ "value")} is not allowed to be null") - } else { - valueUpdater.setNullAt(i) - } - } else { - valueWriter(valueUpdater, i, entry.getValue) - } - i += 1 + case (LongType, TimestampNTZType) + | (TimestampNTZType, TimestampNTZType) + | (TimestampType, TimestampNTZType) => avroType.getLogicalType match { + // To keep consistent with TimestampType, if the Avro type is Long and it is not + // logical type (the `null` case), the value is processed as TimestampNTZ + // with millisecond 
precision. + case null | _: LocalTimestampMillis => (updater, ordinal, value) => + val millis = value.asInstanceOf[Long] + val micros = DateTimeUtils.millisToMicros(millis) + updater.setLong(ordinal, micros) + case _: LocalTimestampMicros => (updater, ordinal, value) => + val micros = value.asInstanceOf[Long] + updater.setLong(ordinal, micros) + case other => throw new IncompatibleSchemaException(errorPrefix + + s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.") } - - // The Avro map will never have null or duplicated map keys, it's safe to create a - // ArrayBasedMapData directly here. - updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray)) - - case (UNION, _) => + // Before we upgrade Avro to 1.8 for logical type support, + // spark-avro converts Long to Date. + // For backward compatibility, we still keep this conversion. + case (LongType, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt) + case (DateType, DateType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + case (_: DayTimeIntervalType, _: DayTimeIntervalType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + case (_: DayTimeIntervalType, _) if preventReadingIncorrectType => + throw QueryCompilationErrors.avroIncorrectTypeError( + toFieldStr(avroPath), toFieldStr(catalystPath), + logicalDataType.catalogString, catalystType.catalogString, confKey.key) + case (_: DayTimeIntervalType, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt) + case _ if !preventReadingIncorrectType => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case FLOAT => + (logicalDataType, catalystType) match { + case (_, FloatType) => (updater, ordinal, value) => + updater.setFloat(ordinal, value.asInstanceOf[Float]) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case DOUBLE => + (logicalDataType, catalystType) match { + case (_, DoubleType) => (updater, ordinal, value) => + updater.setDouble(ordinal, value.asInstanceOf[Double]) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case STRING => + (logicalDataType, catalystType) match { + case (_, StringType) => (updater, ordinal, value) => + val str = value match { + case s: String => UTF8String.fromString(s) + case s: Utf8 => + val bytes = new Array[Byte](s.getByteLength) + System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength) + UTF8String.fromBytes(bytes) + } + updater.set(ordinal, str) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case ENUM => + (logicalDataType, catalystType) match { + case (_, StringType) => (updater, ordinal, value) => + updater.set(ordinal, UTF8String.fromString(value.toString)) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case FIXED => + (logicalDataType, catalystType) match { + case (_, BinaryType) => (updater, ordinal, value) => + updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone()) + case (_, dt: DecimalType) => + val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal] + if (preventReadingIncorrectType && + d.getPrecision - d.getScale > dt.precision - dt.scale) { + throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath), + toFieldStr(catalystPath), 
logicalDataType.catalogString, + dt.catalogString, confKey.key) + } + (updater, ordinal, value) => + val bigDecimal = + decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d) + val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale) + updater.setDecimal(ordinal, decimal) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case BYTES => + (logicalDataType, catalystType) match { + case (_, BinaryType) => (updater, ordinal, value) => + val bytes = value match { + case b: ByteBuffer => + val bytes = new Array[Byte](b.remaining) + b.get(bytes) + // Do not forget to reset the position + b.rewind() + bytes + case b: Array[Byte] => b + case other => + throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.") + } + updater.set(ordinal, bytes) + case (_, dt: DecimalType) => + val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal] + if (preventReadingIncorrectType && + d.getPrecision - d.getScale > dt.precision - dt.scale) { + throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath), + toFieldStr(catalystPath), logicalDataType.catalogString, + dt.catalogString, confKey.key) + } + (updater, ordinal, value) => + val bigDecimal = decimalConversions + .fromBytes(value.asInstanceOf[ByteBuffer], avroType, d) + val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale) + updater.setDecimal(ordinal, decimal) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case RECORD => + (logicalDataType, catalystType) match { + case (_, st: StructType) => + // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328. + // We can always return `false` from `applyFilters` for nested records. + val writeRecord = + getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false) + (updater, ordinal, value) => + val row = new SpecificInternalRow(st) + writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord]) + updater.set(ordinal, row) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case ARRAY => + (logicalDataType, catalystType) match { + case (_, ArrayType(elementType, containsNull)) => + val avroElementPath = avroPath :+ "element" + val elementWriter = newWriter(avroType.getElementType, elementType, + avroElementPath, catalystPath :+ "element") + (updater, ordinal, value) => + val collection = value.asInstanceOf[java.util.Collection[Any]] + val result = createArrayData(elementType, collection.size()) + val elementUpdater = new ArrayDataUpdater(result) + + var i = 0 + val iter = collection.iterator() + while (iter.hasNext) { + val element = iter.next() + if (element == null) { + if (!containsNull) { + throw new RuntimeException( + s"Array value at path ${toFieldStr(avroElementPath)}" + + s" is not allowed to be null") + } else { + elementUpdater.setNullAt(i) + } + } else { + elementWriter(elementUpdater, i, element) + } + i += 1 + } + updater.set(ordinal, result) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case MAP => + (logicalDataType, catalystType) match { + case (_, MapType(keyType, valueType, valueContainsNull)) + if keyType == StringType => + val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType, + avroPath :+ "key", catalystPath :+ "key") + val valueWriter = newWriter(avroType.getValueType, valueType, + avroPath :+ "value", catalystPath :+ "value") + (updater, ordinal, value) => + val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]] + val keyArray = 
createArrayData(keyType, map.size()) + val keyUpdater = new ArrayDataUpdater(keyArray) + val valueArray = createArrayData(valueType, map.size()) + val valueUpdater = new ArrayDataUpdater(valueArray) + val iter = map.entrySet().iterator() + var i = 0 + while (iter.hasNext) { + val entry = iter.next() + assert(entry.getKey != null) + keyWriter(keyUpdater, i, entry.getKey) + if (entry.getValue == null) { + if (!valueContainsNull) { + throw new RuntimeException( + s"Map value at path ${toFieldStr(avroPath :+ "value")}" + + s" is not allowed to be null") + } else { + valueUpdater.setNullAt(i) + } + } else { + valueWriter(valueUpdater, i, entry.getValue) + } + i += 1 + } + // The Avro map will never have null or duplicated map keys, it's safe to create a + // ArrayBasedMapData directly here. + updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray)) + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + case UNION => val nonNullTypes = nonNullUnionBranches(avroType) val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava) if (nonNullTypes.nonEmpty) { @@ -332,13 +416,6 @@ private[sql] class AvroDeserializer( } else { (updater, ordinal, _) => updater.setNullAt(ordinal) } - - case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) => - updater.setInt(ordinal, value.asInstanceOf[Int]) - - case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) => - updater.setLong(ordinal, value.asInstanceOf[Long]) - case _ => throw new IncompatibleSchemaException(incompatibleMsg) } } diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 97260e6eea6..7ca34388523 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -32,6 +32,7 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter} import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord} import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} import org.apache.commons.io.FileUtils +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException} import org.apache.spark.TestUtils.assertExceptionMsg @@ -814,6 +815,137 @@ abstract class AvroSuite } } + test("SPARK-43380: Fix Avro data type conversion" + + " of decimal type to avoid producing incorrect results") { + withTempPath { path => + val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key + sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString) + // With the flag disabled, we will throw an exception if there is a mismatch + withSQLConf(confKey -> "false") { + val e = intercept[SparkException] { + spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect() + } + ExceptionUtils.getRootCause(e) match { + case ex: AnalysisException => + checkError( + exception = ex, + errorClass = "AVRO_LOWER_PRECISION", + parameters = Map("avroPath" -> "field 'a'", + "sqlPath" -> "field 'a'", + "avroType" -> "decimal\\(12,10\\)", + "sqlType" -> "\"DECIMAL\\(4,3\\)\"", + "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key), + matchPVals = true + ) + case other => + fail(s"Received unexpected exception", other) + } + } + // The following used to work, so it should still work with the flag enabled + checkAnswer( + spark.read.schema("a DECIMAL(5, 
3)").format("avro").load(path.toString), + Row(new java.math.BigDecimal("13.123")) + ) + withSQLConf(confKey -> "true") { + // With the flag enabled, we return a null silently, which isn't great + checkAnswer( + spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString), + Row(null) + ) + checkAnswer( + spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString), + Row(new java.math.BigDecimal("13.123")) + ) + } + } + } + + test("SPARK-43380: Fix Avro data type conversion" + + " of DayTimeIntervalType to avoid producing incorrect results") { + withTempPath { path => + val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key + val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false))) + val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1))) + + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + df.write.format("avro").save(path.getCanonicalPath) + + withSQLConf(confKey -> "false") { + Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType => + val e = intercept[SparkException] { + spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect() + } + + ExceptionUtils.getRootCause(e) match { + case ex: AnalysisException => + checkError( + exception = ex, + errorClass = "AVRO_INCORRECT_TYPE", + parameters = Map("avroPath" -> "field 'a'", + "sqlPath" -> "field 'a'", + "avroType" -> "interval day to second", + "sqlType" -> s""""$sqlType"""", + "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key), + matchPVals = true + ) + case other => + fail(s"Received unexpected exception", other) + } + } + } + + withSQLConf(confKey -> "true") { + // Allow conversion and do not need to check result + spark.read.schema("a Date").format("avro").load(path.toString) + spark.read.schema("a timestamp").format("avro").load(path.toString) + spark.read.schema("a timestamp_ntz").format("avro").load(path.toString) + } + } + } + + test("SPARK-43380: Fix Avro data type conversion" + + " of YearMonthIntervalType to avoid producing incorrect results") { + withTempPath { path => + val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key + val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false))) + val data = Seq(Row(java.time.Period.of(1, 1, 0))) + + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) + df.write.format("avro").save(path.getCanonicalPath) + + withSQLConf(confKey -> "false") { + Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType => + val e = intercept[SparkException] { + spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect() + } + + ExceptionUtils.getRootCause(e) match { + case ex: AnalysisException => + checkError( + exception = ex, + errorClass = "AVRO_INCORRECT_TYPE", + parameters = Map("avroPath" -> "field 'a'", + "sqlPath" -> "field 'a'", + "avroType" -> "interval year to month", + "sqlType" -> s""""$sqlType"""", + "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key), + matchPVals = true + ) + case other => + fail(s"Received unexpected exception", other) + } + } + } + + withSQLConf(confKey -> "true") { + // Allow conversion and do not need to check result + spark.read.schema("a Date").format("avro").load(path.toString) + spark.read.schema("a timestamp").format("avro").load(path.toString) + spark.read.schema("a timestamp_ntz").format("avro").load(path.toString) + } + } + } + test("converting some specific sparkSQL types to avro") { withTempPath { tempDir => val testSchema = StructType(Seq( diff --git 
a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8d280e40922..c73223fba39 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -64,6 +64,16 @@
       }
     }
   },
+  "AVRO_INCORRECT_TYPE" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: <key>."
+    ]
+  },
+  "AVRO_LOWER_PRECISION" : {
+    "message" : [
+      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: <key>."
+    ]
+  },
   "BINARY_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "<value1> <symbol> <value2> caused overflow."
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 58627801fc7..6c05514d242 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,6 +26,7 @@ license: |
 - Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by default. These options include: `pushDownAggregate`, `pushDownLimit`, `pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, please set them to `false`. e.g. set `spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
 - Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
+- Since Spark 3.5, the Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`

 ## Upgrading from Spark SQL 3.3 to 3.4
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f4ca9147f91..94b8ee25dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3537,4 +3537,34 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       errorClass = "CANNOT_RENAME_ACROSS_SCHEMA",
       messageParameters = Map("type" -> "table")
     )
   }
+
+  def avroIncorrectTypeError(
+      avroPath: String, sqlPath: String, avroType: String,
+      sqlType: String, key: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_INCORRECT_TYPE",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType),
+        "key" -> key
+      )
+    )
+  }
+
+  def avroLowerPrecisionError(
+      avroPath: String, sqlPath: String, avroType: String,
+      sqlType: String, key: String): Throwable = {
+    new AnalysisException(
+      errorClass = "AVRO_LOWER_PRECISION",
+      messageParameters = Map(
+        "avroPath" -> avroPath,
+        "sqlPath" -> sqlPath,
+        "avroType" -> avroType,
+        "sqlType" -> toSQLType(sqlType),
+        "key" -> key
+      )
+    )
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e8185202a7e..8d1e73cb86f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4209,6 +4209,18 @@
     .booleanConf
     .createWithDefault(false)

+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
+      .internal()
+      .doc("When set to false, if types in Avro are encoded in the same format, but " +
+        "the type in the Avro schema explicitly says that the data types are different, " +
+        "reject reading the data type in the format to avoid returning incorrect results. " +
+        "When set to true, it restores the legacy behavior of allow reading the data in the" +
+        " format, which may return incorrect results.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
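
Illustration (not part of the commit): a minimal sketch of the user-visible behavior change, distilled from the AvroSuite decimal test in the diff above. It assumes an active SparkSession named `spark` and a writable scratch path `/tmp/avro-decimal`; both are placeholders, not anything defined by this patch.

import org.apache.spark.SparkException

// Write a DECIMAL(12,10) value through the Avro data source.
spark.sql("SELECT 13.1234567890 AS a").write.format("avro").save("/tmp/avro-decimal")

// Default behavior (flag = false): reading back with a narrower decimal fails fast
// with AVRO_LOWER_PRECISION instead of silently producing null.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "false")
try {
  spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro-decimal").collect()
} catch {
  case e: SparkException =>
    // The root cause is the new AVRO_LOWER_PRECISION AnalysisException.
    println(e.getMessage)
}

// Legacy behavior (flag = true): the mismatched read is allowed again and yields null.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleSchema", "true")
spark.read.schema("a DECIMAL(4, 3)").format("avro").load("/tmp/avro-decimal").show()

// A sufficiently wide decimal reads correctly under either setting.
spark.read.schema("a DECIMAL(5, 3)").format("avro").load("/tmp/avro-decimal").show()

The interval cases in the tests behave analogously: with the flag at its default of false, reading an Avro field that was written as a year-month or day-time interval column with a DATE, TIMESTAMP, or TIMESTAMP_NTZ schema raises AVRO_INCORRECT_TYPE rather than returning corrupt values.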