This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 86f923de45 Support Avro's Fixed data type (#9642)
86f923de45 is described below

commit 86f923de4533c5b1573854fe5bfca73e15d98604
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Mon Nov 28 15:26:44 2022 -0800

    Support Avro's Fixed data type (#9642)
---
 .../queries/JsonIngestionFromAvroQueriesTest.java |  54 +++--
 .../inputformat/avro/AvroRecordExtractor.java     |   4 +
 .../inputformat/avro/AvroRecordExtractorTest.java | 226 +++++++++++++++++++++
 .../recordtransformer/DataTypeTransformer.java    |   1 +
 4 files changed, 268 insertions(+), 17 deletions(-)
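For context, a minimal, self-contained sketch of the behavior this commit adds, modeled on the new testGenericFixedDataType case in the AvroRecordExtractorTest diff below. The standalone class and main method are illustrative only and are not part of the commit:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class FixedExtractionSketch {
      public static void main(String[] args) {
        // An Avro record with a single fixed(4) field, mirroring the new unit test below.
        Schema fixedSchema = Schema.createFixed("FixedSchema", "", "", 4);
        Schema recordSchema = Schema.createRecord("EventRecord", null, null, false);
        recordSchema.setFields(Collections.singletonList(new Schema.Field("fixedData", fixedSchema)));

        GenericData.Record record = new GenericData.Record(recordSchema);
        record.put("fixedData", new GenericData.Fixed(fixedSchema, new byte[]{0, 1, 2, 3}));

        // With this change, the extractor unwraps GenericFixed values into byte[].
        AvroRecordExtractor extractor = new AvroRecordExtractor();
        extractor.init(null, null);  // null field set: extract every field, as in the new test
        GenericRow row = new GenericRow();
        extractor.extract(record, row);

        byte[] fixedBytes = (byte[]) row.getValue("fixedData");
        System.out.println(Arrays.toString(fixedBytes));  // [0, 1, 2, 3]
      }
    }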
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
index 16d17c2cb2..b53f3a35d4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.queries;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -30,12 +31,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.function.scalar.StringFunctions;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -75,12 +79,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   private static final String JSON_COLUMN_1 = "jsonColumn1"; // for testing RECORD, ARRAY, MAP, UNION
   private static final String JSON_COLUMN_2 = "jsonColumn2"; // for testing ENUM
   private static final String JSON_COLUMN_3 = "jsonColumn3"; // for testing FIXED
+  private static final String JSON_COLUMN_4 = "jsonColumn4"; // for testing BYTES
   private static final String STRING_COLUMN = "stringColumn";
   private static final org.apache.pinot.spi.data.Schema SCHEMA =
       new org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
           .addSingleValueDimension(JSON_COLUMN_1, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
+          .addSingleValueDimension(JSON_COLUMN_4, FieldSpec.DataType.JSON)
           .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING).build();
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -105,13 +111,14 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   /** @return {@link GenericRow} representing a row in Pinot table.
    */
   private static GenericRow createTableRecord(int intValue, String stringValue, Object jsonValue,
-      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue) {
+      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue, byte[] bytesValue) {
     GenericRow record = new GenericRow();
     record.putValue(INT_COLUMN, intValue);
     record.putValue(STRING_COLUMN, stringValue);
     record.putValue(JSON_COLUMN_1, jsonValue);
     record.putValue(JSON_COLUMN_2, enumValue);
     record.putValue(JSON_COLUMN_3, fixedValue);
+    record.putValue(JSON_COLUMN_4, ByteBuffer.wrap(bytesValue));
     return record;
   }
 
@@ -158,37 +165,39 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         new Field(JSON_COLUMN_1, createUnion(createArray(create(Type.STRING)), createMap(create(Type.STRING)),
             createRecordSchema(), create(Type.STRING), create(Type.NULL))),
         new Field(JSON_COLUMN_2, enumSchema),
-        new Field(JSON_COLUMN_3, fixedSchema));
+        new Field(JSON_COLUMN_3, fixedSchema),
+        new Field(JSON_COLUMN_4, create(Type.BYTES)));
     avroSchema.setFields(fields);
     List<GenericRow> inputRecords = new ArrayList<>();
     // Insert ARRAY
     inputRecords.add(
         createTableRecord(1, "daffy duck", Arrays.asList("this", "is", "a", "test"), createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 1)));
+            createFixedField(fixedSchema, 1), new byte[] {0, 0, 0, 1}));
     // Insert MAP
     inputRecords.add(
         createTableRecord(2, "mickey mouse", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2)));
+            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2), new byte[] {0, 0, 0, 2}));
     inputRecords.add(
         createTableRecord(3, "donald duck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3)));
+            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3), new byte[] {0, 0, 0, 3}));
     inputRecords.add(
         createTableRecord(4, "scrooge mcduck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4)));
+            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4), new byte[] {0, 0, 0, 4}));
     // insert RECORD
     inputRecords.add(createTableRecord(5, "minney mouse", createRecordField("id", 1, "name", "minney"),
-        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5)));
+        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5), new byte[] {0, 0, 0, 5}));
     // Insert simple Java String (gets converted into JSON value)
     inputRecords.add(
-        createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6)));
+        createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6),
+            new byte[] {0, 0, 0, 6}));
     // Insert JSON string (gets converted into JSON document)
     inputRecords.add(
         createTableRecord(7, "scooby doo", "{\"name\":\"scooby\",\"id\":7}", createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 7)));
+            createFixedField(fixedSchema, 7), new byte[] {0, 0, 0, 7}));
 
     try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
       fileWriter.create(avroSchema, AVRO_DATA_FILE);
@@ -199,6 +208,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         record.put(JSON_COLUMN_1, inputRecord.getValue(JSON_COLUMN_1));
         record.put(JSON_COLUMN_2, inputRecord.getValue(JSON_COLUMN_2));
         record.put(JSON_COLUMN_3, inputRecord.getValue(JSON_COLUMN_3));
+        record.put(JSON_COLUMN_4, inputRecord.getValue(JSON_COLUMN_4));
         fileWriter.append(record);
       }
     }
@@ -212,6 +222,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     set.add(JSON_COLUMN_1);
     set.add(JSON_COLUMN_2);
     set.add(JSON_COLUMN_3);
+    set.add(JSON_COLUMN_4);
     AvroRecordReader avroRecordReader = new AvroRecordReader();
     avroRecordReader.init(AVRO_DATA_FILE, set, null);
     return avroRecordReader;
   }
@@ -316,19 +327,28 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   /** Verify that ingestion from avro FIXED type field (jsonColumn3) to Pinot JSON column worked fine. */
   @Test
   public void testSimpleSelectOnFixedJsonColumn() {
-    Operator<SelectionResultsBlock> operator = getOperator("select jsonColumn3 FROM testTable");
+    testByteArray("select jsonColumn3 FROM testTable");
+  }
+
+  /** Verify that ingestion from avro BYTES type field (jsonColumn4) to Pinot JSON column worked fine. */
+  @Test
+  public void testSimpleSelectOnBytesJsonColumn() {
+    testByteArray("select jsonColumn4 FROM testTable");
+  }
+
+  private void testByteArray(String query) {
+    Operator<SelectionResultsBlock> operator = getOperator(query);
     SelectionResultsBlock block = operator.nextBlock();
     Collection<Object[]> rows = block.getRows();
     Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.JSON);
-    List<String> expecteds =
-        Arrays.asList("[[0,0,0,1]]", "[[0,0,0,2]]", "[[0,0,0,3]]", "[[0,0,0,4]]", "[[0,0,0,5]]", "[[0,0,0,6]]",
-            "[[0,0,0,7]]");
-    int index = 0;
+    List<String> expecteds = IntStream.range(1, 8)
+        .mapToObj(i -> new byte[] {0, 0, 0, (byte) i})
+        .map(byteArray -> "[\"" + StringFunctions.toBase64(byteArray) + "\"]")
+        .collect(Collectors.toList());
 
-    Iterator<Object[]> iterator = rows.iterator();
-    while (iterator.hasNext()) {
-      Object[] row = iterator.next();
+    int index = 0;
+    for (Object[] row : rows) {
       Assert.assertEquals(Arrays.toString(row), expecteds.get(index++));
     }
   }
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
index 1cb4d04721..d8a757def8 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractor.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -136,6 +137,9 @@ public class AvroRecordExtractor extends BaseRecordExtractor<GenericRecord> {
     if (value instanceof Instant) {
       return Timestamp.from((Instant) value);
     }
+    if (value instanceof GenericFixed) {
+      return ((GenericFixed) value).bytes();
+    }
     // LocalDate, LocalTime and UUID are returned as the ::toString version of the logical type
     return super.convertSingleValue(value);
   }
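With the AvroRecordExtractor change above, Avro FIXED (and BYTES) values reach the ingestion pipeline as byte arrays, and the updated JsonIngestionFromAvroQueriesTest earlier in this diff expects them to come back from the JSON column as base64-encoded strings. The snippet below only illustrates how one of those expected values is derived; it assumes StringFunctions.toBase64 produces standard java.util.Base64 output and is not part of the commit:

    import java.util.Base64;

    public class ExpectedJsonValueSketch {
      public static void main(String[] args) {
        // The 4-byte fixed/bytes value written for record 1 in the query test.
        byte[] fixedValue = new byte[]{0, 0, 0, 1};
        // Stored in the JSON column, it is selected back as a base64 string.
        String base64 = Base64.getEncoder().encodeToString(fixedValue);  // "AAAAAQ=="
        String expectedRow = "[\"" + base64 + "\"]";  // what Arrays.toString(row) should equal
        System.out.println(expectedRow);  // ["AAAAAQ=="]
      }
    }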
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index e748bc68e4..bda7fd9909 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -43,6 +43,10 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.AbstractRecordExtractorTest;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -243,4 +247,226 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
       Assert.assertEquals(avroRecordExtractor.convertSingleValue(byteBuffer), content);
     }
   }
+
+  @Test
+  public void testGenericFixedDataType() {
+    Schema avroSchema = createRecord("EventRecord", null, null, false);
+    Schema fixedSchema = createFixed("FixedSchema", "", "", 4);
+    avroSchema.setFields(Lists.newArrayList(new Schema.Field("fixedData", fixedSchema)));
+    GenericRecord genericRecord = new GenericData.Record(avroSchema);
+    genericRecord.put("fixedData", new GenericData.Fixed(fixedSchema, new byte[]{0, 1, 2, 3}));
+    GenericRow genericRow = new GenericRow();
+    AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+    avroRecordExtractor.init(null, null);
+    avroRecordExtractor.extract(genericRecord, genericRow);
+    Assert.assertEquals(genericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+  }
+
+  @Test
+  public void testSpecificFixedDataType() {
+    EventRecord specificRecord = new EventRecord(new FixedSchema(new byte[]{0, 1, 2, 3}));
+    GenericRow outputGenericRow = new GenericRow();
+    AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
+    avroRecordExtractor.init(null, null);
+    avroRecordExtractor.extract(specificRecord, outputGenericRow);
+    Assert.assertEquals(outputGenericRow.getValue("fixedData"), new byte[]{0, 1, 2, 3});
+  }
+
+  /**
+   * SpecificRecord created for testing Fixed data type
+   */
+  static class EventRecord extends org.apache.avro.specific.SpecificRecordBase
+      implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5451592186784305712L;
+    public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+        "{\"type\":\"record\",\"name\":\"EventRecord\",\"fields\":[{"
+            + "\"name\":\"fixedData\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}}"
+            + "]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA;
+    }
+
+    private static final SpecificData MODEL = new SpecificData();
+
+    private static final BinaryMessageEncoder<EventRecord> ENCODER =
+        new BinaryMessageEncoder<EventRecord>(MODEL, SCHEMA);
+
+    private static final BinaryMessageDecoder<EventRecord> DECODER =
+        new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA);
+
+    public static BinaryMessageEncoder<EventRecord> getEncoder() {
+      return ENCODER;
+    }
+
+    public static BinaryMessageDecoder<EventRecord> getDecoder() {
+      return DECODER;
+    }
+
+    public static BinaryMessageDecoder<EventRecord> createDecoder(SchemaStore resolver) {
+      return new BinaryMessageDecoder<EventRecord>(MODEL, SCHEMA, resolver);
+    }
+
+    public java.nio.ByteBuffer toByteBuffer()
+        throws java.io.IOException {
+      return ENCODER.encode(this);
+    }
+
+    public static EventRecord fromByteBuffer(java.nio.ByteBuffer b)
+        throws java.io.IOException {
+      return DECODER.decode(b);
+    }
+
+    private FixedSchema _fixedData;
+
+    public EventRecord() {
+    }
+
+    public EventRecord(FixedSchema fixedData) {
+      _fixedData = fixedData;
+    }
+
+    public org.apache.avro.specific.SpecificData getSpecificData() {
+      return MODEL;
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return SCHEMA;
+    }
+
+    // Used by DatumWriter. Applications should not call.
+    public java.lang.Object get(int field) {
+      switch (field) {
+        case 0:
+          return _fixedData;
+        default:
+          throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    // Used by DatumReader. Applications should not call.
+    @SuppressWarnings(value = "unchecked")
+    public void put(int field, java.lang.Object value) {
+      switch (field) {
+        case 0:
+          _fixedData = (FixedSchema) value;
+          break;
+        default:
+          throw new org.apache.avro.AvroRuntimeException("Bad index");
+      }
+    }
+
+    public FixedSchema getFixedData() {
+      return _fixedData;
+    }
+
+    public void setFixedData(FixedSchema value) {
+      _fixedData = value;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumWriter<EventRecord> WRITER =
+        (org.apache.avro.io.DatumWriter<EventRecord>) MODEL.createDatumWriter(SCHEMA);
+
+    @Override
+    public void writeExternal(java.io.ObjectOutput out)
+        throws java.io.IOException {
+      WRITER.write(this, SpecificData.getEncoder(out));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumReader<EventRecord> READER =
+        (org.apache.avro.io.DatumReader<EventRecord>) MODEL.createDatumReader(SCHEMA);
+
+    @Override
+    public void readExternal(java.io.ObjectInput in)
+        throws java.io.IOException {
+      READER.read(this, SpecificData.getDecoder(in));
+    }
+
+    @Override
+    protected boolean hasCustomCoders() {
+      return true;
+    }
+
+    @Override
+    public void customEncode(org.apache.avro.io.Encoder out)
+        throws java.io.IOException {
+      out.writeFixed(_fixedData.bytes(), 0, 4);
+    }
+
+    @Override
+    public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+        throws java.io.IOException {
+      org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+      if (fieldOrder == null) {
+        if (_fixedData == null) {
+          _fixedData = new FixedSchema();
+        }
+        in.readFixed(_fixedData.bytes(), 0, 4);
+      } else {
+        for (int i = 0; i < 1; i++) {
+          switch (fieldOrder[i].pos()) {
+            case 0:
+              if (_fixedData == null) {
+                _fixedData = new FixedSchema();
+              }
+              in.readFixed(_fixedData.bytes(), 0, 4);
+              break;
+            default:
+              throw new java.io.IOException("Corrupt ResolvingDecoder.");
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * SpecificFixed created for testing Fixed data type
+   */
+  static class FixedSchema extends org.apache.avro.specific.SpecificFixed {
+    private static final long serialVersionUID = -1121289150751596161L;
+    public static final org.apache.avro.Schema SCHEMA = new org.apache.avro.Schema.Parser().parse(
+        "{\"type\":\"fixed\",\"name\":\"FixedSchema\",\"doc\":\"\",\"size\":4}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA;
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return SCHEMA;
+    }
+
+    /** Creates a new FixedSchema */
+    public FixedSchema() {
+      super();
+    }
+
+    /**
+     * Creates a new FixedSchema with the given bytes.
+     * @param bytes The bytes to create the new FixedSchema.
+     */
+    public FixedSchema(byte[] bytes) {
+      super(bytes);
+    }
+
+    private static final org.apache.avro.io.DatumWriter<FixedSchema> WRITER =
+        new org.apache.avro.specific.SpecificDatumWriter<FixedSchema>(SCHEMA);
+
+    @Override
+    public void writeExternal(java.io.ObjectOutput out)
+        throws java.io.IOException {
+      WRITER.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+    }
+
+    private static final org.apache.avro.io.DatumReader<FixedSchema> READER =
+        new org.apache.avro.specific.SpecificDatumReader<FixedSchema>(SCHEMA);
+
+    @Override
+    public void readExternal(java.io.ObjectInput in)
+        throws java.io.IOException {
+      READER.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
index 3838b6a56f..1ed69c9065 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/DataTypeTransformer.java
@@ -92,6 +92,7 @@ public class DataTypeTransformer implements RecordTransformer {
         // Single-value column
         source = PinotDataType.getSingleValueType(value.getClass());
       }
+
       // Skipping conversion when srcType!=destType is speculative, and can be unsafe when
       // the array for MV column contains values of mixing types. Mixing types can lead
       // to ClassCastException during conversion, often aborting data ingestion jobs.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org