ayushtkn commented on code in PR #6407:
URL: https://github.com/apache/hive/pull/6407#discussion_r3049749996
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java:
##########
@@ -81,17 +82,54 @@ public static boolean supportsRowLineage(Table table) {
return table.getStorageHandler().supportsRowLineage(table.getParameters());
}
+ /**
+ * Returns the row lineage virtual columns with the leading comma for string
concatenation.
+ * Example: {@code ", ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER"}.
+ */
+ public static String getRowLineageSelectColumns(boolean rowLineageEnabled) {
+ return rowLineageEnabled
+ ? ", " + VirtualColumn.ROW_LINEAGE_ID.getName() + ", " +
VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName()
+ : "";
Review Comment:
Can you change it to `getRowLineageColumnsForCompaction`?
##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java:
##########
@@ -81,17 +82,54 @@ public static boolean supportsRowLineage(Table table) {
return table.getStorageHandler().supportsRowLineage(table.getParameters());
}
+ /**
+ * Returns the row lineage virtual columns with the leading comma for string
concatenation.
+ * Example: {@code ", ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER"}.
+ */
+ public static String getRowLineageSelectColumns(boolean rowLineageEnabled) {
+ return rowLineageEnabled
+ ? ", " + VirtualColumn.ROW_LINEAGE_ID.getName() + ", " +
VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName()
+ : "";
+ }
+
+ /**
+ * Enables or disables row lineage for the current query/session context.
+ */
+ public static void setRowLineage(Configuration conf, boolean enabled) {
+ SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, enabled);
+ }
+
+ private static void setRowLineageConfFlag(Configuration conf, boolean
enabled) {
+ if (enabled) {
+ conf.setBoolean(SessionStateUtil.ROW_LINEAGE, true);
+ } else {
+ conf.unset(SessionStateUtil.ROW_LINEAGE);
+ }
+ }
+
+ /**
+ * Enable the row lineage session flag for the current statement execution.
+ * Returns {@code true} if the flag was enabled
+ */
+ public static void enableRowLineage(SessionState sessionState) {
+ setRowLineageConfFlag(sessionState.getConf(), true);
+ }
+
+ public static void disableRowLineage(SessionState sessionState) {
+ setRowLineageConfFlag(sessionState.getConf(), false);
+ }
+
Review Comment:
why can't we directly do
```
public static void enableRowLineage(SessionState sessionState) {
sessionState.getConf().setBoolean(SessionStateUtil.ROW_LINEAGE, true);
}
public static void disableRowLineage(SessionState sessionState) {
sessionState.getConf().setBoolean(SessionStateUtil.ROW_LINEAGE, false);
}
```
You have Javadoc for one method but not the others; considering it is a util class, we
can drop it.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java:
##########
@@ -69,84 +70,140 @@ public boolean run(CompactorContext context) throws
IOException, HiveException,
HiveConf conf = new HiveConf(context.getConf());
CompactionInfo ci = context.getCompactionInfo();
- String compactionQuery = buildCompactionQuery(context, compactTableName,
conf);
+ org.apache.hadoop.hive.ql.metadata.Table hiveTable =
+ new org.apache.hadoop.hive.ql.metadata.Table(context.getTable());
+ boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable);
+ String compactionQuery = buildCompactionQuery(context, compactTableName,
conf, rowLineageEnabled);
SessionState sessionState = setupQueryCompactionSession(conf, ci,
tblProperties);
+
+ if (rowLineageEnabled) {
+ RowLineageUtils.enableRowLineage(sessionState);
+ LOG.debug("Row lineage flag set for compaction of table {}",
compactTableName);
+ }
Review Comment:
Can we not do it where we add the columns for row lineage? That would avoid the
redundant `rowLineageEnabled` check.
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java:
##########
@@ -69,84 +70,140 @@ public boolean run(CompactorContext context) throws
IOException, HiveException,
HiveConf conf = new HiveConf(context.getConf());
CompactionInfo ci = context.getCompactionInfo();
- String compactionQuery = buildCompactionQuery(context, compactTableName,
conf);
+ org.apache.hadoop.hive.ql.metadata.Table hiveTable =
+ new org.apache.hadoop.hive.ql.metadata.Table(context.getTable());
+ boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable);
+ String compactionQuery = buildCompactionQuery(context, compactTableName,
conf, rowLineageEnabled);
SessionState sessionState = setupQueryCompactionSession(conf, ci,
tblProperties);
+
+ if (rowLineageEnabled) {
+ RowLineageUtils.enableRowLineage(sessionState);
+ LOG.debug("Row lineage flag set for compaction of table {}",
compactTableName);
+ }
+
String compactionTarget = "table " +
HiveUtils.unparseIdentifier(compactTableName) +
(ci.partName != null ? ", partition " +
HiveUtils.unparseIdentifier(ci.partName) : "");
try {
- DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
+ DriverUtils.runOnDriver(sessionState.getConf(), sessionState,
compactionQuery);
LOG.info("Completed compaction for {}", compactionTarget);
return true;
} catch (HiveException e) {
LOG.error("Failed compacting {}", compactionTarget, e);
throw e;
} finally {
+ RowLineageUtils.disableRowLineage(sessionState);
sessionState.setCompaction(false);
}
}
- private String buildCompactionQuery(CompactorContext context, String
compactTableName, HiveConf conf)
+ private String buildCompactionQuery(CompactorContext context, String
compactTableName, HiveConf conf,
+ boolean rowLineageEnabled)
throws HiveException {
CompactionInfo ci = context.getCompactionInfo();
+ String rowLineageColumns =
RowLineageUtils.getRowLineageSelectColumns(rowLineageEnabled);
org.apache.hadoop.hive.ql.metadata.Table table =
Hive.get(conf).getTable(context.getTable().getDbName(),
context.getTable().getTableName());
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
- String fileSizePredicate = null;
- String compactionQuery;
-
- if (ci.type == CompactionType.MINOR) {
- long fileSizeInBytesThreshold =
CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
- fileSizePredicate = String.format("%1$s in (select file_path from
%2$s.files where file_size_in_bytes < %3$d)",
- VirtualColumn.FILE_PATH.getName(), compactTableName,
fileSizeInBytesThreshold);
- conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD,
fileSizeInBytesThreshold);
- // IOW query containing a join with Iceberg .files metadata table fails
with exception that Iceberg AVRO format
- // doesn't support vectorization, hence disabling it in this case.
- conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ String fileSizePredicate = buildMinorFileSizePredicate(ci,
compactTableName, conf, table);
+
+ String compactionQuery = (ci.partName == null) ?
+ buildFullTableCompactionQuery(compactTableName, conf, icebergTable,
+ rowLineageColumns, fileSizePredicate, orderBy) :
+ buildPartitionCompactionQuery(ci, compactTableName, conf, icebergTable,
+ rowLineageColumns, fileSizePredicate, orderBy);
+
+ LOG.info("Compaction query: {}", compactionQuery);
+ return compactionQuery;
+ }
+
+ private static String buildMinorFileSizePredicate(
+ CompactionInfo ci, String compactTableName, HiveConf conf,
org.apache.hadoop.hive.ql.metadata.Table table) {
+ if (ci.type != CompactionType.MINOR) {
+ return null;
}
- if (ci.partName == null) {
- if (!icebergTable.spec().isPartitioned()) {
- HiveConf.setVar(conf, ConfVars.REWRITE_POLICY,
RewritePolicy.FULL_TABLE.name());
- compactionQuery = String.format("insert overwrite table %s select *
from %<s %2$s %3$s", compactTableName,
- fileSizePredicate == null ? "" : "where " + fileSizePredicate,
orderBy);
- } else if (icebergTable.specs().size() > 1) {
- // Compacting partitions of old partition specs on a partitioned table
with partition evolution
- HiveConf.setVar(conf, ConfVars.REWRITE_POLICY,
RewritePolicy.PARTITION.name());
- // A single filter on a virtual column causes errors during
compilation,
- // added another filter on file_path as a workaround.
- compactionQuery = String.format("insert overwrite table %1$s select *
from %1$s " +
- "where %2$s != %3$d and %4$s is not null %5$s %6$s",
- compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(),
icebergTable.spec().specId(),
- VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? ""
: "and " + fileSizePredicate, orderBy);
- } else {
- // Partitioned table without partition evolution with partition spec
as null in the compaction request - this
- // code branch is not supposed to be reachable
- throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
- }
- } else {
- HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
- conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ long fileSizeInBytesThreshold =
CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
+ conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD,
fileSizeInBytesThreshold);
+ // IOW query containing a join with Iceberg .files metadata table fails
with exception that Iceberg AVRO format
+ // doesn't support vectorization, hence disabling it in this case.
+ conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+
+ return String.format("%1$s in (select file_path from %2$s.files where
file_size_in_bytes < %3$d)",
+ VirtualColumn.FILE_PATH.getName(), compactTableName,
fileSizeInBytesThreshold);
+ }
+
+ private String buildFullTableCompactionQuery(
+ String compactTableName,
+ HiveConf conf,
+ Table icebergTable,
+ String rowLineageColumns,
+ String fileSizePredicate,
+ String orderBy) throws HiveException {
+ String selectColumns = buildSelectColumnList(icebergTable, conf);
+
+ if (!icebergTable.spec().isPartitioned()) {
+ HiveConf.setVar(conf, ConfVars.REWRITE_POLICY,
RewritePolicy.FULL_TABLE.name());
+ return String.format("insert overwrite table %1$s select %2$s%3$s from
%1$s %4$s %5$s",
+ compactTableName, selectColumns, rowLineageColumns,
+ fileSizePredicate == null ? "" : "where " + fileSizePredicate,
orderBy);
+ }
+
+ if (icebergTable.specs().size() > 1) {
+ // Compacting partitions of old partition specs on a partitioned table
with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY,
RewritePolicy.PARTITION.name());
- conf.set(IcebergCompactionService.PARTITION_PATH, new
Path(ci.partName).toString());
-
- PartitionSpec spec;
- String partitionPredicate;
- try {
- spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
- partitionPredicate = buildPartitionPredicate(ci, spec);
- } catch (MetaException e) {
- throw new HiveException(e);
- }
+ // A single filter on a virtual column causes errors during compilation,
+ // added another filter on file_path as a workaround.
+ return String.format("insert overwrite table %1$s select %2$s%3$s from
%1$s " +
+ "where %4$s != %5$d and %6$s is not null %7$s %8$s",
+ compactTableName, selectColumns, rowLineageColumns,
+ VirtualColumn.PARTITION_SPEC_ID.getName(),
icebergTable.spec().specId(),
+ VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" :
"and " + fileSizePredicate, orderBy);
+ }
+
+ // Partitioned table without partition evolution with partition spec as
null in the compaction request - this
+ // code branch is not supposed to be reachable
+ throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
+ }
- compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT *
FROM %1$s WHERE %2$s IN " +
- "(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d)
%5$s %6$s",
- compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate,
spec.specId(),
- fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
+ private String buildPartitionCompactionQuery(
+ CompactionInfo ci,
+ String compactTableName,
+ HiveConf conf,
+ Table icebergTable,
+ String rowLineageColumns,
+ String fileSizePredicate,
+ String orderBy) throws HiveException {
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
+ conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ HiveConf.setVar(conf, ConfVars.REWRITE_POLICY,
RewritePolicy.PARTITION.name());
+ conf.set(IcebergCompactionService.PARTITION_PATH, new
Path(ci.partName).toString());
+
+ PartitionSpec spec;
+ String partitionPredicate;
+ try {
+ spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
+ partitionPredicate = buildPartitionPredicate(ci, spec);
+ } catch (MetaException e) {
+ throw new HiveException(e);
}
- return compactionQuery;
+
+ return String.format("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s
WHERE %3$s IN " +
+ "(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d)
%6$s %7$s",
+ compactTableName, rowLineageColumns,
VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
+ fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
+ }
+
+ /**
+ * Builds a comma-separated SELECT list from the Iceberg table schema.
+ */
+ private static String buildSelectColumnList(Table icebergTable, HiveConf
conf) {
+ return icebergTable.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .map(col -> HiveUtils.unparseIdentifier(col, conf))
+ .collect(Collectors.joining(", "));
Review Comment:
I don't think this logic should kick in like this. If `rowLineage` isn't
enabled, it should just return `*`, like before.
If `rowLineage` is enabled, add the names from
`ROW_LINEAGE_COLUMNS_TO_FILE_NAME`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]