Copilot commented on code in PR #2148:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2148#discussion_r3008069804
##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -293,20 +293,19 @@ void PutS3Object::onTrigger(core::ProcessContext&
context, core::ProcessSession&
}
std::optional<minifi::aws::s3::PutObjectResult> result;
- session.read(flow_file, [this, &flow_file, &put_s3_request_params,
&result](const std::shared_ptr<io::InputStream>& stream) -> int64_t {
+ session.read(flow_file, [this, &flow_file, &put_s3_request_params,
&result](const std::shared_ptr<io::InputStream>& stream) -> io::IoResult {
try {
if (flow_file->getSize() <= multipart_threshold_) {
logger_->log_info("Uploading S3 Object '{}' in a single upload",
put_s3_request_params->object_key);
result = s3_wrapper_.putObject(*put_s3_request_params, stream,
flow_file->getSize());
- return gsl::narrow<int64_t>(flow_file->getSize());
- } else {
- logger_->log_info("S3 Object '{}' passes the multipart threshold,
uploading it in multiple parts", put_s3_request_params->object_key);
- result = s3_wrapper_.putObjectMultipart(*put_s3_request_params,
stream, flow_file->getSize(), multipart_size_);
- return gsl::narrow<int64_t>(flow_file->getSize());
+ return io::IoResult::fromSizeT(flow_file->getSize());
}
+ logger_->log_info("S3 Object '{}' passes the multipart threshold,
uploading it in multiple parts", put_s3_request_params->object_key);
+ result = s3_wrapper_.putObjectMultipart(*put_s3_request_params, stream,
flow_file->getSize(), multipart_size_);
+ return io::IoResult::fromSizeT(flow_file->getSize());
Review Comment:
`IoResult::fromSizeT(flow_file->getSize())` implicitly converts `uint64_t`
to `size_t`, which can truncate on 32-bit builds (and may trigger conversion
warnings). Consider adding/using an `IoResult` factory that accepts `uint64_t`
(or explicitly `gsl::narrow<size_t>(flow_file->getSize())` if truncation is
impossible here) to make the conversion safe and intentional.
##########
extensions/bustache/ApplyTemplate.cpp:
##########
@@ -61,8 +61,11 @@ void ApplyTemplate::onTrigger(core::ProcessContext& context,
core::ProcessSessio
// TODO(calebj) write ostream reciever for format() to prevent excessive
copying
std::string ostring = bustache::to_string(format(data));
- output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
- return gsl::narrow<int64_t>(ostring.length());
+ const auto write_res =
output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
+ if (write_res < 0) {
+ return io::IoResult::error();
+ }
+ return io::IoResult::fromSizeT(ostring.length());
Review Comment:
`output_stream->write(...)` returns a `size_t`, so `write_res < 0` is always
false and write failures won’t be detected. Also, the callback currently
returns `ostring.length()` rather than the actual `write_res` (and doesn’t
verify a full write). Use `io::isError(write_res)` (and ideally check
`write_res == ostring.size()`) and return an `IoResult` derived from the actual
write result.
```suggestion
if (io::isError(write_res) || write_res != ostring.size()) {
return io::IoResult::error();
}
return io::IoResult::fromSizeT(write_res);
```
##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -163,21 +163,21 @@ std::queue<ConsumeMQTT::SmartMessage>
ConsumeMQTT::getReceivedMqttMessages() {
return msg_queue;
}
-int64_t ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
+io::IoResult ConsumeMQTT::WriteCallback::operator() (const
std::shared_ptr<io::OutputStream>& stream) {
if (message_.contents->payloadlen < 0) {
success_status_ = false;
logger_->log_error("Payload length of message is negative, value is [{}]",
message_.contents->payloadlen);
- return -1;
+ return io::IoResult::error();
}
- const auto len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
+ const size_t len =
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload),
gsl::narrow<size_t>(message_.contents->payloadlen));
if (io::isError(len)) {
success_status_ = false;
logger_->log_error("Stream writing error when processing message");
- return -1;
+ return io::IoResult::error();
}
- return gsl::narrow<int64_t>(len);
+ return io::IoResult::fromSizeT(gsl::narrow<int64_t>(len));
Review Comment:
`io::IoResult::fromSizeT(gsl::narrow<int64_t>(len))` is inconsistent:
`fromSizeT` expects a `size_t`, and `len` is already a `size_t`. Narrowing it to
`int64_t` adds a checked conversion that can throw for very large values, and
the result is then implicitly converted straight back to `size_t`. This should
just return `io::IoResult::fromSizeT(len)` (or `fromI64` if you intentionally
want signed semantics).
##########
minifi-api/common/include/minifi-cpp/io/StreamCallback.h:
##########
@@ -16,24 +16,101 @@
*/
#pragma once
+#include <cinttypes>
#include <functional>
#include <memory>
#include <optional>
+#include "../../minifi-api/include/minifi-c/minifi-c.h"
+#include "Stream.h"
+#include "minifi-cpp/utils/gsl.h"
+#include "utils/expected.h"
+
namespace org::apache::nifi::minifi::io {
class InputStream;
class OutputStream;
-struct ReadWriteResult {
- int64_t bytes_written = 0;
- int64_t bytes_read = 0;
+class IoResult {
+ public:
+ IoResult() = delete;
+ IoResult(const IoResult&) = default;
+ IoResult(IoResult&&) noexcept = default;
+ IoResult& operator=(IoResult&&) noexcept = default;
+ IoResult& operator=(const IoResult&) = default;
+
+ virtual ~IoResult() = default;
+
+ static IoResult error() { return
IoResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+ static IoResult cancelled() { return
IoResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+ static IoResult zero() { return IoResult(0U); }
+
+ static IoResult fromI64(int64_t i64_val) {
+ if (i64_val < 0) { return
IoResult(nonstd::make_unexpected(static_cast<MinifiIoStatus>(i64_val))); }
+ return IoResult(gsl::narrow<uint64_t>(i64_val));
+ }
+
+ static IoResult fromSizeT(size_t val) {
+ if (isError(val)) { return IoResult::error(); }
+ return IoResult(gsl::narrow<uint64_t>(val));
+ }
+
+ [[nodiscard]] int64_t toI64() const {
+ if (result_.has_value()) { return gsl::narrow<int64_t>(*result_); }
+ return result_.error();
+ }
+
+ [[nodiscard]] bool is_cancelled() const { return !result_ && result_.error()
== MINIFI_IO_CANCEL; }
+
+ bool operator()() const { return result_.has_value(); }
+ bool operator!() const { return !result_.has_value(); }
+
+ uint64_t operator*() const { return *result_; }
+
+ nonstd::expected<uint64_t, MinifiIoStatus> inner() const { return result_; }
+
+ private:
+ explicit IoResult(nonstd::expected<uint64_t, MinifiIoStatus> result) :
result_(std::move(result)) {}
+
+ nonstd::expected<uint64_t, MinifiIoStatus> result_;
+};
+
+class ReadWriteResult {
+ public:
+ ReadWriteResult() = delete;
+ ReadWriteResult(const ReadWriteResult&) = default;
+ ReadWriteResult(ReadWriteResult&&) noexcept = default;
+ ReadWriteResult& operator=(ReadWriteResult&&) noexcept = default;
+ ReadWriteResult& operator=(const ReadWriteResult&) = default;
+
+ ReadWriteResult(const uint64_t bytes_read, const uint64_t bytes_written)
+ : result_(ReadWrite{.bytes_read = bytes_read, .bytes_written =
bytes_written}) {}
+
+ static ReadWriteResult zero() { return ReadWriteResult(ReadWrite{.bytes_read
= 0, .bytes_written = 0}); };
+ static ReadWriteResult error() { return
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_ERROR)); }
+ static ReadWriteResult cancelled() { return
ReadWriteResult(nonstd::make_unexpected(MINIFI_IO_CANCEL)); }
+
+ virtual ~ReadWriteResult() = default;
+
+ bool operator()() const { return result_.has_value(); }
+ bool operator!() const { return !result_.has_value(); }
+
+ uint64_t bytesWritten() const { return result_->bytes_written; }
+ uint64_t bytesRead() const { return result_->bytes_read; }
+
+ private:
+ struct ReadWrite {
+ uint64_t bytes_read;
+ uint64_t bytes_written;
+ };
+ explicit ReadWriteResult(nonstd::expected<ReadWrite, MinifiIoStatus> result)
: result_(std::move(result)) {}
+
+ nonstd::expected<ReadWrite, MinifiIoStatus> result_;
};
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-using InputStreamCallback = std::function<int64_t(const
std::shared_ptr<InputStream>& input_stream)>;
-using OutputStreamCallback = std::function<int64_t(const
std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback =
std::function<std::optional<ReadWriteResult>(const
std::shared_ptr<InputStream>& input_stream, const
std::shared_ptr<OutputStream>& output_stream)>;
+using OutputStreamCallback = std::function<IoResult(const
std::shared_ptr<OutputStream>& output_stream)>;
+using InputStreamCallback = std::function<IoResult(const
std::shared_ptr<InputStream>& output_stream)>;
Review Comment:
`InputStreamCallback`'s parameter is named `output_stream`, which is
misleading (it’s a `std::shared_ptr<InputStream>`). Renaming the parameter to
`input_stream` would improve readability and avoid confusion at call sites.
##########
minifi-api/common/include/minifi-cpp/io/StreamCallback.h:
##########
@@ -16,24 +16,101 @@
*/
#pragma once
+#include <cinttypes>
#include <functional>
#include <memory>
#include <optional>
+#include "../../minifi-api/include/minifi-c/minifi-c.h"
+#include "Stream.h"
+#include "minifi-cpp/utils/gsl.h"
+#include "utils/expected.h"
Review Comment:
The `#include "../../minifi-api/include/minifi-c/minifi-c.h"` path appears
incorrect relative to this header's location: starting from
`minifi-api/common/include/minifi-cpp/io/`, it resolves to
`minifi-api/common/include/minifi-api/include/minifi-c/minifi-c.h`, and
`minifi-api/common/include/...` doesn't contain `minifi-api/include/...`.
This is likely to break compilation. Prefer including the header via an
include-dir-relative path such as `minifi-c/minifi-c.h` (or otherwise adjust
to the correct relative path to `minifi-api/include/minifi-c/minifi-c.h`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]