rseetham commented on code in PR #12980:
URL: https://github.com/apache/pinot/pull/12980#discussion_r1598807968


##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufUtils.java:
##########
@@ -48,21 +50,158 @@ public static InputStream 
getDescriptorFileInputStream(String descriptorFilePath
       PinotFS pinotFS = PinotFSFactory.create(scheme);
       Path localTmpDir = Files.createTempDirectory(TMP_DIR_PREFIX + 
System.currentTimeMillis());
       File protoDescriptorLocalFile = createLocalFile(descriptorFileURI, 
localTmpDir.toFile());
-      LOGGER.info("Copying protocol buffer descriptor file from source: {} to 
dst: {}", descriptorFilePath,
+      LOGGER.info("Copying protocol buffer jar/descriptor file from source: {} 
to dst: {}", filePath,
           protoDescriptorLocalFile.getAbsolutePath());
       pinotFS.copyToLocalFile(descriptorFileURI, protoDescriptorLocalFile);
-      return new FileInputStream(protoDescriptorLocalFile);
+      return protoDescriptorLocalFile;
     } else {
       throw new RuntimeException(String.format("Scheme: %s not supported in 
PinotFSFactory"
-          + " for protocol buffer descriptor file: %s.", scheme, 
descriptorFilePath));
+          + " for protocol buffer jar/descriptor file: %s.", scheme, 
filePath));
     }
   }
 
+  public static InputStream getDescriptorFileInputStream(String 
descriptorFilePath)
+      throws Exception {
+    return new FileInputStream(getFileCopiedToLocal(descriptorFilePath));
+  }
+
   public static File createLocalFile(URI srcURI, File dstDir) {
     String sourceURIPath = srcURI.getPath();
     File dstFile = new File(dstDir, new File(sourceURIPath).getName());
     LOGGER.debug("Created empty local temporary file {} to copy protocol "
         + "buffer descriptor {}", dstFile.getAbsolutePath(), srcURI);
     return dstFile;
   }
+
+  // Copied from Flink codebase. 
https://github.com/apache/flink/blob/master/flink-formats/flink-protobuf/
+  // src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
+  // This is needed since the generated class name is not always the same as 
the proto file name.
+  // The descriptor that we get from the jar drops the first prefix of the 
proto class name.
+  // For example, instead of com.data.example.ExampleProto we get 
data.example.ExampleProto.
+  // Copied from Flink codebase.
+  // https://github.com/apache/flink/blob/master/flink-formats/flink-protobuf/
+  // src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java
+  public static String getFullJavaName(Descriptors.Descriptor descriptor) {
+    if (null != descriptor.getContainingType()) {
+      // nested type
+      String parentJavaFullName = 
getFullJavaName(descriptor.getContainingType());
+      return parentJavaFullName + "." + descriptor.getName();
+    } else {
+      // top level message
+      String outerProtoName = getOuterProtoPrefix(descriptor.getFile());
+      return outerProtoName + descriptor.getName();
+    }
+  }
+
+  public static String getFullJavaName(Descriptors.EnumDescriptor 
enumDescriptor) {
+    if (null != enumDescriptor.getContainingType()) {
+      return getFullJavaName(enumDescriptor.getContainingType())
+          + "."
+          + enumDescriptor.getName();
+    } else {
+      String outerProtoName = getOuterProtoPrefix(enumDescriptor.getFile());
+      return outerProtoName + enumDescriptor.getName();
+    }
+  }
+
+  public static String getOuterProtoPrefix(Descriptors.FileDescriptor 
fileDescriptor) {
+    String javaPackageName =
+        fileDescriptor.getOptions().hasJavaPackage()
+            ? fileDescriptor.getOptions().getJavaPackage()
+            : fileDescriptor.getPackage();
+    if (fileDescriptor.getOptions().getJavaMultipleFiles()) {
+      return javaPackageName + ".";
+    } else {
+      String outerClassName = getOuterClassName(fileDescriptor);
+      return javaPackageName + "." + outerClassName + ".";
+    }
+  }
+
+  public static String getOuterClassName(Descriptors.FileDescriptor 
fileDescriptor) {
+    if (fileDescriptor.getOptions().hasJavaOuterClassname()) {
+      return fileDescriptor.getOptions().getJavaOuterClassname();
+    } else {
+      String[] fileNames = fileDescriptor.getName().split("/");
+      String fileName = fileNames[fileNames.length - 1];
+      String outerName = 
ProtobufInternalUtils.underScoreToCamelCase(fileName.split("\\.")[0], true);
+      // 
https://developers.google.com/protocol-buffers/docs/reference/java-generated#invocation
+      // The name of the wrapper class is determined by converting the base 
name of the .proto
+      // file to camel case if the java_outer_classname option is not 
specified.
+      // For example, foo_bar.proto produces the class name FooBar. If there 
is a service,
+      // enum, or message (including nested types) in the file with the same 
name,
+      // "OuterClass" will be appended to the wrapper class's name.
+      boolean hasSameNameMessage =
+          fileDescriptor.getMessageTypes().stream()
+              .anyMatch(f -> f.getName().equals(outerName));
+      boolean hasSameNameEnum =
+          fileDescriptor.getEnumTypes().stream()
+              .anyMatch(f -> f.getName().equals(outerName));
+      boolean hasSameNameService =
+          fileDescriptor.getServices().stream()
+              .anyMatch(f -> f.getName().equals(outerName));
+      if (hasSameNameMessage || hasSameNameEnum || hasSameNameService) {
+        return outerName + PB_OUTER_CLASS_SUFFIX;
+      } else {
+        return outerName;
+      }
+    }
+  }
+
+  /**
+   * Get java type str from {@link Descriptors.FieldDescriptor} which directly 
fetched from protobuf object.
+   *
+   * @return The returned code phrase will be used as java type str in codegen 
sections.
+   */
+  public static String getTypeStrFromProto(Descriptors.FieldDescriptor fd, 
boolean isList) {
+    String typeStr;
+    switch (fd.getJavaType()) {
+      case MESSAGE:
+        if (fd.isMapField()) {
+          // map
+          Descriptors.FieldDescriptor keyFd =
+              fd.getMessageType().findFieldByName("key");

Review Comment:
   I can't make it static. It's in a recursive function. But I made it final.



##########
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufUtils.java:
##########
@@ -48,21 +50,158 @@ public static InputStream 
getDescriptorFileInputStream(String descriptorFilePath
       PinotFS pinotFS = PinotFSFactory.create(scheme);
       Path localTmpDir = Files.createTempDirectory(TMP_DIR_PREFIX + 
System.currentTimeMillis());
       File protoDescriptorLocalFile = createLocalFile(descriptorFileURI, 
localTmpDir.toFile());
-      LOGGER.info("Copying protocol buffer descriptor file from source: {} to 
dst: {}", descriptorFilePath,
+      LOGGER.info("Copying protocol buffer jar/descriptor file from source: {} 
to dst: {}", filePath,
           protoDescriptorLocalFile.getAbsolutePath());
       pinotFS.copyToLocalFile(descriptorFileURI, protoDescriptorLocalFile);
-      return new FileInputStream(protoDescriptorLocalFile);
+      return protoDescriptorLocalFile;

Review Comment:
   Renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to