Copilot commented on code in PR #2148:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2148#discussion_r3008069804


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -293,20 +293,19 @@ void PutS3Object::onTrigger(core::ProcessContext& 
context, core::ProcessSession&
   }
 
   std::optional<minifi::aws::s3::PutObjectResult> result;
-  session.read(flow_file, [this, &flow_file, &put_s3_request_params, 
&result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
+  session.read(flow_file, [this, &flow_file, &put_s3_request_params, 
&result](const std::shared_ptr<io::InputStream>& stream) -> io::IoResult {
     try {
       if (flow_file->getSize() <= multipart_threshold_) {
         logger_->log_info("Uploading S3 Object '{}' in a single upload", 
put_s3_request_params->object_key);
         result = s3_wrapper_.putObject(*put_s3_request_params, stream, 
flow_file->getSize());
-        return gsl::narrow<int64_t>(flow_file->getSize());
-      } else {
-        logger_->log_info("S3 Object '{}' passes the multipart threshold, 
uploading it in multiple parts", put_s3_request_params->object_key);
-        result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, 
stream, flow_file->getSize(), multipart_size_);
-        return gsl::narrow<int64_t>(flow_file->getSize());
+        return io::IoResult::fromSizeT(flow_file->getSize());
       }
+      logger_->log_info("S3 Object '{}' passes the multipart threshold, 
uploading it in multiple parts", put_s3_request_params->object_key);
+      result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, stream, 
flow_file->getSize(), multipart_size_);
+      return io::IoResult::fromSizeT(flow_file->getSize());

Review Comment:
   `IoResult::fromSizeT(flow_file->getSize())` implicitly converts `uint64_t` 
to `size_t`, which can truncate on 32-bit builds (and may trigger conversion 
warnings). Consider adding/using an `IoResult` factory that accepts `uint64_t` 
(or explicitly `gsl::narrow<size_t>(flow_file->getSize())` if truncation is 
impossible here) to make the conversion safe and intentional.



##########
extensions/bustache/ApplyTemplate.cpp:
##########
@@ -61,8 +61,11 @@ void ApplyTemplate::onTrigger(core::ProcessContext& context, 
core::ProcessSessio
 
     // TODO(calebj) write ostream reciever for format() to prevent excessive 
copying
     std::string ostring = bustache::to_string(format(data));
-    output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
-    return gsl::narrow<int64_t>(ostring.length());
+    const auto write_res = 
output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
+    if (write_res < 0) {
+      return io::IoResult::error();
+    }
+    return io::IoResult::fromSizeT(ostring.length());

Review Comment:
   `output_stream->write(...)` returns a `size_t`, so `write_res < 0` is always 
false and write failures won’t be detected. Also, the callback currently 
returns `ostring.length()` rather than the actual `write_res` (and doesn’t 
verify a full write). Use `io::isError(write_res)` (and ideally check 
`write_res == ostring.size()`) and return an `IoResult` derived from the actual 
write result.
   ```suggestion
       if (io::isError(write_res) || write_res != ostring.size()) {
         return io::IoResult::error();
       }
       return io::IoResult::fromSizeT(write_res);
   ```



##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -163,21 +163,21 @@ std::queue<ConsumeMQTT::SmartMessage> 
ConsumeMQTT::getReceivedMqttMessages() {
   return msg_queue;
 }
 
-int64_t ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
+io::IoResult ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
   if (message_.contents->payloadlen < 0) {
     success_status_ = false;
     logger_->log_error("Payload length of message is negative, value is [{}]", 
message_.contents->payloadlen);
-    return -1;
+    return io::IoResult::error();
   }
 
-  const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
+  const size_t len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
   if (io::isError(len)) {
     success_status_ = false;
     logger_->log_error("Stream writing error when processing message");
-    return -1;
+    return io::IoResult::error();
   }
 
-  return gsl::narrow<int64_t>(len);
+  return io::IoResult::fromSizeT(gsl::narrow<int64_t>(len));

Review Comment:
   `io::IoResult::fromSizeT(gsl::narrow<int64_t>(len))` is inconsistent: 
`fromSizeT` expects a `size_t`, and narrowing `len` to `int64_t` can throw/lose 
information for large payloads. This should just return 
`io::IoResult::fromSizeT(len)` (or `fromI64` if you intentionally want signed 
semantics).



##########
minifi-api/common/include/minifi-cpp/io/StreamCallback.h:
##########
@@ -16,24 +16,101 @@
  */
 #pragma once
 
+#include <cinttypes>
 #include <functional>
 #include <memory>
 #include <optional>
 
+#include "../../minifi-api/include/minifi-c/minifi-c.h"
+#include "Stream.h"
+#include "minifi-cpp/utils/gsl.h"
+#include "utils/expected.h"
+
 namespace org::apache::nifi::minifi::io {
 
 class InputStream;
 class OutputStream;
 
-struct ReadWriteResult {
-  int64_t bytes_written = 0;
-  int64_t bytes_read = 0;
+class IoResult {
+ public:
+  IoResult() = delete;
+  IoResult(const IoResult&) = default;
+  IoResult(IoResult&&) noexcept = default;
+  IoResult& operator=(IoResult&&) noexcept = default;
+  IoResult& operator=(const IoResult&) = default;
+
+  virtual ~IoResult() = default;
+
+  static IoResult error() { return 
IoResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+  static IoResult cancelled() { return 
IoResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+  static IoResult zero() { return IoResult(0U); }
+
+  static IoResult fromI64(int64_t i64_val) {
+    if (i64_val < 0) { return 
IoResult(nonstd::make_unexpected(static_cast<MinifiIoStatus>(i64_val))); }
+    return IoResult(gsl::narrow<uint64_t>(i64_val));
+  }
+
+  static IoResult fromSizeT(size_t val) {
+    if (isError(val)) { return IoResult::error(); }
+    return IoResult(gsl::narrow<uint64_t>(val));
+  }
+
+  [[nodiscard]] int64_t toI64() const {
+    if (result_.has_value()) { return gsl::narrow<int64_t>(*result_); }
+    return result_.error();
+  }
+
+  [[nodiscard]] bool is_cancelled() const { return !result_ && result_.error() 
== MINIFI_IO_CANCEL; }
+
+  bool operator()() const { return result_.has_value(); }
+  bool operator!() const { return !result_.has_value(); }
+
+  uint64_t operator*() const { return *result_; }
+
+  nonstd::expected<uint64_t, MinifiIoStatus> inner() const { return result_; }
+
+ private:
+  explicit IoResult(nonstd::expected<uint64_t, MinifiIoStatus> result) : 
result_(std::move(result)) {}
+
+  nonstd::expected<uint64_t, MinifiIoStatus> result_;
+};
+
+class ReadWriteResult {
+ public:
+  ReadWriteResult() = delete;
+  ReadWriteResult(const ReadWriteResult&) = default;
+  ReadWriteResult(ReadWriteResult&&) noexcept = default;
+  ReadWriteResult& operator=(ReadWriteResult&&) noexcept = default;
+  ReadWriteResult& operator=(const ReadWriteResult&) = default;
+
+  ReadWriteResult(const uint64_t bytes_read, const uint64_t bytes_written)
+      : result_(ReadWrite{.bytes_read = bytes_read, .bytes_written = 
bytes_written}) {}
+
+  static ReadWriteResult zero() { return ReadWriteResult(ReadWrite{.bytes_read 
= 0, .bytes_written = 0}); };
+  static ReadWriteResult error() { return 
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+  static ReadWriteResult cancelled() { return 
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+
+  virtual ~ReadWriteResult() = default;
+
+  bool operator()() const { return result_.has_value(); }
+  bool operator!() const { return !result_.has_value(); }
+
+  uint64_t bytesWritten() const { return result_->bytes_written; }
+  uint64_t bytesRead() const { return result_->bytes_read; }
+
+ private:
+  struct ReadWrite {
+    uint64_t bytes_read;
+    uint64_t bytes_written;
+  };
+  explicit ReadWriteResult(nonstd::expected<ReadWrite, MinifiIoStatus> result) 
: result_(std::move(result)) {}
+
+  nonstd::expected<ReadWrite, MinifiIoStatus> result_;
 };
 
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-using InputStreamCallback = std::function<int64_t(const 
std::shared_ptr<InputStream>& input_stream)>;
-using OutputStreamCallback = std::function<int64_t(const 
std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback = 
std::function<std::optional<ReadWriteResult>(const 
std::shared_ptr<InputStream>& input_stream, const 
std::shared_ptr<OutputStream>& output_stream)>;
+using OutputStreamCallback = std::function<IoResult(const 
std::shared_ptr<OutputStream>& output_stream)>;
+using InputStreamCallback = std::function<IoResult(const 
std::shared_ptr<InputStream>& output_stream)>;

Review Comment:
   `InputStreamCallback`'s parameter is named `output_stream`, which is 
misleading (it’s a `std::shared_ptr<InputStream>`). Renaming the parameter to 
`input_stream` would improve readability and avoid confusion at call sites.



##########
minifi-api/common/include/minifi-cpp/io/StreamCallback.h:
##########
@@ -16,24 +16,101 @@
  */
 #pragma once
 
+#include <cinttypes>
 #include <functional>
 #include <memory>
 #include <optional>
 
+#include "../../minifi-api/include/minifi-c/minifi-c.h"
+#include "Stream.h"
+#include "minifi-cpp/utils/gsl.h"
+#include "utils/expected.h"

Review Comment:
   The `#include "../../minifi-api/include/minifi-c/minifi-c.h"` path appears 
incorrect relative to this header's location (it resolves under 
`minifi-api/common/include/...`, which doesn't contain 
`minifi-api/include/...`). This is likely to break compilation. Prefer 
including the header via an include-dir-relative path such as 
`minifi-c/minifi-c.h` (or otherwise adjust to the correct relative path to 
`minifi-api/include/minifi-c/minifi-c.h`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to