This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38211a6c79 Add JOIN support to PinotQuery (#10421)
38211a6c79 is described below

commit 38211a6c795ddb27194bfa3b2c161372790bc55b
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Mar 27 23:31:34 2023 -0700

    Add JOIN support to PinotQuery (#10421)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  20 +-
 .../apache/pinot/common/request/DataSource.java    | 240 +++++--
 .../java/org/apache/pinot/common/request/Join.java | 761 +++++++++++++++++++++
 .../org/apache/pinot/common/request/JoinType.java  |  67 ++
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  65 +-
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  | 122 +++-
 pinot-common/src/thrift/query.thrift               |  39 +-
 7 files changed, 1227 insertions(+), 87 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 7112e0ee58..6fe32d6031 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -57,6 +57,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerQueryPhase;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
 import org.apache.pinot.common.request.Function;
@@ -305,12 +306,25 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       }
 
       PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
-      if (serverPinotQuery.getDataSource() == null) {
+      DataSource dataSource = serverPinotQuery.getDataSource();
+      if (dataSource == null) {
         LOGGER.info("Data source (FROM clause) not found in request {}: {}", 
request, query);
         
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
         return new BrokerResponseNative(
             QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Data source (FROM clause) not found"));
       }
+      if (dataSource.getJoin() != null) {
+        LOGGER.info("JOIN is not supported in request {}: {}", request, query);
+        
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+        return new BrokerResponseNative(
+            QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"JOIN is not supported"));
+      }
+      if (dataSource.getTableName() == null) {
+        LOGGER.info("Table name not found in request {}: {}", request, query);
+        
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+        return new BrokerResponseNative(
+            QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, 
"Table name not found"));
+      }
 
       try {
         handleSubquery(serverPinotQuery, requestId, request, 
requesterIdentity, requestContext);
@@ -321,8 +335,8 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
         return new 
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
       }
 
-      String tableName = 
getActualTableName(serverPinotQuery.getDataSource().getTableName(), 
_tableCache);
-      serverPinotQuery.getDataSource().setTableName(tableName);
+      String tableName = getActualTableName(dataSource.getTableName(), 
_tableCache);
+      dataSource.setTableName(tableName);
       String rawTableName = TableNameBuilder.extractRawTableName(tableName);
       requestContext.setTableName(rawTableName);
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/DataSource.java 
b/pinot-common/src/main/java/org/apache/pinot/common/request/DataSource.java
index e4174c5613..c6f4864ccb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/DataSource.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/DataSource.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 /**
- * Autogenerated by Thrift Compiler (0.17.0)
+ * Autogenerated by Thrift Compiler (0.15.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  *  @generated
@@ -25,25 +25,34 @@
 package org.apache.pinot.common.request;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler 
(0.17.0)", date = "2023-02-08")
-public class DataSource implements org.apache.thrift.TBase<DataSource, 
DataSource._Fields>, java.io.Serializable, Cloneable, Comparable<DataSource> {
-  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("DataSource");
-
-  private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC 
= new org.apache.thrift.protocol.TField("tableName", 
org.apache.thrift.protocol.TType.STRING, (short)1);
-  private static final org.apache.thrift.protocol.TField SUBQUERY_FIELD_DESC = 
new org.apache.thrift.protocol.TField("subquery", 
org.apache.thrift.protocol.TType.STRUCT, (short)2);
-
-  private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new DataSourceStandardSchemeFactory();
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler 
(0.15.0)", date = "2023-03-14")
+public class DataSource
+    implements org.apache.thrift.TBase<DataSource, DataSource._Fields>, 
java.io.Serializable, Cloneable,
+               Comparable<DataSource> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
+      new org.apache.thrift.protocol.TStruct("DataSource");
+
+  private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC 
=
+      new org.apache.thrift.protocol.TField("tableName", 
org.apache.thrift.protocol.TType.STRING, (short) 1);
+  private static final org.apache.thrift.protocol.TField SUBQUERY_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("subquery", 
org.apache.thrift.protocol.TType.STRUCT, (short) 2);
+  private static final org.apache.thrift.protocol.TField JOIN_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("join", 
org.apache.thrift.protocol.TType.STRUCT, (short) 3);
+
+  private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY =
+      new DataSourceStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new DataSourceTupleSchemeFactory();
 
   public @org.apache.thrift.annotation.Nullable java.lang.String tableName; // 
optional
   public @org.apache.thrift.annotation.Nullable PinotQuery subquery; // 
optional
+  public @org.apache.thrift.annotation.Nullable Join join; // optional
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    TABLE_NAME((short)1, "tableName"),
-    SUBQUERY((short)2, "subquery");
+    TABLE_NAME((short) 1, "tableName"), SUBQUERY((short) 2, "subquery"), 
JOIN((short) 3, "join");
 
-    private static final java.util.Map<java.lang.String, _Fields> byName = new 
java.util.HashMap<java.lang.String, _Fields>();
+    private static final java.util.Map<java.lang.String, _Fields> byName =
+        new java.util.HashMap<java.lang.String, _Fields>();
 
     static {
       for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
@@ -56,11 +65,13 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
      */
     @org.apache.thrift.annotation.Nullable
     public static _Fields findByThriftId(int fieldId) {
-      switch(fieldId) {
+      switch (fieldId) {
         case 1: // TABLE_NAME
           return TABLE_NAME;
         case 2: // SUBQUERY
           return SUBQUERY;
+        case 3: // JOIN
+          return JOIN;
         default:
           return null;
       }
@@ -72,7 +83,9 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
      */
     public static _Fields findByThriftIdOrThrow(int fieldId) {
       _Fields fields = findByThriftId(fieldId);
-      if (fields == null) throw new java.lang.IllegalArgumentException("Field 
" + fieldId + " doesn't exist!");
+      if (fields == null) {
+        throw new java.lang.IllegalArgumentException("Field " + fieldId + " 
doesn't exist!");
+      }
       return fields;
     }
 
@@ -104,14 +117,21 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = 
{_Fields.TABLE_NAME,_Fields.SUBQUERY};
+  private static final _Fields optionals[] = {_Fields.TABLE_NAME, 
_Fields.SUBQUERY, _Fields.JOIN};
   public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+
   static {
-    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = 
new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-    tmpMap.put(_Fields.TABLE_NAME, new 
org.apache.thrift.meta_data.FieldMetaData("tableName", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
-        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
-    tmpMap.put(_Fields.SUBQUERY, new 
org.apache.thrift.meta_data.FieldMetaData("subquery", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
-        new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 PinotQuery.class)));
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
+        new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TABLE_NAME,
+        new org.apache.thrift.meta_data.FieldMetaData("tableName", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+            new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.SUBQUERY,
+        new org.apache.thrift.meta_data.FieldMetaData("subquery", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+            new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 PinotQuery.class)));
+    tmpMap.put(_Fields.JOIN,
+        new org.apache.thrift.meta_data.FieldMetaData("join", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+            new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT,
 "Join")));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(DataSource.class,
 metaDataMap);
   }
@@ -129,6 +149,9 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     if (other.isSetSubquery()) {
       this.subquery = new PinotQuery(other.subquery);
     }
+    if (other.isSetJoin()) {
+      this.join = new Join(other.join);
+    }
   }
 
   @Override
@@ -140,6 +163,7 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
   public void clear() {
     this.tableName = null;
     this.subquery = null;
+    this.join = null;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -192,6 +216,31 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     }
   }
 
+  @org.apache.thrift.annotation.Nullable
+  public Join getJoin() {
+    return this.join;
+  }
+
+  public DataSource setJoin(@org.apache.thrift.annotation.Nullable Join join) {
+    this.join = join;
+    return this;
+  }
+
+  public void unsetJoin() {
+    this.join = null;
+  }
+
+  /** Returns true if field join is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetJoin() {
+    return this.join != null;
+  }
+
+  public void setJoinIsSet(boolean value) {
+    if (!value) {
+      this.join = null;
+    }
+  }
+
   @Override
   public void setFieldValue(_Fields field, 
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
@@ -199,7 +248,7 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
         if (value == null) {
           unsetTableName();
         } else {
-          setTableName((java.lang.String)value);
+          setTableName((java.lang.String) value);
         }
         break;
 
@@ -207,10 +256,17 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
         if (value == null) {
           unsetSubquery();
         } else {
-          setSubquery((PinotQuery)value);
+          setSubquery((PinotQuery) value);
         }
         break;
 
+      case JOIN:
+        if (value == null) {
+          unsetJoin();
+        } else {
+          setJoin((Join) value);
+        }
+        break;
     }
   }
 
@@ -224,6 +280,8 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
       case SUBQUERY:
         return getSubquery();
 
+      case JOIN:
+        return getJoin();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -240,39 +298,59 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
         return isSetTableName();
       case SUBQUERY:
         return isSetSubquery();
+      case JOIN:
+        return isSetJoin();
     }
     throw new java.lang.IllegalStateException();
   }
 
   @Override
   public boolean equals(java.lang.Object that) {
-    if (that instanceof DataSource)
-      return this.equals((DataSource)that);
+    if (that instanceof DataSource) {
+      return this.equals((DataSource) that);
+    }
     return false;
   }
 
   public boolean equals(DataSource that) {
-    if (that == null)
+    if (that == null) {
       return false;
-    if (this == that)
+    }
+    if (this == that) {
       return true;
+    }
 
     boolean this_present_tableName = true && this.isSetTableName();
     boolean that_present_tableName = true && that.isSetTableName();
     if (this_present_tableName || that_present_tableName) {
-      if (!(this_present_tableName && that_present_tableName))
+      if (!(this_present_tableName && that_present_tableName)) {
         return false;
-      if (!this.tableName.equals(that.tableName))
+      }
+      if (!this.tableName.equals(that.tableName)) {
         return false;
+      }
     }
 
     boolean this_present_subquery = true && this.isSetSubquery();
     boolean that_present_subquery = true && that.isSetSubquery();
     if (this_present_subquery || that_present_subquery) {
-      if (!(this_present_subquery && that_present_subquery))
+      if (!(this_present_subquery && that_present_subquery)) {
+        return false;
+      }
+      if (!this.subquery.equals(that.subquery)) {
+        return false;
+      }
+    }
+
+    boolean this_present_join = true && this.isSetJoin();
+    boolean that_present_join = true && that.isSetJoin();
+    if (this_present_join || that_present_join) {
+      if (!(this_present_join && that_present_join)) {
         return false;
-      if (!this.subquery.equals(that.subquery))
+      }
+      if (!this.join.equals(that.join)) {
         return false;
+      }
     }
 
     return true;
@@ -283,12 +361,19 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     int hashCode = 1;
 
     hashCode = hashCode * 8191 + ((isSetTableName()) ? 131071 : 524287);
-    if (isSetTableName())
+    if (isSetTableName()) {
       hashCode = hashCode * 8191 + tableName.hashCode();
+    }
 
     hashCode = hashCode * 8191 + ((isSetSubquery()) ? 131071 : 524287);
-    if (isSetSubquery())
+    if (isSetSubquery()) {
       hashCode = hashCode * 8191 + subquery.hashCode();
+    }
+
+    hashCode = hashCode * 8191 + ((isSetJoin()) ? 131071 : 524287);
+    if (isSetJoin()) {
+      hashCode = hashCode * 8191 + join.hashCode();
+    }
 
     return hashCode;
   }
@@ -321,6 +406,16 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetJoin(), other.isSetJoin());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetJoin()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.join, 
other.join);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -331,12 +426,14 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
   }
 
   @Override
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+  public void read(org.apache.thrift.protocol.TProtocol iprot)
+      throws org.apache.thrift.TException {
     scheme(iprot).read(iprot, this);
   }
 
   @Override
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+  public void write(org.apache.thrift.protocol.TProtocol oprot)
+      throws org.apache.thrift.TException {
     scheme(oprot).write(oprot, this);
   }
 
@@ -355,7 +452,9 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
       first = false;
     }
     if (isSetSubquery()) {
-      if (!first) sb.append(", ");
+      if (!first) {
+        sb.append(", ");
+      }
       sb.append("subquery:");
       if (this.subquery == null) {
         sb.append("null");
@@ -364,11 +463,24 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
       }
       first = false;
     }
+    if (isSetJoin()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("join:");
+      if (this.join == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.join);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
 
-  public void validate() throws org.apache.thrift.TException {
+  public void validate()
+      throws org.apache.thrift.TException {
     // check for required fields
     // check for sub-struct validity
     if (subquery != null) {
@@ -376,7 +488,8 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     }
   }
 
-  private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+  private void writeObject(java.io.ObjectOutputStream out)
+      throws java.io.IOException {
     try {
       write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
     } catch (org.apache.thrift.TException te) {
@@ -384,7 +497,8 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     }
   }
 
-  private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, java.lang.ClassNotFoundException {
+  private void readObject(java.io.ObjectInputStream in)
+      throws java.io.IOException, java.lang.ClassNotFoundException {
     try {
       read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
@@ -402,11 +516,11 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
   private static class DataSourceStandardScheme extends 
org.apache.thrift.scheme.StandardScheme<DataSource> {
 
     @Override
-    public void read(org.apache.thrift.protocol.TProtocol iprot, DataSource 
struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol iprot, DataSource 
struct)
+        throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TField schemeField;
       iprot.readStructBegin();
-      while (true)
-      {
+      while (true) {
         schemeField = iprot.readFieldBegin();
         if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
           break;
@@ -429,6 +543,15 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
             }
             break;
+          case 3: // JOIN
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.join = new Join();
+              struct.join.read(iprot);
+              struct.setJoinIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
         }
@@ -441,7 +564,8 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
     }
 
     @Override
-    public void write(org.apache.thrift.protocol.TProtocol oprot, DataSource 
struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol oprot, DataSource 
struct)
+        throws org.apache.thrift.TException {
       struct.validate();
 
       oprot.writeStructBegin(STRUCT_DESC);
@@ -459,10 +583,16 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
           oprot.writeFieldEnd();
         }
       }
+      if (struct.join != null) {
+        if (struct.isSetJoin()) {
+          oprot.writeFieldBegin(JOIN_FIELD_DESC);
+          struct.join.write(oprot);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
-
   }
 
   private static class DataSourceTupleSchemeFactory implements 
org.apache.thrift.scheme.SchemeFactory {
@@ -475,7 +605,8 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
   private static class DataSourceTupleScheme extends 
org.apache.thrift.scheme.TupleScheme<DataSource> {
 
     @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, DataSource 
struct) throws org.apache.thrift.TException {
+    public void write(org.apache.thrift.protocol.TProtocol prot, DataSource 
struct)
+        throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol oprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
       java.util.BitSet optionals = new java.util.BitSet();
       if (struct.isSetTableName()) {
@@ -484,19 +615,26 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
       if (struct.isSetSubquery()) {
         optionals.set(1);
       }
-      oprot.writeBitSet(optionals, 2);
+      if (struct.isSetJoin()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
       if (struct.isSetTableName()) {
         oprot.writeString(struct.tableName);
       }
       if (struct.isSetSubquery()) {
         struct.subquery.write(oprot);
       }
+      if (struct.isSetJoin()) {
+        struct.join.write(oprot);
+      }
     }
 
     @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, DataSource 
struct) throws org.apache.thrift.TException {
+    public void read(org.apache.thrift.protocol.TProtocol prot, DataSource 
struct)
+        throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(2);
+      java.util.BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         struct.tableName = iprot.readString();
         struct.setTableNameIsSet(true);
@@ -506,11 +644,17 @@ public class DataSource implements 
org.apache.thrift.TBase<DataSource, DataSourc
         struct.subquery.read(iprot);
         struct.setSubqueryIsSet(true);
       }
+      if (incoming.get(2)) {
+        struct.join = new Join();
+        struct.join.read(iprot);
+        struct.setJoinIsSet(true);
+      }
     }
   }
 
   private static <S extends org.apache.thrift.scheme.IScheme> S 
scheme(org.apache.thrift.protocol.TProtocol proto) {
-    return 
(org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    return 
(org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY
+        : TUPLE_SCHEME_FACTORY).getScheme();
   }
 }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/Join.java 
b/pinot-common/src/main/java/org/apache/pinot/common/request/Join.java
new file mode 100644
index 0000000000..b618ad4047
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/Join.java
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.15.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.pinot.common.request;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler 
(0.15.0)", date = "2023-03-14")
+public class Join
+    implements org.apache.thrift.TBase<Join, Join._Fields>, 
java.io.Serializable, Cloneable, Comparable<Join> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("Join");
+
+  private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("type", 
org.apache.thrift.protocol.TType.I32, (short) 1);
+  private static final org.apache.thrift.protocol.TField LEFT_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("left", 
org.apache.thrift.protocol.TType.STRUCT, (short) 2);
+  private static final org.apache.thrift.protocol.TField RIGHT_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("right", 
org.apache.thrift.protocol.TType.STRUCT, (short) 3);
+  private static final org.apache.thrift.protocol.TField CONDITION_FIELD_DESC =
+      new org.apache.thrift.protocol.TField("condition", 
org.apache.thrift.protocol.TType.STRUCT, (short) 4);
+
+  private static final org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new JoinStandardSchemeFactory();
+  private static final org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new JoinTupleSchemeFactory();
+
+  public @org.apache.thrift.annotation.Nullable JoinType type; // required
+  public @org.apache.thrift.annotation.Nullable DataSource left; // required
+  public @org.apache.thrift.annotation.Nullable DataSource right; // required
+  public @org.apache.thrift.annotation.Nullable Expression condition; // 
optional
+
+  /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    TYPE((short) 1, "type"), LEFT((short) 2, "left"), RIGHT((short) 3, 
"right"), CONDITION((short) 4, "condition");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName =
+        new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not 
found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByThriftId(int fieldId) {
+      switch (fieldId) {
+        case 1: // TYPE
+          return TYPE;
+        case 2: // LEFT
+          return LEFT;
+        case 3: // RIGHT
+          return RIGHT;
+        case 4: // CONDITION
+          return CONDITION;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) {
+        throw new java.lang.IllegalArgumentException("Field " + fieldId + " 
doesn't exist!");
+      }
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    @Override
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    @Override
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final _Fields optionals[] = {_Fields.CONDITION};
+  public static final java.util.Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+
+  static {
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap =
+        new java.util.EnumMap<_Fields, 
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TYPE,
+        new org.apache.thrift.meta_data.FieldMetaData("type", 
org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM,
 "JoinType")));
+    tmpMap.put(_Fields.LEFT,
+        new org.apache.thrift.meta_data.FieldMetaData("left", 
org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 DataSource.class)));
+    tmpMap.put(_Fields.RIGHT,
+        new org.apache.thrift.meta_data.FieldMetaData("right", 
org.apache.thrift.TFieldRequirementType.REQUIRED,
+            new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 DataSource.class)));
+    tmpMap.put(_Fields.CONDITION,
+        new org.apache.thrift.meta_data.FieldMetaData("condition", 
org.apache.thrift.TFieldRequirementType.OPTIONAL,
+            new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT,
 "Expression")));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Join.class, 
metaDataMap);
+  }
+
+  public Join() {
+  }
+
+  public Join(JoinType type, DataSource left, DataSource right) {
+    this();
+    this.type = type;
+    this.left = left;
+    this.right = right;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public Join(Join other) {
+    if (other.isSetType()) {
+      this.type = other.type;
+    }
+    if (other.isSetLeft()) {
+      this.left = new DataSource(other.left);
+    }
+    if (other.isSetRight()) {
+      this.right = new DataSource(other.right);
+    }
+    if (other.isSetCondition()) {
+      this.condition = new Expression(other.condition);
+    }
+  }
+
+  @Override
+  public Join deepCopy() {
+    return new Join(this);
+  }
+
+  @Override
+  public void clear() {
+    this.type = null;
+    this.left = null;
+    this.right = null;
+    this.condition = null;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public JoinType getType() {
+    return this.type;
+  }
+
+  public Join setType(@org.apache.thrift.annotation.Nullable JoinType type) {
+    this.type = type;
+    return this;
+  }
+
+  public void unsetType() {
+    this.type = null;
+  }
+
+  /** Returns true if field type is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetType() {
+    return this.type != null;
+  }
+
+  public void setTypeIsSet(boolean value) {
+    if (!value) {
+      this.type = null;
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public DataSource getLeft() {
+    return this.left;
+  }
+
+  public Join setLeft(@org.apache.thrift.annotation.Nullable DataSource left) {
+    this.left = left;
+    return this;
+  }
+
+  public void unsetLeft() {
+    this.left = null;
+  }
+
+  /** Returns true if field left is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetLeft() {
+    return this.left != null;
+  }
+
+  public void setLeftIsSet(boolean value) {
+    if (!value) {
+      this.left = null;
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public DataSource getRight() {
+    return this.right;
+  }
+
+  public Join setRight(@org.apache.thrift.annotation.Nullable DataSource 
right) {
+    this.right = right;
+    return this;
+  }
+
+  public void unsetRight() {
+    this.right = null;
+  }
+
+  /** Returns true if field right is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetRight() {
+    return this.right != null;
+  }
+
+  public void setRightIsSet(boolean value) {
+    if (!value) {
+      this.right = null;
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public Expression getCondition() {
+    return this.condition;
+  }
+
+  public Join setCondition(@org.apache.thrift.annotation.Nullable Expression 
condition) {
+    this.condition = condition;
+    return this;
+  }
+
+  public void unsetCondition() {
+    this.condition = null;
+  }
+
+  /** Returns true if field condition is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetCondition() {
+    return this.condition != null;
+  }
+
+  public void setConditionIsSet(boolean value) {
+    if (!value) {
+      this.condition = null;
+    }
+  }
+
+  @Override
+  public void setFieldValue(_Fields field, 
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
+    switch (field) {
+      case TYPE:
+        if (value == null) {
+          unsetType();
+        } else {
+          setType((JoinType) value);
+        }
+        break;
+
+      case LEFT:
+        if (value == null) {
+          unsetLeft();
+        } else {
+          setLeft((DataSource) value);
+        }
+        break;
+
+      case RIGHT:
+        if (value == null) {
+          unsetRight();
+        } else {
+          setRight((DataSource) value);
+        }
+        break;
+
+      case CONDITION:
+        if (value == null) {
+          unsetCondition();
+        } else {
+          setCondition((Expression) value);
+        }
+        break;
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  @Override
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+      case TYPE:
+        return getType();
+
+      case LEFT:
+        return getLeft();
+
+      case RIGHT:
+        return getRight();
+
+      case CONDITION:
+        return getCondition();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned 
a value) and false otherwise */
+  @Override
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+      case TYPE:
+        return isSetType();
+      case LEFT:
+        return isSetLeft();
+      case RIGHT:
+        return isSetRight();
+      case CONDITION:
+        return isSetCondition();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that instanceof Join) {
+      return this.equals((Join) that);
+    }
+    return false;
+  }
+
+  public boolean equals(Join that) {
+    if (that == null) {
+      return false;
+    }
+    if (this == that) {
+      return true;
+    }
+
+    boolean this_present_type = true && this.isSetType();
+    boolean that_present_type = true && that.isSetType();
+    if (this_present_type || that_present_type) {
+      if (!(this_present_type && that_present_type)) {
+        return false;
+      }
+      if (!this.type.equals(that.type)) {
+        return false;
+      }
+    }
+
+    boolean this_present_left = true && this.isSetLeft();
+    boolean that_present_left = true && that.isSetLeft();
+    if (this_present_left || that_present_left) {
+      if (!(this_present_left && that_present_left)) {
+        return false;
+      }
+      if (!this.left.equals(that.left)) {
+        return false;
+      }
+    }
+
+    boolean this_present_right = true && this.isSetRight();
+    boolean that_present_right = true && that.isSetRight();
+    if (this_present_right || that_present_right) {
+      if (!(this_present_right && that_present_right)) {
+        return false;
+      }
+      if (!this.right.equals(that.right)) {
+        return false;
+      }
+    }
+
+    boolean this_present_condition = true && this.isSetCondition();
+    boolean that_present_condition = true && that.isSetCondition();
+    if (this_present_condition || that_present_condition) {
+      if (!(this_present_condition && that_present_condition)) {
+        return false;
+      }
+      if (!this.condition.equals(that.condition)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + ((isSetType()) ? 131071 : 524287);
+    if (isSetType()) {
+      hashCode = hashCode * 8191 + type.getValue();
+    }
+
+    hashCode = hashCode * 8191 + ((isSetLeft()) ? 131071 : 524287);
+    if (isSetLeft()) {
+      hashCode = hashCode * 8191 + left.hashCode();
+    }
+
+    hashCode = hashCode * 8191 + ((isSetRight()) ? 131071 : 524287);
+    if (isSetRight()) {
+      hashCode = hashCode * 8191 + right.hashCode();
+    }
+
+    hashCode = hashCode * 8191 + ((isSetCondition()) ? 131071 : 524287);
+    if (isSetCondition()) {
+      hashCode = hashCode * 8191 + condition.hashCode();
+    }
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(Join other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.compare(isSetType(), other.isSetType());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetType()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, 
other.type);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.compare(isSetLeft(), other.isSetLeft());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetLeft()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.left, 
other.left);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.compare(isSetRight(), 
other.isSetRight());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetRight()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.right, 
other.right);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.compare(isSetCondition(), 
other.isSetCondition());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCondition()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.condition, 
other.condition);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  @Override
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  @Override
+  public void read(org.apache.thrift.protocol.TProtocol iprot)
+      throws org.apache.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  @Override
+  public void write(org.apache.thrift.protocol.TProtocol oprot)
+      throws org.apache.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("Join(");
+    boolean first = true;
+
+    sb.append("type:");
+    if (this.type == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.type);
+    }
+    first = false;
+    if (!first) {
+      sb.append(", ");
+    }
+    sb.append("left:");
+    if (this.left == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.left);
+    }
+    first = false;
+    if (!first) {
+      sb.append(", ");
+    }
+    sb.append("right:");
+    if (this.right == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.right);
+    }
+    first = false;
+    if (isSetCondition()) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append("condition:");
+      if (this.condition == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.condition);
+      }
+      first = false;
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate()
+      throws org.apache.thrift.TException {
+    // check for required fields
+    if (type == null) {
+      throw new org.apache.thrift.protocol.TProtocolException(
+          "Required field 'type' was not present! Struct: " + toString());
+    }
+    if (left == null) {
+      throw new org.apache.thrift.protocol.TProtocolException(
+          "Required field 'left' was not present! Struct: " + toString());
+    }
+    if (right == null) {
+      throw new org.apache.thrift.protocol.TProtocolException(
+          "Required field 'right' was not present! Struct: " + toString());
+    }
+    // check for sub-struct validity
+    if (left != null) {
+      left.validate();
+    }
+    if (right != null) {
+      right.validate();
+    }
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out)
+      throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in)
+      throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class JoinStandardSchemeFactory implements 
org.apache.thrift.scheme.SchemeFactory {
+    @Override
+    public JoinStandardScheme getScheme() {
+      return new JoinStandardScheme();
+    }
+  }
+
+  private static class JoinStandardScheme extends 
org.apache.thrift.scheme.StandardScheme<Join> {
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol iprot, Join struct)
+        throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true) {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TYPE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.type = 
org.apache.pinot.common.request.JoinType.findByValue(iprot.readI32());
+              struct.setTypeIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 2: // LEFT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.left = new DataSource();
+              struct.left.read(iprot);
+              struct.setLeftIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 3: // RIGHT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.right = new DataSource();
+              struct.right.read(iprot);
+              struct.setRightIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 4: // CONDITION
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.condition = new Expression();
+              struct.condition.read(iprot);
+              struct.setConditionIsSet(true);
+            } else {
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      struct.validate();
+    }
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol oprot, Join struct)
+        throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.type != null) {
+        oprot.writeFieldBegin(TYPE_FIELD_DESC);
+        oprot.writeI32(struct.type.getValue());
+        oprot.writeFieldEnd();
+      }
+      if (struct.left != null) {
+        oprot.writeFieldBegin(LEFT_FIELD_DESC);
+        struct.left.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      if (struct.right != null) {
+        oprot.writeFieldBegin(RIGHT_FIELD_DESC);
+        struct.right.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      if (struct.condition != null) {
+        if (struct.isSetCondition()) {
+          oprot.writeFieldBegin(CONDITION_FIELD_DESC);
+          struct.condition.write(oprot);
+          oprot.writeFieldEnd();
+        }
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+  }
+
+  private static class JoinTupleSchemeFactory implements 
org.apache.thrift.scheme.SchemeFactory {
+    @Override
+    public JoinTupleScheme getScheme() {
+      return new JoinTupleScheme();
+    }
+  }
+
+  private static class JoinTupleScheme extends 
org.apache.thrift.scheme.TupleScheme<Join> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, Join struct)
+        throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol oprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+      oprot.writeI32(struct.type.getValue());
+      struct.left.write(oprot);
+      struct.right.write(oprot);
+      java.util.BitSet optionals = new java.util.BitSet();
+      if (struct.isSetCondition()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetCondition()) {
+        struct.condition.write(oprot);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, Join struct)
+        throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.thrift.protocol.TTupleProtocol) prot;
+      struct.type = 
org.apache.pinot.common.request.JoinType.findByValue(iprot.readI32());
+      struct.setTypeIsSet(true);
+      struct.left = new DataSource();
+      struct.left.read(iprot);
+      struct.setLeftIsSet(true);
+      struct.right = new DataSource();
+      struct.right.read(iprot);
+      struct.setRightIsSet(true);
+      java.util.BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.condition = new Expression();
+        struct.condition.read(iprot);
+        struct.setConditionIsSet(true);
+      }
+    }
+  }
+
+  private static <S extends org.apache.thrift.scheme.IScheme> S 
scheme(org.apache.thrift.protocol.TProtocol proto) {
+    return 
(org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? 
STANDARD_SCHEME_FACTORY
+        : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+}
+
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/JoinType.java 
b/pinot-common/src/main/java/org/apache/pinot/common/request/JoinType.java
new file mode 100644
index 0000000000..08efcf21bb
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/JoinType.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.15.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.pinot.common.request;
+
+
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler 
(0.15.0)", date = "2023-03-14")
+public enum JoinType implements org.apache.thrift.TEnum {
+  INNER(0),
+  LEFT(1),
+  RIGHT(2),
+  FULL(3);
+
+  private final int value;
+
+  private JoinType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find a the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  @org.apache.thrift.annotation.Nullable
+  public static JoinType findByValue(int value) {
+    switch (value) {
+      case 0:
+        return INNER;
+      case 1:
+        return LEFT;
+      case 2:
+        return RIGHT;
+      case 3:
+        return FULL;
+      default:
+        return null;
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index 27cf9b77be..237f0eb496 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -61,6 +61,8 @@ import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Join;
+import org.apache.pinot.common.request.JoinType;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
@@ -450,12 +452,7 @@ public class CalciteSqlParser {
     // FROM
     SqlNode fromNode = selectNode.getFrom();
     if (fromNode != null) {
-      DataSource dataSource = new DataSource();
-      dataSource.setTableName(fromNode.toString());
-      pinotQuery.setDataSource(dataSource);
-      if (fromNode instanceof SqlSelect || fromNode instanceof SqlOrderBy) {
-        dataSource.setSubquery(compileSqlNodeToPinotQuery(fromNode));
-      }
+      pinotQuery.setDataSource(compileToDataSource(fromNode));
     }
     // WHERE
     SqlNode whereNode = selectNode.getWhere();
@@ -492,6 +489,62 @@ public class CalciteSqlParser {
     return pinotQuery;
   }
 
+  private static DataSource compileToDataSource(SqlNode sqlNode) {
+    DataSource dataSource = new DataSource();
+    switch (sqlNode.getKind()) {
+      case IDENTIFIER:
+        dataSource.setTableName(sqlNode.toString());
+        break;
+      case AS:
+        List<SqlNode> operandList = ((SqlBasicCall) sqlNode).getOperandList();
+        dataSource.setSubquery(compileSqlNodeToPinotQuery(operandList.get(0)));
+        dataSource.setTableName(operandList.get(1).toString());
+        break;
+      case SELECT:
+      case ORDER_BY:
+        dataSource.setSubquery(compileSqlNodeToPinotQuery(sqlNode));
+        break;
+      case JOIN:
+        dataSource.setJoin(compileToJoin((SqlJoin) sqlNode));
+        break;
+      default:
+        throw new IllegalStateException("Unsupported SQL node kind as 
DataSource: " + sqlNode.getKind());
+    }
+    return dataSource;
+  }
+
+  private static Join compileToJoin(SqlJoin sqlJoin) {
+    Join join = new Join();
+    switch (sqlJoin.getJoinType()) {
+      case INNER:
+        join.setType(JoinType.INNER);
+        break;
+      case LEFT:
+        join.setType(JoinType.LEFT);
+        break;
+      case RIGHT:
+        join.setType(JoinType.RIGHT);
+        break;
+      case FULL:
+        join.setType(JoinType.FULL);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported join type: " + 
sqlJoin.getJoinType());
+    }
+    join.setLeft(compileToDataSource(sqlJoin.getLeft()));
+    join.setRight(compileToDataSource(sqlJoin.getRight()));
+    switch (sqlJoin.getConditionType()) {
+      case ON:
+        join.setCondition(toExpression(sqlJoin.getCondition()));
+        break;
+      case NONE:
+        break;
+      default:
+        throw new IllegalStateException("Unsupported join condition type: " + 
sqlJoin.getConditionType());
+    }
+    return join;
+  }
+
   private static void queryRewrite(PinotQuery pinotQuery) {
     for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
       pinotQuery = queryRewriter.rewrite(pinotQuery);
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index f97257336c..9ac05dd985 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -27,10 +27,13 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.pinot.common.request.DataSource;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Join;
+import org.apache.pinot.common.request.JoinType;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.request.RequestUtils;
@@ -1831,7 +1834,7 @@ public class CalciteSqlCompilerTest {
         "distinct_bar");
 
     query = "SELECT sum(distinct bar) AS distinct_bar, count(*), 
sum(a),min(a),max(b) FROM foo GROUP BY city ORDER BY "
-            + "distinct_bar";
+        + "distinct_bar";
     pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
     Assert.assertEquals(pinotQuery.getSelectList().size(), 5);
     Function selectFunctionCall = 
pinotQuery.getSelectList().get(0).getFunctionCall();
@@ -2239,9 +2242,8 @@ public class CalciteSqlCompilerTest {
     result = pinotQuery.getSelectList().get(0).getLiteral().getBoolValue();
     Assert.assertTrue(result);
 
-    query =
-        "select isSubnetOf('2001:db8:85a3::8a2e:370:7334/62', 
'2001:0db8:85a3:0003:ffff:ffff:ffff:ffff') from "
-            + "mytable";
+    query = "select isSubnetOf('2001:db8:85a3::8a2e:370:7334/62', 
'2001:0db8:85a3:0003:ffff:ffff:ffff:ffff') from "
+        + "mytable";
     pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
     result = pinotQuery.getSelectList().get(0).getLiteral().getBoolValue();
     Assert.assertTrue(result);
@@ -2949,13 +2951,24 @@ public class CalciteSqlCompilerTest {
    * Test for customized components in src/main/codegen/parserImpls.ftl file.
    */
   @Test
-  public void testParserExtensionImpl() {
+  public void testParserExtensionImpl()
+      throws Exception {
     String customSql = "INSERT INTO db.tbl FROM FILE 'file:///tmp/file1', FILE 
'file:///tmp/file2'";
     SqlNodeAndOptions sqlNodeAndOptions = 
testSqlWithCustomSqlParser(customSql);
     Assert.assertTrue(sqlNodeAndOptions.getSqlNode() instanceof 
SqlInsertFromFile);
     Assert.assertEquals(sqlNodeAndOptions.getSqlType(), PinotSqlType.DML);
   }
 
+  private static SqlNodeAndOptions testSqlWithCustomSqlParser(String sqlString)
+      throws Exception {
+    try (StringReader inStream = new StringReader(sqlString)) {
+      SqlParserImpl sqlParser = CalciteSqlParser.newSqlParser(inStream);
+      SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof();
+      // Extract OPTION statements from sql.
+      return CalciteSqlParser.extractSqlNodeAndOptions(sqlString, sqlNodeList);
+    }
+  }
+
   @Test
   public void shouldParseBasicAtTimeZoneExtension() {
     // Given:
@@ -3023,7 +3036,7 @@ public class CalciteSqlCompilerTest {
 
     // query with IN / NOT IN clause
     query = "SELECT COUNT(*) FROM tbl1 WHERE userUUID IN (SELECT userUUID FROM 
tbl2) "
-            + "and uuid NOT IN (SELECT uuid from tbl3)";
+        + "and uuid NOT IN (SELECT uuid from tbl3)";
     sqlNodeAndOptions = RequestUtils.parseQuery(query);
     tableNames = 
CalciteSqlParser.extractTableNamesFromNode(sqlNodeAndOptions.getSqlNode());
     Assert.assertEquals(tableNames.size(), 3);
@@ -3043,8 +3056,8 @@ public class CalciteSqlCompilerTest {
 
     // query with aliases, JOIN, IN/NOT-IN, group-by
     query = "with tmp as (select col1, count(*) from tbl1 where condition1 = 
filter1 group by col1), "
-            + "tmp2 as (select A.col1, B.col2 from tbl2 as A JOIN tbl3 AS B on 
A.key = B.key) "
-            + "select sum(col1) from tmp where col1 in (select col1 from tmp2) 
and col1 not in (select col1 from tbl4)";
+        + "tmp2 as (select A.col1, B.col2 from tbl2 as A JOIN tbl3 AS B on 
A.key = B.key) "
+        + "select sum(col1) from tmp where col1 in (select col1 from tmp2) and 
col1 not in (select col1 from tbl4)";
     sqlNodeAndOptions = RequestUtils.parseQuery(query);
     tableNames = 
CalciteSqlParser.extractTableNamesFromNode(sqlNodeAndOptions.getSqlNode());
     Assert.assertEquals(tableNames.size(), 4);
@@ -3054,15 +3067,88 @@ public class CalciteSqlCompilerTest {
     Assert.assertEquals(tableNames.get(3), "tbl4");
   }
 
-  private static SqlNodeAndOptions testSqlWithCustomSqlParser(String 
sqlString) {
-    try (StringReader inStream = new StringReader(sqlString)) {
-      SqlParserImpl sqlParser = CalciteSqlParser.newSqlParser(inStream);
-      SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof();
-      // Extract OPTION statements from sql.
-      return CalciteSqlParser.extractSqlNodeAndOptions(sqlString, sqlNodeList);
-    } catch (Exception e) {
-      Assert.fail("test custom sql parser failed", e);
-      return null;
-    }
+  @Test
+  public void testJoin() {
+    String query = "SELECT T1.a, T2.b FROM T1 JOIN T2";
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    DataSource dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    Join join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.INNER);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    Assert.assertEquals(join.getRight().getTableName(), "T2");
+    Assert.assertNull(join.getCondition());
+
+    query = "SELECT T1.a, T2.b FROM T1 INNER JOIN T2 ON T1.key = T2.key";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.INNER);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    Assert.assertEquals(join.getRight().getTableName(), "T2");
+    Assert.assertEquals(join.getCondition(), 
CalciteSqlParser.compileToExpression("T1.key = T2.key"));
+
+    query = "SELECT T1.a, T2.b FROM T1 FULL JOIN T2 ON T1.key = T2.key";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.FULL);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    Assert.assertEquals(join.getRight().getTableName(), "T2");
+    Assert.assertEquals(join.getCondition(), 
CalciteSqlParser.compileToExpression("T1.key = T2.key"));
+
+    query = "SELECT T1.a, T2.b FROM T1 LEFT JOIN T2 ON T1.a > T2.b";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.LEFT);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    Assert.assertEquals(join.getRight().getTableName(), "T2");
+    Assert.assertEquals(join.getCondition(), 
CalciteSqlParser.compileToExpression("T1.a > T2.b"));
+
+    query =
+        "SELECT T1.a, T2.b FROM T1 RIGHT JOIN (SELECT a, COUNT(*) AS b FROM T3 
GROUP BY a) AS T2 ON T1.key = T2.key";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.RIGHT);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    DataSource right = join.getRight();
+    Assert.assertEquals(right.getTableName(), "T2");
+    PinotQuery rightSubquery = right.getSubquery();
+    Assert.assertEquals(rightSubquery,
+        CalciteSqlParser.compileToPinotQuery("SELECT a, COUNT(*) AS b FROM T3 
GROUP BY a"));
+    Assert.assertEquals(join.getCondition(), 
CalciteSqlParser.compileToExpression("T1.key = T2.key"));
+
+    query = "SELECT T1.a, T2.b FROM T1 JOIN (SELECT key, COUNT(*) AS b FROM T3 
JOIN T4 GROUP BY key) AS T2 "
+        + "ON T1.key = T2.key";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    dataSource = pinotQuery.getDataSource();
+    Assert.assertNull(dataSource.getTableName());
+    Assert.assertNull(dataSource.getSubquery());
+    Assert.assertNotNull(dataSource.getJoin());
+    join = dataSource.getJoin();
+    Assert.assertEquals(join.getType(), JoinType.INNER);
+    Assert.assertEquals(join.getLeft().getTableName(), "T1");
+    right = join.getRight();
+    Assert.assertEquals(right.getTableName(), "T2");
+    rightSubquery = right.getSubquery();
+    Assert.assertEquals(rightSubquery,
+        CalciteSqlParser.compileToPinotQuery("SELECT key, COUNT(*) AS b FROM 
T3 JOIN T4 GROUP BY key"));
+    Assert.assertEquals(join.getCondition(), 
CalciteSqlParser.compileToExpression("T1.key = T2.key"));
   }
 }
diff --git a/pinot-common/src/thrift/query.thrift 
b/pinot-common/src/thrift/query.thrift
index a2db738c36..12a601c2df 100644
--- a/pinot-common/src/thrift/query.thrift
+++ b/pinot-common/src/thrift/query.thrift
@@ -18,11 +18,6 @@
  */
 namespace java org.apache.pinot.common.request
 
-struct DataSource {
-  1: optional string tableName;
-  2: optional PinotQuery subquery;
-}
-
 struct PinotQuery {
   1: optional i32 version;
   2: optional DataSource dataSource;
@@ -39,10 +34,24 @@ struct PinotQuery {
   13: optional map<Expression, Expression> expressionOverrideHints;
 }
 
-enum ExpressionType {
-  LITERAL,
-  IDENTIFIER,
-  FUNCTION
+struct DataSource {
+  1: optional string tableName;
+  2: optional PinotQuery subquery;
+  3: optional Join join;
+}
+
+struct Join {
+  1: required JoinType type;
+  2: required DataSource left;
+  3: required DataSource right;
+  4: optional Expression condition;
+}
+
+enum JoinType {
+  INNER,
+  LEFT,
+  RIGHT,
+  FULL
 }
 
 struct Expression {
@@ -52,13 +61,15 @@ struct Expression {
   4: optional Identifier identifier;
 }
 
-struct Identifier {
-  1: required string name;
+enum ExpressionType {
+  LITERAL,
+  IDENTIFIER,
+  FUNCTION
 }
 
 union Literal {
   1: optional bool boolValue;
-  2: optional byte byteValue;
+  2: optional i8 byteValue;
   3: optional i16 shortValue;
   4: optional i32 intValue;
   5: optional i64 longValue;
@@ -69,6 +80,10 @@ union Literal {
   9: optional bool nullValue;
 }
 
+struct Identifier {
+  1: required string name;
+}
+
 struct Function {
   1: required string operator;
   2: optional list<Expression> operands;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to