billyean opened a new issue, #44895:
URL: https://github.com/apache/arrow/issues/44895

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   Platform: Ubuntu 20.04.6 LTS
   Arrow version: 15
   Client: C++
   Server: Java
   
   This issue was reported before, but I can no longer find the original
report in the system. If you know what happened to it, feel free to mark
this one as a duplicate.
   
   Description:
   We have a Java Flight server and a C++ client, both running in a
multi-threaded environment. The Java Flight server uses doExchange to
communicate with the client. The code is pasted below. We have identified
that the line ```reader.getDescriptor();``` waits indefinitely.
   
   ```
       public void doExchange(CallContext context, FlightStream reader, 
ServerStreamListener writer) {
           try (BufferAllocator allocator = allocatorPool.submit(
               () -> this.allocator.newChildAllocator("exchange", 0, 
Long.MAX_VALUE)).get()) {
               FlightDescriptor descriptor = reader.getDescriptor();
               List<String> path = descriptor.getPath();
               String type = path.get(0);
               String funcSignature = path.get(1);
               switch (type.toLowerCase()) {
                   case "udf":
                       long start = System.currentTimeMillis();
                       AbstractUDFBatch udf = 
this.udfRegistry.get(funcSignature);
                       if (udf == null) {
                           writer.error(
                                   
CallStatus.NOT_FOUND.withDescription("Unregistered function: " + 
funcSignature).toRuntimeException());
                           return;
                       }
                       try (VectorSchemaRoot root = 
VectorSchemaRoot.create(udf.getOutputSchema(), allocator)) {
                           VectorLoader loader = new VectorLoader(root);
                           writer.start(root);
                           log.info("1. Read root.");
                           while (reader.next()) {
                               try (VectorSchemaRoot input = reader.getRoot()) {
                                   Iterator<VectorSchemaRoot> outputBatches = 
udf.invoke(input, allocator);
                                   while (outputBatches.hasNext()) {
                                       try (VectorSchemaRoot outputRoot = 
outputBatches.next()) {
                                           VectorUnloader unloader = new 
VectorUnloader(outputRoot);
                                           try (ArrowRecordBatch outputBatch = 
unloader.getRecordBatch()) {
                                               loader.load(outputBatch);
                                           }
                                       }
                                       writer.putNext();
                                   }
                               } catch (Exception e) {
                                   log.error("Error processing input", e);
                               }
                           }
                           log.info("1. Done read root.");
                           writer.completed();
                       }
                       log.info("Called user defined function {} costs {} ms.", 
funcSignature, System.currentTimeMillis() - start);
                       break;
   ```
   
   The C++ client code is simple:
   ```
   std::vector<std::shared_ptr<RecordBatch>> UDFClient::Call(
       const std::vector<std::string>& paths,
       std::shared_ptr<RecordBatch>& batch) const {
     // Create a FlightDescriptor using a path
     FlightDescriptor descriptor = FlightDescriptor::Path(paths);
     auto exchange_result = Client_->DoExchange(descriptor);
     if (!exchange_result.ok()) {
       XXX_FAIL("Do exchange descriptor: "
               + exchange_result.status().ToString());
     }
     auto exchange = std::move(exchange_result.ValueUnsafe());
     // Get the writer and reader
     auto writer = std::move(exchange.writer);
     auto reader = std::move(exchange.reader);
   
     // Start sending the record batch
     auto status = writer->Begin(batch->schema());
     if (!status.ok()) {
       XXX_FAIL("Error write schema: " + status.ToString());
     }
     status = writer->WriteRecordBatch(*batch);
     if (!status.ok()) {
       XXX_FAIL("Error write data: " + status.ToString());
     }
     // Mark the stream as complete
     status = writer->DoneWriting();
     if (!status.ok()) {
       XXX_FAIL("Error done writing: " + status.ToString());
     }
     // Return the reader for reading the response
     auto result = reader->ToRecordBatches();
     if (!result.ok()) {
       XXX_FAIL(
           "colocate invocation failed: " + result.status().ToString());
     }
     return result.ValueUnsafe();
   }
   ```
   
   We used `jstack` to inspect all the locks. The
```SettableFuture<FlightDescriptor> descriptor``` is never set, which is
what leaves the thread blocked.
   ```
   "pool-1-thread-16" #31 prio=5 os_prio=0 tid=0x00007f6d34009000 nid=0x3a19 
waiting on condition [0x00007f6d253ea000]
      java.lang.Thread.State: WAITING (parking)
           at sun.misc.Unsafe.park(Native Method)
           - parking to wait for  <0x000000076d596180> (a 
com.google.common.util.concurrent.SettableFuture)
           at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
           at 
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:563)
           at 
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:110)
           at 
org.apache.arrow.flight.FlightStream.getDescriptor(FlightStream.java:158)
           at com.bytedance.dp.udf.UDFProducer.doExchange(UDFProducer.java:266)
           at 
org.apache.arrow.flight.FlightService.lambda$doExchangeCustom$2(FlightService.java:382)
           at 
org.apache.arrow.flight.FlightService$$Lambda$65/298187580.run(Unknown Source)
           at io.grpc.Context$1.run(Context.java:566)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   
   ### Component(s)
   
   C++, FlightRPC, Java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to